Source code for skyborn.causality

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for Liang and Granger causality analysis in atmospheric and climate data

This module provides causality analysis tools for time series data commonly
used in atmospheric and climate research.

Utilities for Liang and Granger causality analysis
https://github.com/LinkedEarth/Pyleoclim_util
"""

__all__ = ["liang_causality", "granger_causality"]
from typing import Any, Dict, List, Union

import numpy as np
from scipy.stats.mstats import mquantiles
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.arima_process import arma_generate_sample
from statsmodels.tsa.stattools import grangercausalitytests
from tqdm import tqdm


[docs] def ar1_fit_evenly(y: np.ndarray) -> float: """Returns the lag-1 autocorrelation from AR(1) fit. Uses `statsmodels.tsa.arima.model.ARIMA <https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima.model.ARIMA.html>`_. to calculate lag-1 autocorrelation MARK FOR DEPRECATION once uar1_fit is adopted Parameters ---------- y : array Vector of (float) numbers as a time series Returns ------- g : float Lag-1 autocorrelation coefficient """ # syntax compatible with statsmodels v0.11.1 # ar1_mod = sm.tsa.ARMA(y, (1, 0), missing='drop').fit(trend='nc', disp=0) # syntax compatible with statsmodels v0.12 ar1_mod = ARIMA(y, order=(1, 0, 0), missing="drop", trend="ct").fit() g = ar1_mod.params[2] if g > 1: print( "Warning: AR(1) fitted autocorrelation greater than 1; setting to 1-eps^{1/4}" ) eps = np.spacing(1.0) g = 1.0 - eps ** (1 / 4) return g
def sm_ar1_sim(n: int, p: int, g: float, sig: float) -> np.ndarray: """Produce p realizations of an AR1 process of length n with lag-1 autocorrelation g using statsmodels Parameters ---------- n : int row dimensions p : int column dimensions g : float lag-1 autocorrelation coefficient sig : float the standard deviation of the original time series Returns ------- red : numpy matrix n rows by p columns matrix of an AR1 process See also -------- skyborn.causality.granger_causality : Granger causality analysis skyborn.causality.liang_causality : Liang information flow analysis """ # specify model parameters (statsmodel wants lag0 coefficents as unity) ar = np.r_[1, -g] # AR model parameter ma = np.r_[1, 0.0] # MA model parameters # theoretical noise variance for red to achieve the same variance as X sig_n = sig * np.sqrt(1 - g**2) red = np.empty(shape=(n, p)) # declare array # simulate AR(1) model for each column for i in np.arange(p): red[:, i] = arma_generate_sample( ar=ar, ma=ma, nsample=n, burnin=50, scale=sig_n ) return red # ------- # Main functions # --------
[docs] def granger_causality( y1: np.ndarray, y2: np.ndarray, maxlag: Union[int, List[int]] = 1, addconst: bool = True, verbose: bool = True, ) -> Dict[Any, Any]: """Granger causality tests Four tests for the Granger non-causality of 2 time series. All four tests give similar results. params_ftest and ssr_ftest are equivalent based on F test which is identical to lmtest:grangertest in R. Wrapper for the functions described in statsmodels (https://www.statsmodels.org/stable/generated/statsmodels.tsa.stattools.grangercausalitytests.html) Parameters ---------- y1, y2: array vectors of (real) numbers with identical length, no NaNs allowed maxlag : int or int iterable, optional If an integer, computes the test for all lags up to maxlag. If an iterable, computes the tests only for the lags in maxlag. addconst : bool, optional Include a constant in the model. verbose : bool, optional Print results Returns ------- dict All test results, dictionary keys are the number of lags. For each lag the values are a tuple, with the first element a dictionary with test statistic, pvalues, degrees of freedom, the second element are the OLS estimation results for the restricted model, the unrestricted model and the restriction (contrast) matrix for the parameter f_test. Notes ----- The null hypothesis for Granger causality tests is that y2, does NOT Granger cause y1. Granger causality means that past values of y2 have a statistically significant effect on the current value of y1, taking past values of y1 into account as regressors. We reject the null hypothesis that y2 does not Granger cause y1 if the p-values are below a desired threshold (e.g. 0.05). The null hypothesis for all four test is that the coefficients corresponding to past values of the second time series are zero. ‘params_ftest’, ‘ssr_ftest’ are based on the F distribution ‘ssr_chi2test’, ‘lrtest’ are based on the chi-square distribution See also -------- skyborn.causality.liang_causality : Information flow estimated using the Liang algorithm skyborn.causality.signif_isopersist : Significance test with AR(1) with same persistence skyborn.causality.signif_isospec : Significance test with surrogates with randomized phases References ---------- Granger, C. W. J. (1969). Investigating causal relations by econometric models and cross-spectral methods. Econometrica, 37(3), 424-438. Granger, C. W. J. (1980). Testing for causality: A personal viewpoont. Journal of Economic Dynamics and Control, 2, 329-352. Granger, C. W. J. (1988). Some recent development in a concept of causality. Journal of Econometrics, 39(1-2), 199-211. """ if len(y1) != len(y2): raise ValueError("Timeseries must be of same length") x = np.array([y1, y2]).T res = grangercausalitytests(x, maxlag=maxlag, addconst=addconst, verbose=verbose) return res
[docs] def phaseran(recblk: np.ndarray, nsurr: int) -> np.ndarray: """Simultaneous phase randomization of a set of time series It creates blocks of surrogate data with the same second order properties as the original time series dataset by transforming the original data into the frequency domain, randomizing the phases simultaneoulsy across the time series and converting the data back into the time domain. Written by Carlos Gias for MATLAB http://www.mathworks.nl/matlabcentral/fileexchange/32621-phase-randomization/content/phaseran.m Parameters ---------- recblk : numpy array 2D array , Row: time sample. Column: recording. An odd number of time samples (height) is expected. If that is not the case, recblock is reduced by 1 sample before the surrogate data is created. The class must be double and it must be nonsparse. nsurr : int is the number of image block surrogates that you want to generate. Returns ------- surrblk : numpy array 3D multidimensional array image block with the surrogate datasey along the third dimension See also -------- skyborn.causality.liang_causality : Liang-Kleeman information flow analysis skyborn.causality.granger_causality : Granger causality analysis References ---------- - Prichard, D., Theiler, J. Generating Surrogate Data for Time Series with Several Simultaneously Measured Variables (1994) Physical Review Letters, Vol 73, Number 7 - Carlos Gias (2020). Phase randomization, MATLAB Central File Exchange """ # Get parameters nfrms = recblk.shape[0] if nfrms % 2 == 0: nfrms = nfrms - 1 recblk = recblk[0:nfrms] len_ser = int((nfrms - 1) / 2) interv1 = np.arange(1, len_ser + 1) interv2 = np.arange(len_ser + 1, nfrms) # Fourier transform of the original dataset fft_recblk = np.fft.fft(recblk) surrblk = np.zeros((nfrms, nsurr)) # for k in tqdm(np.arange(nsurr)): for k in np.arange(nsurr): ph_rnd = np.random.rand(len_ser) # Create the random phases for all the time series ph_interv1 = np.exp(2 * np.pi * 1j * ph_rnd) ph_interv2 = np.conj(np.flipud(ph_interv1)) # Randomize all the time series simultaneously fft_recblk_surr = np.copy(fft_recblk) fft_recblk_surr[interv1] = fft_recblk[interv1] * ph_interv1 fft_recblk_surr[interv2] = fft_recblk[interv2] * ph_interv2 # Inverse transform surrblk[:, k] = np.real(np.fft.ifft(fft_recblk_surr)) return surrblk
[docs] def liang_causality( y1: np.ndarray, y2: np.ndarray, npt: int = 1, signif_test: str = "isospec", nsim: int = 1000, qs: List[float] = [0.005, 0.025, 0.05, 0.95, 0.975, 0.995], ) -> Dict[str, Any]: """Liang-Kleeman information flow Estimate the Liang information transfer from series y2 to series y1 with significance estimates using either an AR(1) tests with series with the same persistence or surrogates with randomized phases. Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed npt : int >=1 time advance in performing Euler forward differencing, e.g., 1, 2. Unless the series are generated with a highly chaotic deterministic system, npt=1 should be used signif_test : str; {'isopersist', 'isospec'} the method for significance test see signif_isospec and signif_isopersist for details. nsim : int the number of AR(1) surrogates for significance test qs : list the quantiles for significance test Returns ------- res : dict A dictionary of results including: - T21 : float - information flow from y2 to y1 (Note: not y1 -> y2!) - tau21 : float - the standardized information flow from y2 to y1 - Z : float - the total information flow from y2 to y1 - dH1_star : float - dH*/dt (Liang, 2016) - dH1_noise : float - signif_qs : the quantiles for significance test - T21_noise : list - the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise : list - the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- skyborn.causality.liang : Information flow estimated using the Liang algorithm skyborn.causality.granger_causality : Information flow estimated using the Granger algorithm skyborn.causality.signif_isopersist : Significance test with AR(1) with same persistence skyborn.causality.signif_isospec : Significance test with surrogates with randomized phases References ---------- Liang, X.S. (2013) The Liang-Kleeman Information Flow: Theory and Applications. Entropy, 15, 327-360, doi:10.3390/e15010327 Liang, X.S. (2014) Unraveling the cause-effect relation between timeseries. Physical review, E 90, 052150 Liang, X.S. (2015) Normalizing the causality between time series. Physical review, E 92, 022126 Liang, X.S. (2016) Information flow and causality as rigorous notions ab initio. Physical review, E 94, 052201 """ dt = 1 nm = np.size(y1) grad1 = (y1[0 + npt :] - y1[0:-npt]) / (npt) grad2 = (y2[0 + npt :] - y2[0:-npt]) / (npt) y1 = y1[:-npt] y2 = y2[:-npt] N = nm - npt C = np.cov(y1, y2) detC = np.linalg.det(C) dC = np.ndarray((2, 2)) dC[0, 0] = np.sum((y1 - np.mean(y1)) * (grad1 - np.mean(grad1))) dC[0, 1] = np.sum((y1 - np.mean(y1)) * (grad2 - np.mean(grad2))) dC[1, 0] = np.sum((y2 - np.mean(y2)) * (grad1 - np.mean(grad1))) dC[1, 1] = np.sum((y2 - np.mean(y2)) * (grad2 - np.mean(grad2))) dC /= N - 1 a11 = C[1, 1] * dC[0, 0] - C[0, 1] * dC[1, 0] a12 = -C[0, 1] * dC[0, 0] + C[0, 0] * dC[1, 0] a11 /= detC a12 /= detC f1 = np.mean(grad1) - a11 * np.mean(y1) - a12 * np.mean(y2) R1 = grad1 - (f1 + a11 * y1 + a12 * y2) Q1 = np.sum(R1 * R1) b1 = np.sqrt(Q1 * dt / N) NI = np.ndarray((4, 4)) NI[0, 0] = N * dt / b1**2 NI[1, 1] = dt / b1**2 * np.sum(y1 * y1) NI[2, 2] = dt / b1**2 * np.sum(y2 * y2) NI[3, 3] = 3 * dt / b1**4 * np.sum(R1 * R1) - N / b1**2 NI[0, 1] = dt / b1**2 * np.sum(y1) NI[0, 2] = dt / b1**2 * np.sum(y2) NI[0, 3] = 2 * dt / b1**3 * np.sum(R1) NI[1, 2] = dt / b1**2 * np.sum(y1 * y2) NI[1, 3] = 2 * dt / b1**3 * np.sum(R1 * y1) NI[2, 3] = 2 * dt / b1**3 * np.sum(R1 * y2) NI[1, 0] = NI[0, 1] NI[2, 0] = NI[0, 2] NI[2, 1] = NI[1, 2] NI[3, 0] = NI[0, 3] NI[3, 1] = NI[1, 3] NI[3, 2] = NI[2, 3] invNI = np.linalg.pinv(NI) var_a12 = invNI[2, 2] T21 = C[0, 1] / C[0, 0] * (-C[1, 0] * dC[0, 0] + C[0, 0] * dC[1, 0]) / detC var_T21 = (C[0, 1] / C[0, 0]) ** 2 * var_a12 dH1_star = a11 dH1_noise = b1**2 / (2 * C[0, 0]) Z = np.abs(T21) + np.abs(dH1_star) + np.abs(dH1_noise) tau21 = T21 / Z dH1_star = dH1_star / Z dH1_noise = dH1_noise / Z signif_test_func = { "isopersist": signif_isopersist, "isospec": signif_isospec, } signif_dict = signif_test_func[signif_test]( y1, y2, method="liang", nsim=nsim, qs=qs, npt=npt ) T21_noise_qs = signif_dict["T21_noise_qs"] tau21_noise_qs = signif_dict["tau21_noise_qs"] res = { "T21": T21, "tau21": tau21, "Z": Z, "dH1_star": dH1_star, "dH1_noise": dH1_noise, "signif_qs": qs, "T21_noise": T21_noise_qs, "tau21_noise": tau21_noise_qs, } return res
def liang(y1: np.ndarray, y2: np.ndarray, npt: int = 1) -> Dict[str, float]: """ Estimate the Liang information transfer from series y2 to series y1 Parameters ---------- y1, y2 : array Vectors of (real) numbers with identical length, no NaNs allowed npt : int >=1 Time advance in performing Euler forward differencing, e.g., 1, 2. Unless the series are generated with a highly chaotic deterministic system, npt=1 should be used Returns ------- res : dict A dictionary of results including: - T21 (float): information flow from y2 to y1 (Note: not y1 -> y2!) - tau21 (float): the standardized information flow from y2 to y1 - Z (float): the total information flow from y2 to y1 - dH1_star (float): dH*/dt (Liang, 2016) - dH1_noise (float) See also -------- skyborn.causality.liang_causality : Information flow estimated using the Liang algorithm skyborn.causality.granger_causality : Information flow estimated using the Granger algorithm skyborn.causality.signif_isopersist : Significance test with AR(1) with same persistence skyborn.causality.signif_isospec : Significance test with surrogates with randomized phases References ---------- Liang, X.S. (2013) The Liang-Kleeman Information Flow: Theory and Applications. Entropy, 15, 327-360, doi:10.3390/e15010327 Liang, X.S. (2014) Unraveling the cause-effect relation between timeseries. Physical review, E 90, 052150 Liang, X.S. (2015) Normalizing the causality between time series. Physical review, E 92, 022126 Liang, X.S. (2016) Information flow and causality as rigorous notions ab initio. Physical review, E 94, 052201 """ dt = 1 nm = np.size(y1) grad1 = (y1[0 + npt :] - y1[0:-npt]) / (npt) grad2 = (y2[0 + npt :] - y2[0:-npt]) / (npt) y1 = y1[:-npt] y2 = y2[:-npt] N = nm - npt C = np.cov(y1, y2) detC = np.linalg.det(C) dC = np.ndarray((2, 2)) dC[0, 0] = np.sum((y1 - np.mean(y1)) * (grad1 - np.mean(grad1))) dC[0, 1] = np.sum((y1 - np.mean(y1)) * (grad2 - np.mean(grad2))) dC[1, 0] = np.sum((y2 - np.mean(y2)) * (grad1 - np.mean(grad1))) dC[1, 1] = np.sum((y2 - np.mean(y2)) * (grad2 - np.mean(grad2))) dC /= N - 1 a11 = C[1, 1] * dC[0, 0] - C[0, 1] * dC[1, 0] a12 = -C[0, 1] * dC[0, 0] + C[0, 0] * dC[1, 0] a11 /= detC a12 /= detC f1 = np.mean(grad1) - a11 * np.mean(y1) - a12 * np.mean(y2) R1 = grad1 - (f1 + a11 * y1 + a12 * y2) Q1 = np.sum(R1 * R1) b1 = np.sqrt(Q1 * dt / N) NI = np.ndarray((4, 4)) NI[0, 0] = N * dt / b1**2 NI[1, 1] = dt / b1**2 * np.sum(y1 * y1) NI[2, 2] = dt / b1**2 * np.sum(y2 * y2) NI[3, 3] = 3 * dt / b1**4 * np.sum(R1 * R1) - N / b1**2 NI[0, 1] = dt / b1**2 * np.sum(y1) NI[0, 2] = dt / b1**2 * np.sum(y2) NI[0, 3] = 2 * dt / b1**3 * np.sum(R1) NI[1, 2] = dt / b1**2 * np.sum(y1 * y2) NI[1, 3] = 2 * dt / b1**3 * np.sum(R1 * y1) NI[2, 3] = 2 * dt / b1**3 * np.sum(R1 * y2) NI[1, 0] = NI[0, 1] NI[2, 0] = NI[0, 2] NI[2, 1] = NI[1, 2] NI[3, 0] = NI[0, 3] NI[3, 1] = NI[1, 3] NI[3, 2] = NI[2, 3] invNI = np.linalg.pinv(NI) var_a12 = invNI[2, 2] T21 = C[0, 1] / C[0, 0] * (-C[1, 0] * dC[0, 0] + C[0, 0] * dC[1, 0]) / detC var_T21 = (C[0, 1] / C[0, 0]) ** 2 * var_a12 dH1_star = a11 dH1_noise = b1**2 / (2 * C[0, 0]) Z = np.abs(T21) + np.abs(dH1_star) + np.abs(dH1_noise) tau21 = T21 / Z dH1_star = dH1_star / Z dH1_noise = dH1_noise / Z res = { "T21": T21, "tau21": tau21, "Z": Z, "dH1_star": dH1_star, "dH1_noise": dH1_noise, } return res
[docs] def signif_isopersist( y1: np.ndarray, y2: np.ndarray, method: str, nsim: int = 1000, qs: list[float] = [0.005, 0.025, 0.05, 0.95, 0.975, 0.995], **kwargs, ) -> dict[str, np.ndarray]: """significance test with AR(1) with same persistence Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed method : str; {'liang'} estimates for the Liang method nsim : int the number of AR(1) surrogates for significance test qs : list the quantiles for significance test Returns ------- res_dict : dict A dictionary with the following information: - T21_noise_qs : list the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise_qs : list the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- skyborn.causality.liang_causality : Information flow estimated using the Liang algorithm skyborn.causality.granger_causality : Information flow estimated using the Granger algorithm skyborn.causality.signif_isospec : Significance test with surrogates with randomized phases """ g1 = ar1_fit_evenly(y1) g2 = ar1_fit_evenly(y2) sig1 = np.std(y1) sig2 = np.std(y2) n = np.size(y1) noise1 = sm_ar1_sim(n, nsim, g1, sig1) noise2 = sm_ar1_sim(n, nsim, g2, sig2) if method == "liang": npt = kwargs["npt"] if "npt" in kwargs else 1 T21_noise = [] tau21_noise = [] for i in tqdm(range(nsim), desc="Calculating causality between surrogates"): res_noise = liang(noise1[:, i], noise2[:, i], npt=npt) tau21_noise.append(res_noise["tau21"]) T21_noise.append(res_noise["T21"]) tau21_noise = np.array(tau21_noise) T21_noise = np.array(T21_noise) tau21_noise_qs = mquantiles(tau21_noise, qs) T21_noise_qs = mquantiles(T21_noise, qs) res_dict = { "tau21_noise_qs": tau21_noise_qs, "T21_noise_qs": T21_noise_qs, } # TODO add granger method else: raise KeyError(f"{method} is not a valid method") return res_dict
[docs] def signif_isospec( y1: np.ndarray, y2: np.ndarray, method: str, nsim: int = 1000, qs: list[float] = [0.005, 0.025, 0.05, 0.95, 0.975, 0.995], **kwargs, ) -> dict[str, np.ndarray]: """significance test with surrogates with randomized phases Parameters ---------- y1, y2 : array vectors of (real) numbers with identical length, no NaNs allowed method : str; {'liang'} estimates for the Liang method nsim : int the number of surrogates for significance test qs : list the quantiles for significance test kwargs : dict keyword arguments for the causality method (e.g. npt for Liang-Kleeman) Returns ------- res_dict : dict A dictionary with the following information: - T21_noise_qs : list the quantiles of the information flow from noise2 to noise1 for significance testing - tau21_noise_qs : list the quantiles of the standardized information flow from noise2 to noise1 for significance testing See also -------- skyborn.causality.liang_causality : Information flow estimated using the Liang algorithm skyborn.causality.granger_causality : Information flow estimated using the Granger algorithm skyborn.causality.signif_isopersist : Significance test with AR(1) with same persistence """ noise1 = phaseran(y1, nsim) noise2 = phaseran(y2, nsim) if method == "liang": npt = kwargs["npt"] if "npt" in kwargs else 1 T21_noise = [] tau21_noise = [] for i in tqdm(range(nsim), desc="Calculating causality between surrogates"): res_noise = liang(noise1[:, i], noise2[:, i], npt=npt) tau21_noise.append(res_noise["tau21"]) T21_noise.append(res_noise["T21"]) tau21_noise = np.array(tau21_noise) T21_noise = np.array(T21_noise) tau21_noise_qs = mquantiles(tau21_noise, qs) T21_noise_qs = mquantiles(T21_noise, qs) res_dict = { "tau21_noise_qs": tau21_noise_qs, "T21_noise_qs": T21_noise_qs, } # TODO Recode with Surrogate class else: raise KeyError(f"{method} is not a valid method") return res_dict