"""
Mann-Kendall trend analysis for Skyborn package.
This module provides optimized implementations of the Mann-Kendall test
for trend detection in time series data, supporting both xarray and numpy arrays.
Examples
--------
Basic usage with 1D time series:
>>> import numpy as np
>>> from skyborn.calc import mann_kendall_test
>>>
>>> # Create time series with trend
>>> x = np.arange(100)
>>> y = 0.5 * x + np.random.randn(100)
>>>
>>> # Perform Mann-Kendall test
>>> result = mann_kendall_test(y)
>>> print(f"Trend: {result['trend']:.3f}")
>>> print(f"Significant: {result['h']}")
Multidimensional trend analysis:
>>> import numpy as np
>>> from skyborn.calc import trend_analysis
>>>
>>> # Create 3D data (time, lat, lon)
>>> data = np.random.randn(100, 50, 80) + np.linspace(0, 2, 100)[:, None, None]
>>>
>>> # Analyze trends along time axis (axis 0)
>>> results = trend_analysis(data, axis=0)
>>> print(f"Trend shape: {results['trend'].shape}") # (50, 80)
Flexible dimension specification with xarray:
>>> import xarray as xr
>>> from skyborn.calc import trend_analysis
>>>
>>> # Create sample data
>>> time = pd.date_range('2000', periods=120, freq='M')
>>> data = xr.DataArray(
... np.random.randn(120, 10, 15) + np.linspace(0, 3, 120)[:, None, None],
... coords={'time': time, 'lat': range(10), 'lon': range(15)},
... dims=['time', 'lat', 'lon']
... )
>>>
>>> # Analyze trends along any dimension
>>> results = trend_analysis(data, dim='time')
>>> print(results.trend.values.shape) # (10, 15)
"""
import sys
import warnings
from importlib import import_module
from importlib import util as importlib_util
from importlib.machinery import EXTENSION_SUFFIXES
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union
import dask.array as da
import numpy as np
import scipy.stats as stats
import xarray as xr
from dask.diagnostics import ProgressBar
from ._compiled import (
_as_core_input_2d,
_load_core_module_from,
_require_kernel,
_resolve_kernel_handles,
)
from .regression import (
_linregress_slope_batch,
_linregress_std_error_batch,
_theil_std_error_batch,
)
from .variants import (
_autocorr_lag_limit,
_correlated_multivariate_stats_scalar,
_correlated_seasonal_stats_scalar,
_grouped_time_index,
_hamed_rao_variance_batch,
_multivariate_score_variance_batch,
_multivariate_score_variance_scalar,
_multivariate_sens_slope_scalar,
_pre_whiten_score_variance_batch,
_seasonal_score_variance_batch,
_seasonal_score_variance_scalar,
_seasonal_sens_slope_scalar,
_trend_free_pre_whiten_score_variance_batch,
_yue_wang_variance_batch,
)
__all__ = [
"mann_kendall_test",
"mann_kendall_multidim",
"mann_kendall_xarray",
"trend_analysis",
"mk_test",
"mk_multidim",
]
_GROUPED_TESTS = {"multivariate", "regional", "correlated_multivariate"}
def _normalize_test_name(test: str) -> str:
"""Normalize public MK test aliases to one canonical label."""
normalized = test.lower().replace("-", "_").replace(" ", "_")
if normalized in {"original", "mk", "default"}:
return "original"
if normalized in {"yue_wang", "yuewang"}:
return "yue_wang"
if normalized in {"seasonal", "seasonal_mk"}:
return "seasonal"
if normalized in {"correlated_seasonal", "correlatedseasonal"}:
return "correlated_seasonal"
if normalized in {"correlated_multivariate", "correlatedmultivariate"}:
return "correlated_multivariate"
if normalized in {"multivariate", "multivar"}:
return "multivariate"
if normalized in {"regional", "regional_mk"}:
return "regional"
if normalized in {"hamed_rao", "hamedrao"}:
return "hamed_rao"
if normalized in {
"trend_free_pre_whitening",
"trend_free_prewhitening",
"tfpw",
}:
return "trend_free_pre_whitening"
if normalized in {"pre_whitening", "prewhitening", "pre_whiten"}:
return "pre_whitening"
raise ValueError(
"Unknown test: "
f"{test}. Supported tests are 'original', 'yue_wang', 'seasonal', "
"'correlated_seasonal', 'correlated_multivariate', "
"'multivariate', 'regional', "
"'hamed_rao', 'pre_whitening', and "
"'trend_free_pre_whitening'."
)
def _resolve_seasonal_period(test_name: str, period: Optional[int]) -> Optional[int]:
"""Return the effective seasonal cycle length for the active test family."""
if test_name in {"seasonal", "correlated_seasonal"}:
return 12 if period is None else int(period)
return None
def _load_core_module():
"""Best-effort loader for the compiled Mann-Kendall core extension."""
return _load_core_module_from(
file_path=__file__,
package_name=__package__,
extension_suffixes=EXTENSION_SUFFIXES,
path_type=Path,
importlib_util_module=importlib_util,
import_module_func=import_module,
sys_modules=sys.modules,
)
_core_module = _load_core_module()
_kernel_handles = _resolve_kernel_handles(_core_module)
_score_variance_kernel = _kernel_handles["_score_variance_kernel"]
_score_variance_slope_kernel = _kernel_handles["_score_variance_slope_kernel"]
_yue_wang_slope_kernel = _kernel_handles["_yue_wang_slope_kernel"]
_hamed_rao_slope_kernel = _kernel_handles["_hamed_rao_slope_kernel"]
_sen_slope_kernel = _kernel_handles["_sen_slope_kernel"]
_grouped_sen_slope_kernel = _kernel_handles["_grouped_sen_slope_kernel"]
_grouped_correlated_stats_kernel = _kernel_handles["_grouped_correlated_stats_kernel"]
def _kernels_ready() -> bool:
"""Return whether the compiled Mann-Kendall kernels are available."""
return (
_score_variance_kernel is not None
and _score_variance_slope_kernel is not None
and _sen_slope_kernel is not None
and _grouped_sen_slope_kernel is not None
and _grouped_correlated_stats_kernel is not None
)
def _score_variance_batch(
data_2d: np.ndarray, modified: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
"""Run the compiled 2D score/variance kernel."""
kernel = _require_kernel(_score_variance_kernel, "mk_score_var_batch")
s_values, var_values = kernel(_as_core_input_2d(data_2d), int(modified))
return np.asarray(s_values, dtype=np.float64), np.asarray(
var_values, dtype=np.float64
)
def _sen_slope_batch(data_2d: np.ndarray) -> np.ndarray:
"""Run the compiled 2D Theil-Sen slope kernel."""
kernel = _require_kernel(_sen_slope_kernel, "sen_slope_batch")
slopes = kernel(_as_core_input_2d(data_2d))
return np.asarray(slopes, dtype=np.float64)
def _grouped_sen_slope_batch(data_2d: np.ndarray, period: int) -> np.ndarray:
"""Run the compiled grouped 2D Theil-Sen slope kernel."""
kernel = _require_kernel(_grouped_sen_slope_kernel, "grouped_sen_slope_batch")
slopes = kernel(_as_core_input_2d(data_2d), int(period))
return np.asarray(slopes, dtype=np.float64)
def _grouped_correlated_stats_batch(
data_2d: np.ndarray, period: int
) -> Tuple[np.ndarray, np.ndarray, float]:
"""Run the compiled grouped correlated-statistics kernel."""
kernel = _require_kernel(
_grouped_correlated_stats_kernel, "grouped_correlated_stats_batch"
)
s_values, var_values, denom = kernel(_as_core_input_2d(data_2d), int(period))
return (
np.asarray(s_values, dtype=np.float64),
np.asarray(var_values, dtype=np.float64),
float(denom),
)
def _score_variance_slope_batch(
data_2d: np.ndarray, modified: bool = False
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Run the compiled 2D combined score/variance/slope kernel."""
kernel = _require_kernel(_score_variance_slope_kernel, "mk_score_var_sen_batch")
s_values, var_values, slopes = kernel(_as_core_input_2d(data_2d), int(modified))
return (
np.asarray(s_values, dtype=np.float64),
np.asarray(var_values, dtype=np.float64),
np.asarray(slopes, dtype=np.float64),
)
def _yue_wang_score_variance_slope_batch(
data_2d: np.ndarray, lag: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Run the compiled lag-aware Yue-Wang score/variance/slope kernel."""
kernel = _require_kernel(_yue_wang_slope_kernel, "mk_yue_wang_score_var_sen_batch")
s_values, var_values, slopes = kernel(_as_core_input_2d(data_2d), int(lag))
return (
np.asarray(s_values, dtype=np.float64),
np.asarray(var_values, dtype=np.float64),
np.asarray(slopes, dtype=np.float64),
)
def _hamed_rao_score_variance_slope_batch(
data_2d: np.ndarray, alpha: float, lag: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Run the compiled Hamed-Rao score/variance/slope kernel."""
kernel = _require_kernel(
_hamed_rao_slope_kernel, "mk_hamed_rao_score_var_sen_batch"
)
interval = float(stats.norm.ppf(1 - alpha / 2) / np.sqrt(data_2d.shape[0]))
s_values, var_values, slopes = kernel(
_as_core_input_2d(data_2d), interval, int(lag)
)
return (
np.asarray(s_values, dtype=np.float64),
np.asarray(var_values, dtype=np.float64),
np.asarray(slopes, dtype=np.float64),
)
[docs]
def mann_kendall_test(
data: Union[np.ndarray, "xr.DataArray"],
alpha: float = 0.05,
method: str = "theilslopes",
test: str = "original",
period: Optional[int] = None,
lag: Optional[int] = None,
) -> Dict[str, Union[float, bool]]:
"""
Perform Mann-Kendall test for trend detection on 1D time series.
The Mann-Kendall test is a nonparametric test for detecting monotonic trends
in time series data. It makes no assumptions about the distribution of the data.
Parameters
----------
data : array-like
1D time series data. Can be numpy array or xarray DataArray.
alpha : float, default 0.05
Significance level for hypothesis testing (Type I error probability).
method : str, default 'theilslopes'
Method for calculating slope:
- 'theilslopes': Theil-Sen slope estimator (robust, recommended)
- 'linregress': Linear regression slope (faster but less robust)
test : str, default 'original'
Mann-Kendall test family to apply:
- 'original': original Mann-Kendall test
- 'yue_wang': Yue and Wang (2004) modified variance correction
- 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test
- 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test
- 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test
- 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test
- 'regional': Helsel regional MK test
- 'hamed_rao': Hamed and Rao (1998) variance correction
- 'pre_whitening': Yue and Wang (2002) pre-whitening modification
- 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening
period : int, optional
Seasonal cycle length used by ``test="seasonal"`` and
``test="correlated_seasonal"``. When omitted, these seasonal test
families default to ``12``. Other test families ignore this argument.
lag : int, optional
Number of autocorrelation lags to include for ``test="yue_wang"``
and ``test="hamed_rao"``. ``None`` uses all available lags. This
argument has no effect for other test families.
Returns
-------
result : dict
Dictionary containing trend analysis results:
- 'trend' : float
Slope of the trend (units per time step)
- 'h' : bool
Hypothesis test result. True if significant trend exists at
the specified alpha level.
- 'p' : float
Two-tailed p-value of the test
- 'z' : float
Normalized test statistic (z-score)
- 'tau' : float
Kendall's tau correlation coefficient
- 'std_error' : float
Standard error of the detrended residuals
Notes
-----
The Mann-Kendall test statistic S is calculated as:
.. math::
S = \\sum_{i=1}^{n-1} \\sum_{j=i+1}^{n} \\text{sign}(x_j - x_i)
The test assumes that:
- Data are independent, unless a serial-correlation-aware `test=` family is used
- Data come from the same distribution
- Missing values are handled by removal
References
----------
Mann, H. B. (1945). Nonparametric tests against trend. Econometrica, 13(3), 245-259.
Kendall, M. G. (1948). Rank correlation methods. Griffin, London.
Yue, S., & Wang, C. (2004). The Mann-Kendall test modified by effective sample size
to detect trend in serially correlated hydrological series. Water resources
management, 18(3), 201-218.
Examples
--------
>>> import numpy as np
>>> from skyborn.calc import mann_kendall_test
>>>
>>> # Generate trend data
>>> t = np.arange(50)
>>> data = 0.3 * t + np.random.normal(0, 0.5, 50)
>>>
>>> # Test for trend
>>> result = mann_kendall_test(data)
>>> print(f"Trend slope: {result['trend']:.3f}")
>>> print(f"P-value: {result['p']:.3f}")
>>> print(f"Significant trend: {result['h']}")
"""
test_name = _normalize_test_name(test)
effective_period = _resolve_seasonal_period(test_name, period)
if hasattr(data, "values"):
values = data.values
else:
values = np.asarray(data)
if test_name in _GROUPED_TESTS:
grouped_values = np.asarray(values, dtype=np.float64)
if grouped_values.ndim == 1:
grouped_values = grouped_values.reshape(-1, 1)
elif grouped_values.ndim != 2:
raise ValueError(
"Grouped Mann-Kendall tests require 1D or 2D data with shape "
"(time, group). Use trend_analysis for multidimensional data."
)
finite_mask = np.isfinite(grouped_values)
n_finite = int(np.sum(finite_mask))
if n_finite < 3:
warnings.warn("Need at least 3 data points for Mann-Kendall test")
return {
"trend": np.nan,
"h": False,
"p": np.nan,
"z": np.nan,
"tau": np.nan,
"std_error": np.nan,
}
if method not in {"theilslopes", "linregress"}:
raise ValueError(
f"Unknown method: {method}. Use 'theilslopes' or 'linregress'"
)
if method == "theilslopes":
slope, intercept = _multivariate_sens_slope_scalar(grouped_values)
else:
grouped_x = _grouped_time_index(
grouped_values.shape[0], grouped_values.shape[1]
)
flattened = grouped_values.reshape(-1)
finite_flat = np.isfinite(flattened)
slope, intercept = stats.linregress(
grouped_x[finite_flat], flattened[finite_flat]
)[:2]
if test_name == "correlated_multivariate":
S, var_s, denom = _correlated_multivariate_stats_scalar(grouped_values)
with np.errstate(divide="ignore", invalid="ignore"):
z = S / np.sqrt(var_s)
else:
S, var_s, denom = _multivariate_score_variance_scalar(grouped_values)
if S > 0:
z = (S - 1) / np.sqrt(var_s)
elif S == 0:
z = 0
else:
z = (S + 1) / np.sqrt(var_s)
p_value = 2 * (1 - stats.norm.cdf(abs(z)))
h = abs(z) > stats.norm.ppf(1 - alpha / 2)
tau = S / denom if denom != 0.0 else np.nan
grouped_x = _grouped_time_index(
grouped_values.shape[0], grouped_values.shape[1]
)
flattened = grouped_values.reshape(-1)
finite_flat = np.isfinite(flattened)
fitted = grouped_x[finite_flat] * slope + intercept
std_error = np.std(flattened[finite_flat] - fitted) / np.sqrt(n_finite)
return {
"trend": slope,
"h": h,
"p": p_value,
"z": z,
"tau": tau,
"std_error": std_error,
}
if values.ndim > 1:
raise ValueError(
"mann_kendall_test only accepts 1D data. Use trend_analysis for multidimensional data."
)
if test_name in {"seasonal", "correlated_seasonal"}:
season_values = np.asarray(values, dtype=np.float64)
finite_mask = np.isfinite(season_values)
n_finite = int(np.sum(finite_mask))
if n_finite < 3:
warnings.warn("Need at least 3 data points for Mann-Kendall test")
return {
"trend": np.nan,
"h": False,
"p": np.nan,
"z": np.nan,
"tau": np.nan,
"std_error": np.nan,
}
if method not in {"theilslopes", "linregress"}:
raise ValueError(
f"Unknown method: {method}. Use 'theilslopes' or 'linregress'"
)
if method == "theilslopes":
slope, intercept = _seasonal_sens_slope_scalar(
season_values, effective_period
)
else:
x_finite = np.arange(season_values.size, dtype=np.float64)[finite_mask]
slope, intercept = stats.linregress(x_finite, season_values[finite_mask])[
:2
]
if test_name == "seasonal":
S, var_s, denom = _seasonal_score_variance_scalar(
season_values, effective_period
)
if S > 0:
z = (S - 1) / np.sqrt(var_s)
elif S == 0:
z = 0
else:
z = (S + 1) / np.sqrt(var_s)
else:
S, var_s, denom = _correlated_seasonal_stats_scalar(
season_values, effective_period
)
with np.errstate(divide="ignore", invalid="ignore"):
z = S / np.sqrt(var_s)
p_value = 2 * (1 - stats.norm.cdf(abs(z)))
h = abs(z) > stats.norm.ppf(1 - alpha / 2)
tau = S / denom if denom != 0.0 else np.nan
finite_index = np.arange(season_values.size, dtype=np.float64)[finite_mask]
fitted = finite_index / effective_period * slope + intercept
std_error = np.std(season_values[finite_mask] - fitted) / np.sqrt(n_finite)
return {
"trend": slope,
"h": h,
"p": p_value,
"z": z,
"tau": tau,
"std_error": std_error,
}
valid_mask = np.isfinite(values)
y = np.asarray(values[valid_mask], dtype=np.float64)
x = np.arange(len(y), dtype=np.float64)
n = len(y)
if n < 3:
warnings.warn("Need at least 3 data points for Mann-Kendall test")
return {
"trend": np.nan,
"h": False,
"p": np.nan,
"z": np.nan,
"tau": np.nan,
"std_error": np.nan,
}
if method not in {"theilslopes", "linregress"}:
raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'")
if method == "theilslopes":
slope = float(_sen_slope_batch(y.reshape(-1, 1))[0])
intercept = np.median(y) - slope * np.median(x)
else:
slope, intercept = stats.linregress(x, y)[:2]
if test_name == "pre_whitening":
stat_n = n - 1
s_values, var_values = _pre_whiten_score_variance_batch(y.reshape(-1, 1))
S = int(np.rint(s_values[0]))
var_s = float(var_values[0])
elif test_name == "yue_wang":
lag_limit = _autocorr_lag_limit(n, lag)
if lag is None and method == "theilslopes":
S_values, var_values, _ = _score_variance_slope_batch(
y.reshape(-1, 1), modified=True
)
var_s = float(var_values[0])
elif lag is None:
S_values, var_values = _score_variance_batch(
y.reshape(-1, 1), modified=True
)
var_s = float(var_values[0])
else:
S_values, var_values, correction_slopes = (
_yue_wang_score_variance_slope_batch(y.reshape(-1, 1), lag_limit)
)
var_s = float(var_values[0])
if method == "theilslopes":
slope = float(correction_slopes[0])
intercept = np.median(y) - slope * np.median(x)
S = int(np.rint(S_values[0]))
stat_n = n
elif test_name == "trend_free_pre_whitening":
stat_n = n - 1
s_values, var_values = _trend_free_pre_whiten_score_variance_batch(
y.reshape(-1, 1), np.asarray([slope], dtype=np.float64)
)
S = int(np.rint(s_values[0]))
var_s = float(var_values[0])
elif test_name == "hamed_rao":
lag_limit = _autocorr_lag_limit(n, lag)
s_values, var_values, correction_slopes = _hamed_rao_score_variance_slope_batch(
y.reshape(-1, 1), alpha=alpha, lag=lag_limit
)
S = int(np.rint(s_values[0]))
var_s = float(var_values[0])
if method == "theilslopes":
slope = float(correction_slopes[0])
intercept = np.median(y) - slope * np.median(x)
stat_n = n
elif method == "theilslopes":
S_values, var_values, _ = _score_variance_slope_batch(
y.reshape(-1, 1), modified=False
)
S = int(np.rint(S_values[0]))
var_s = float(var_values[0])
stat_n = n
else:
S = _calculate_mk_score(y)
var_s = _calculate_mk_variance(y, n, modified=False)
stat_n = n
if S > 0:
z = (S - 1) / np.sqrt(var_s)
elif S == 0:
z = 0
else:
z = (S + 1) / np.sqrt(var_s)
p_value = 2 * (1 - stats.norm.cdf(abs(z)))
h = abs(z) > stats.norm.ppf(1 - alpha / 2)
tau = S / (0.5 * stat_n * (stat_n - 1))
y_detrend = y - (x * slope + intercept)
std_error = np.std(y_detrend) / np.sqrt(n)
return {
"trend": slope,
"h": h,
"p": p_value,
"z": z,
"tau": tau,
"std_error": std_error,
}
def _calculate_mk_score(y: np.ndarray) -> int:
"""Calculate Mann-Kendall score S."""
s_values, _ = _score_variance_batch(np.asarray(y, dtype=np.float64).reshape(-1, 1))
return int(np.rint(s_values[0]))
def _calculate_mk_variance(y: np.ndarray, n: int, modified: bool = False) -> float:
"""Calculate variance of Mann-Kendall score."""
_, variances = _score_variance_batch(
np.asarray(y, dtype=np.float64).reshape(-1, 1), modified=modified
)
return float(variances[0])
def _resolve_group_axis(
data: Any,
group_axis: Optional[Union[int, str]],
time_axis: int,
test_name: str,
) -> Optional[int]:
"""Resolve the grouping axis for grouped MK families."""
if test_name not in _GROUPED_TESTS:
return None
if group_axis is not None:
if isinstance(group_axis, str):
if hasattr(data, "get_axis_num"):
resolved = data.get_axis_num(group_axis)
else:
raise ValueError(
"String group_axis requires an array-like object with named dimensions."
)
else:
resolved = int(group_axis)
else:
ndim = data.ndim if hasattr(data, "ndim") else np.asarray(data).ndim
candidate_axes = [axis for axis in range(ndim) if axis != time_axis]
if len(candidate_axes) == 1:
resolved = candidate_axes[0]
else:
raise ValueError(
f"test='{test_name}' requires group_axis/group_dim when more than one "
"non-time dimension is present."
)
if resolved == time_axis:
raise ValueError(
"group_axis/group_dim must be different from the analyzed time axis."
)
return resolved
def _vectorized_grouped_mk_test(
data_chunk: np.ndarray,
alpha: float = 0.05,
method: str = "theilslopes",
test: str = "multivariate",
) -> Dict[str, np.ndarray]:
"""Vectorized grouped MK test for data shaped as (time, group, series)."""
test_name = _normalize_test_name(test)
if test_name not in _GROUPED_TESTS:
raise ValueError(f"Unsupported grouped test: {test_name}")
if method not in {"theilslopes", "linregress"}:
raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'")
time_steps, n_groups, n_series = data_chunk.shape
grouped_x = _grouped_time_index(time_steps, n_groups)
results = {
"trend": np.full(n_series, np.nan),
"h": np.zeros(n_series, dtype=bool),
"p": np.full(n_series, np.nan),
"z": np.full(n_series, np.nan),
"tau": np.full(n_series, np.nan),
"std_error": np.full(n_series, np.nan),
}
finite_counts = np.sum(np.isfinite(data_chunk), axis=(0, 1))
valid_series = finite_counts >= 3
if not np.any(valid_series):
return results
valid_indices = np.where(valid_series)[0]
no_nan_mask = np.all(np.isfinite(data_chunk[:, :, valid_indices]), axis=(0, 1))
no_nan_indices = valid_indices[no_nan_mask]
if no_nan_indices.size > 0:
clean_data = np.asarray(data_chunk[:, :, no_nan_indices], dtype=np.float64)
if method == "theilslopes":
flat_clean = np.ascontiguousarray(clean_data).reshape(
time_steps * n_groups, clean_data.shape[2]
)
slopes = _grouped_sen_slope_batch(flat_clean, n_groups)
intercepts = (
np.median(flat_clean, axis=0)
- (0.5 * (flat_clean.shape[0] - 1) / n_groups) * slopes
)
fitted = (
grouped_x[:, np.newaxis] * slopes[np.newaxis, :]
+ intercepts[np.newaxis, :]
)
std_errors = np.std(flat_clean - fitted, axis=0) / np.sqrt(
flat_clean.shape[0]
)
else:
flattened = clean_data.reshape(time_steps * n_groups, clean_data.shape[2])
slopes = _linregress_slope_batch(flattened, grouped_x)
x_mean = np.mean(grouped_x)
y_mean = np.mean(flattened, axis=0)
intercepts = y_mean - slopes * x_mean
std_errors = _linregress_std_error_batch(flattened, grouped_x, slopes)
if test_name == "correlated_multivariate":
S_values, var_s_values, grouped_denom = _grouped_correlated_stats_batch(
flat_clean if method == "theilslopes" else flattened, n_groups
)
with np.errstate(divide="ignore", invalid="ignore"):
z_values = S_values / np.sqrt(var_s_values)
else:
S_values, var_s_values, grouped_denom = _multivariate_score_variance_batch(
clean_data
)
z_values = np.zeros_like(S_values, dtype=np.float64)
positive_mask = S_values > 0
negative_mask = S_values < 0
z_values[positive_mask] = (S_values[positive_mask] - 1) / np.sqrt(
var_s_values[positive_mask]
)
z_values[negative_mask] = (S_values[negative_mask] + 1) / np.sqrt(
var_s_values[negative_mask]
)
p_values = 2 * (1 - stats.norm.cdf(np.abs(z_values)))
h_values = np.abs(z_values) > stats.norm.ppf(1 - alpha / 2)
tau_values = (
S_values / grouped_denom
if grouped_denom != 0.0
else np.full_like(S_values, np.nan)
)
results["trend"][no_nan_indices] = slopes
results["h"][no_nan_indices] = h_values
results["p"][no_nan_indices] = p_values
results["z"][no_nan_indices] = z_values
results["tau"][no_nan_indices] = tau_values
results["std_error"][no_nan_indices] = std_errors
nan_indices = valid_indices[~no_nan_mask]
for series_idx in nan_indices:
try:
mk_result = mann_kendall_test(
np.asarray(data_chunk[:, :, series_idx], dtype=np.float64),
alpha=alpha,
method=method,
test=test_name,
)
except Exception:
continue
for key, value in mk_result.items():
results[key][series_idx] = value
return results
[docs]
def mann_kendall_multidim(
data: np.ndarray,
axis: Union[int, str] = 0,
alpha: float = 0.05,
method: str = "theilslopes",
chunk_size: Optional[int] = None,
dim: Optional[Union[int, str]] = None,
test: str = "original",
period: Optional[int] = None,
lag: Optional[int] = None,
group_axis: Optional[Union[int, str]] = None,
) -> Dict[str, np.ndarray]:
"""
Optimized numpy-based Mann-Kendall test for multidimensional arrays.
This implementation is typically faster than xarray-based versions for large arrays
because it avoids xarray overhead and uses optimized numpy operations.
Supports various input dimensions:
- 1D: (time,) - single time series
- 2D: (time, ensemble) or (ensemble, time) - ensemble time series
- 3D: (time, lat, lon) - spatial grid
- ND: arbitrary multidimensional arrays
Parameters
----------
data : np.ndarray
Input array with time series along one axis
axis : int or str, default 0
Axis along which to compute trends. Can be integer index or dimension name.
For numpy arrays with string names, the array must have named axes.
alpha : float, default 0.05
Significance level
method : str, default 'theilslopes'
Slope calculation method ('theilslopes' or 'linregress')
chunk_size : int, optional
Process data in chunks to manage memory usage
dim : int or str, optional
Alternative name for axis parameter (XArray style). Takes precedence over axis.
test : str, default 'original'
Mann-Kendall test family to apply:
- 'original': original Mann-Kendall test
- 'yue_wang': Yue and Wang (2004) modified variance correction
- 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test
- 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test
- 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test
- 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test
- 'regional': Helsel regional MK test
- 'hamed_rao': Hamed and Rao (1998) variance correction
- 'pre_whitening': Yue and Wang (2002) pre-whitening modification
- 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening
period : int, optional
Seasonal cycle length used by ``test="seasonal"`` and
``test="correlated_seasonal"``. When omitted, these seasonal test
families default to ``12``. Other test families ignore this argument.
lag : int, optional
Number of autocorrelation lags to include for ``test="yue_wang"``
and ``test="hamed_rao"``. ``None`` uses all available lags. This
argument has no effect for other test families.
group_axis : int or str, optional
Grouping axis used by grouped test families such as
``test="multivariate"``, ``test="regional"``, and
``test="correlated_multivariate"``. If omitted, it is inferred only
when there is exactly one non-time axis.
Returns
-------
result : dict
Dictionary with result arrays for each statistic:
- 'trend': slope values
- 'h': significance test results (boolean)
- 'p': p-values
- 'z': z-scores
- 'tau': Kendall's tau values
- 'std_error': standard errors
Notes
-----
Parameter priority: dim > axis
Both axis and dim can accept integer indices or string dimension names.
For numpy arrays, string names require the array to have a 'dims' attribute or similar.
"""
test_name = _normalize_test_name(test)
effective_period = _resolve_seasonal_period(test_name, period)
def _resolve_axis(arr: Any, axis_param: Union[int, str]) -> int:
"""Resolve axis parameter to integer index."""
if isinstance(axis_param, str):
# For numpy arrays, we need to check if it has dimension names
if hasattr(arr, "dims"):
# XArray-like object
return arr.get_axis_num(axis_param)
elif hasattr(arr, "axis_names") and arr.axis_names:
# Custom numpy array with axis names
return arr.axis_names.index(axis_param)
else:
# Plain numpy array - assume common dimension names
common_time_names = [
"time",
"t",
"year",
"years",
"month",
"months",
"day",
"days",
"hour",
"hours",
]
if axis_param.lower() in common_time_names:
# Assume time is the first dimension by default
return 0
else:
raise ValueError(
f"Cannot resolve string axis '{axis_param}' for numpy array without dimension names. "
f"Use integer index or ensure array has 'dims' attribute."
)
else:
return axis_param
# Handle parameter aliases with priority: dim > axis
if dim is not None:
actual_time_axis = _resolve_axis(data, dim)
else:
actual_time_axis = _resolve_axis(data, axis)
actual_group_axis = _resolve_group_axis(
data, group_axis=group_axis, time_axis=actual_time_axis, test_name=test_name
)
# Handle 1D input
if data.ndim == 1:
return mann_kendall_test(
data,
alpha=alpha,
method=method,
test=test_name,
period=effective_period,
lag=lag,
)
if actual_group_axis is not None:
grouped = np.moveaxis(data, (actual_time_axis, actual_group_axis), (0, 1))
time_steps = grouped.shape[0]
group_size = grouped.shape[1]
spatial_shape = grouped.shape[2:]
data_3d = grouped.reshape(time_steps, group_size, -1)
else:
data = np.moveaxis(data, actual_time_axis, 0)
time_steps = data.shape[0]
if data.ndim == 2:
data_2d = data
spatial_shape = (data.shape[1],)
else:
spatial_shape = data.shape[1:]
data_2d = data.reshape(time_steps, -1)
n_points = data_3d.shape[2] if actual_group_axis is not None else data_2d.shape[1]
# Initialize output arrays
results = {
"trend": np.full(spatial_shape, np.nan),
"h": np.zeros(spatial_shape, dtype=bool),
"p": np.full(spatial_shape, np.nan),
"z": np.full(spatial_shape, np.nan),
"tau": np.full(spatial_shape, np.nan),
"std_error": np.full(spatial_shape, np.nan),
}
# Flatten result arrays for efficient assignment
results_flat = {key: val.ravel() for key, val in results.items()}
# Determine chunk size
if chunk_size is None:
# Estimate memory usage and set reasonable chunk size
max_memory_mb = 200 # 200MB limit
bytes_per_element = 8
max_chunk_size = (max_memory_mb * 1024 * 1024) // (
time_steps * time_steps * bytes_per_element
)
chunk_size = min(max_chunk_size, n_points, 10000)
chunk_size = max(chunk_size, 100) # Minimum chunk size
# Process in chunks
for start_idx in range(0, n_points, chunk_size):
end_idx = min(start_idx + chunk_size, n_points)
if actual_group_axis is not None:
chunk_data = data_3d[:, :, start_idx:end_idx]
chunk_results = _vectorized_grouped_mk_test(
chunk_data,
alpha=alpha,
method=method,
test=test_name,
)
else:
chunk_data = data_2d[:, start_idx:end_idx]
chunk_results = _vectorized_mk_test(
chunk_data,
alpha=alpha,
method=method,
test=test_name,
period=effective_period,
lag=lag,
)
# Store results
for key, values in chunk_results.items():
results_flat[key][start_idx:end_idx] = values
# Reshape results back to original spatial shape
for key in results:
results[key] = results_flat[key].reshape(spatial_shape)
return results
def _vectorized_mk_test(
data_chunk: np.ndarray,
alpha: float = 0.05,
method: str = "theilslopes",
test: str = "original",
period: Optional[int] = None,
lag: Optional[int] = None,
) -> Dict[str, np.ndarray]:
"""
Truly vectorized Mann-Kendall test for a chunk of time series.
Uses optimized numpy operations to process multiple time series simultaneously.
This is a major performance improvement over the previous loop-based implementation.
Handles missing data (NaN) gracefully by:
1. Full vectorization for series without NaN
2. Individual processing for series with NaN
3. Automatic skipping of series with insufficient data
"""
test_name = _normalize_test_name(test)
effective_period = _resolve_seasonal_period(test_name, period)
if method not in {"theilslopes", "linregress"}:
raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'")
time_steps, n_series = data_chunk.shape
x = np.arange(time_steps, dtype=np.float64)
# Initialize result arrays
results = {
"trend": np.full(n_series, np.nan),
"h": np.zeros(n_series, dtype=bool),
"p": np.full(n_series, np.nan),
"z": np.full(n_series, np.nan),
"tau": np.full(n_series, np.nan),
"std_error": np.full(n_series, np.nan),
}
# Find valid time series (with enough non-NaN values)
valid_counts = np.sum(np.isfinite(data_chunk), axis=0)
valid_series = valid_counts >= 3
if not np.any(valid_series):
return results
# Get valid indices for efficient assignment
valid_indices = np.where(valid_series)[0]
# Separate series with complete data from those with missing values
no_nan_mask = valid_counts[valid_series] == time_steps
no_nan_indices = valid_indices[no_nan_mask]
# Process series without NaN using full vectorization.
# NumPy returns a scalar directly; Dask returns a deferred scalar.
n_clean_series = no_nan_mask.sum()
if hasattr(n_clean_series, "compute"):
n_clean_series = n_clean_series.compute()
if n_clean_series > 0:
if hasattr(data_chunk, "compute"):
clean_data_computed = data_chunk[:, no_nan_indices].compute()
else:
clean_data_computed = np.asarray(data_chunk[:, no_nan_indices])
clean_data_computed = np.asarray(clean_data_computed, dtype=np.float64)
if test_name in {"seasonal", "correlated_seasonal"}:
core_input = _as_core_input_2d(clean_data_computed)
if method == "theilslopes":
slopes = _grouped_sen_slope_batch(core_input, effective_period)
intercepts = (
np.median(clean_data_computed, axis=0)
- (0.5 * (clean_data_computed.shape[0] - 1) / effective_period)
* slopes
)
else:
slopes = _linregress_slope_batch(clean_data_computed, x)
x_mean = np.mean(x)
y_mean = np.mean(clean_data_computed, axis=0)
intercepts = y_mean - slopes * x_mean
if test_name == "seasonal":
S_values, var_s_values, seasonal_denom = _seasonal_score_variance_batch(
core_input, effective_period
)
else:
S_values, var_s_values, seasonal_denom = (
_grouped_correlated_stats_batch(core_input, effective_period)
)
stat_n = time_steps
elif test_name == "pre_whitening":
core_input = _as_core_input_2d(clean_data_computed)
if method == "theilslopes":
slopes = _sen_slope_batch(core_input)
else:
slopes = _linregress_slope_batch(clean_data_computed, x)
stat_n = core_input.shape[0] - 1
S_values, var_s_values = _pre_whiten_score_variance_batch(core_input)
elif test_name == "trend_free_pre_whitening":
core_input = _as_core_input_2d(clean_data_computed)
if method == "theilslopes":
slopes = _sen_slope_batch(core_input)
else:
slopes = _linregress_slope_batch(clean_data_computed, x)
stat_n = core_input.shape[0] - 1
S_values, var_s_values = _trend_free_pre_whiten_score_variance_batch(
core_input, slopes
)
elif test_name == "hamed_rao":
if method == "theilslopes":
S_values, base_var_values, slopes = _score_variance_slope_batch(
clean_data_computed, modified=False
)
correction_slopes = slopes
else:
S_values, base_var_values = _score_variance_batch(
clean_data_computed, modified=False
)
slopes = _linregress_slope_batch(clean_data_computed, x)
correction_slopes = _sen_slope_batch(clean_data_computed)
var_s_values = _hamed_rao_variance_batch(
clean_data_computed,
base_var_values,
correction_slopes,
alpha=alpha,
lag=lag,
)
stat_n = time_steps
elif test_name == "yue_wang":
if lag is None and method == "theilslopes":
S_values, var_s_values, slopes = _score_variance_slope_batch(
clean_data_computed, modified=True
)
elif lag is None:
S_values, var_s_values = _score_variance_batch(
clean_data_computed, modified=True
)
slopes = _linregress_slope_batch(clean_data_computed, x)
else:
if method == "theilslopes":
S_values, base_var_values, slopes = _score_variance_slope_batch(
clean_data_computed, modified=False
)
correction_slopes = slopes
else:
S_values, base_var_values = _score_variance_batch(
clean_data_computed, modified=False
)
slopes = _linregress_slope_batch(clean_data_computed, x)
correction_slopes = _sen_slope_batch(clean_data_computed)
var_s_values = _yue_wang_variance_batch(
clean_data_computed,
base_var_values,
correction_slopes,
lag=lag,
)
stat_n = time_steps
elif method == "theilslopes":
S_values, var_s_values, slopes = _score_variance_slope_batch(
clean_data_computed, modified=False
)
stat_n = time_steps
else:
S_values, var_s_values = _score_variance_batch(
clean_data_computed, modified=False
)
stat_n = time_steps
# Vectorized z-score calculation
z_values = np.zeros_like(S_values, dtype=np.float64)
if test_name == "correlated_seasonal":
with np.errstate(divide="ignore", invalid="ignore"):
z_values = S_values / np.sqrt(var_s_values)
else:
positive_mask = S_values > 0
negative_mask = S_values < 0
z_values[positive_mask] = (S_values[positive_mask] - 1) / np.sqrt(
var_s_values[positive_mask]
)
z_values[negative_mask] = (S_values[negative_mask] + 1) / np.sqrt(
var_s_values[negative_mask]
)
p_values = 2 * (1 - stats.norm.cdf(np.abs(z_values)))
h_values = np.abs(z_values) > stats.norm.ppf(1 - alpha / 2)
if (
test_name
not in {
"pre_whitening",
"trend_free_pre_whitening",
"hamed_rao",
"yue_wang",
}
and method != "theilslopes"
):
slopes = _linregress_slope_batch(clean_data_computed, x)
if test_name in {"seasonal", "correlated_seasonal"}:
tau_values = (
S_values / seasonal_denom
if seasonal_denom != 0.0
else np.full_like(S_values, np.nan)
)
else:
tau_values = S_values / (0.5 * stat_n * (stat_n - 1))
if test_name in {"seasonal", "correlated_seasonal"} and method == "theilslopes":
seasonal_x = x[:, np.newaxis] / effective_period
fitted = seasonal_x * slopes[np.newaxis, :] + intercepts[np.newaxis, :]
std_errors = np.std(clean_data_computed - fitted, axis=0) / np.sqrt(
clean_data_computed.shape[0]
)
elif method == "theilslopes":
std_errors = _theil_std_error_batch(clean_data_computed, x, slopes)
else:
std_errors = _linregress_std_error_batch(clean_data_computed, x, slopes)
results["trend"][no_nan_indices] = slopes
results["h"][no_nan_indices] = h_values
results["p"][no_nan_indices] = p_values
results["z"][no_nan_indices] = z_values
results["tau"][no_nan_indices] = tau_values
results["std_error"][no_nan_indices] = std_errors
# Handle series with NaN individually (but batch them for efficiency)
nan_mask = valid_counts[valid_indices] != time_steps
nan_indices = valid_indices[nan_mask]
# Convert to regular numpy if it's a Dask array to avoid indexing issues
if hasattr(data_chunk, "compute"):
data_for_nan = data_chunk.compute()
# Also convert indices to regular numpy for iteration
if hasattr(nan_indices, "compute"):
nan_indices = nan_indices.compute()
else:
data_for_nan = data_chunk
for series_idx in nan_indices:
y = data_for_nan[:, series_idx]
finite_mask = np.isfinite(y)
y_clean = y[finite_mask]
# Mann-Kendall test for individual series with NaN
try:
mk_result = mann_kendall_test(
y if test_name == "seasonal" else y_clean,
alpha=alpha,
method=method,
test=test_name,
period=effective_period,
lag=lag,
)
# Store results
for key, value in mk_result.items():
results[key][series_idx] = value
except Exception:
# Skip problematic series - results remain NaN
continue
return results
[docs]
def mann_kendall_xarray(
data, # xr.DataArray
dim: str = "time",
alpha: float = 0.05,
method: str = "theilslopes",
use_dask: bool = True,
test: str = "original",
period: Optional[int] = None,
lag: Optional[int] = None,
group_dim: Optional[str] = None,
): # -> xr.Dataset
"""
Mann-Kendall test for xarray DataArray with intelligent dimension handling.
Automatically handles:
- 1D time series (pure time dimension)
- Multi-dimensional data with preserved dimension order
- Scalar dimensions (size=1) from sel() operations
- Correct dimension ordering in output
Parameters
----------
data : xr.DataArray
Input data array
dim : str, default 'time'
Time dimension name to analyze along
alpha : float, default 0.05
Significance level
method : str, default 'theilslopes'
Slope calculation method
use_dask : bool, default True
Use dask for computation if available
test : str, default 'original'
Mann-Kendall test family to apply:
- 'original': original Mann-Kendall test
- 'yue_wang': Yue and Wang (2004) modified variance correction
- 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test
- 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test
- 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test
- 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test
- 'regional': Helsel regional MK test
- 'hamed_rao': Hamed and Rao (1998) variance correction
- 'pre_whitening': Yue and Wang (2002) pre-whitening modification
- 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening
period : int, optional
Seasonal cycle length used by ``test="seasonal"`` and
``test="correlated_seasonal"``. When omitted, these seasonal test
families default to ``12``. Other test families ignore this argument.
lag : int, optional
Number of autocorrelation lags to include for ``test="yue_wang"``
and ``test="hamed_rao"``. ``None`` uses all available lags. This
argument has no effect for other test families.
group_dim : str, optional
Grouping dimension used by ``test="multivariate"``,
``test="regional"``, and ``test="correlated_multivariate"``. If
omitted, it is inferred only when there is exactly one non-time
dimension.
Returns
-------
result : xr.Dataset
Dataset containing trend analysis results with preserved dimension order
Examples
--------
>>> # 3D data (time, lat, lon) -> (lat, lon)
>>> ds = mann_kendall_xarray(data_3d, dim='time')
>>>
>>> # 3D data (year, ensemble, lat) -> (ensemble, lat)
>>> ds = mann_kendall_xarray(data_3d, dim='year')
>>>
>>> # Handle scalar dimension from sel
>>> subset = data.sel(lat=0) # lat becomes scalar
>>> ds = mann_kendall_xarray(subset, dim='time') # Works correctly
"""
# Get time axis
time_axis = data.get_axis_num(dim)
test_name = _normalize_test_name(test)
effective_period = _resolve_seasonal_period(test_name, period)
group_axis = _resolve_group_axis(
data, group_axis=group_dim, time_axis=time_axis, test_name=test_name
)
if use_dask and hasattr(data.data, "chunks") and group_axis is None:
# Use dask for computation
results = _dask_mann_kendall(
data, dim, alpha, method, test, effective_period, lag
)
else:
# Use numpy implementation
results = mann_kendall_multidim(
data.values,
axis=time_axis,
alpha=alpha,
method=method,
test=test,
period=effective_period,
lag=lag,
group_axis=group_axis,
)
# Smart dimension handling: preserve order and filter scalar dimensions
# Get all dimensions except the analyzed dimension, in original order
output_dims = []
output_coords = {}
for d in data.dims:
if d == dim:
# Skip the analyzed dimension
continue
if group_dim is not None and d == group_dim:
continue
if group_axis is not None and d == data.dims[group_axis]:
continue
# Check if this is a true dimension (size > 1) or scalar (size == 1)
if data.sizes[d] > 1:
output_dims.append(d)
output_coords[d] = data.coords[d]
# Scalar dimensions (size == 1) are excluded from output
# Handle different cases based on output dimensions
if not output_dims:
# Pure scalar output: no non-time dimensions or all are size=1
# Return scalar dataset
ds = xr.Dataset(
data_vars={
"trend": (
[],
(
results["trend"]
if np.isscalar(results["trend"])
else results["trend"].item()
),
),
"h": (
[],
results["h"] if np.isscalar(results["h"]) else results["h"].item(),
),
"p": (
[],
results["p"] if np.isscalar(results["p"]) else results["p"].item(),
),
"z": (
[],
results["z"] if np.isscalar(results["z"]) else results["z"].item(),
),
"tau": (
[],
(
results["tau"]
if np.isscalar(results["tau"])
else results["tau"].item()
),
),
"std_error": (
[],
(
results["std_error"]
if np.isscalar(results["std_error"])
else results["std_error"].item()
),
),
},
coords={},
attrs={
"title": "Mann-Kendall Trend Analysis",
"alpha": alpha,
"method": method,
"test": test_name,
"input_dims": str(data.dims),
"analyzed_dim": dim,
"group_dim": data.dims[group_axis] if group_axis is not None else "",
},
)
else:
# Multi-dimensional output: preserve original dimension order
ds = xr.Dataset(
data_vars={
"trend": (output_dims, results["trend"]),
"h": (output_dims, results["h"]),
"p": (output_dims, results["p"]),
"z": (output_dims, results["z"]),
"tau": (output_dims, results["tau"]),
"std_error": (output_dims, results["std_error"]),
},
coords=output_coords,
attrs={
"title": "Mann-Kendall Trend Analysis",
"alpha": alpha,
"method": method,
"test": test_name,
"input_dims": str(data.dims),
"analyzed_dim": dim,
"group_dim": data.dims[group_axis] if group_axis is not None else "",
},
)
return ds
def _dask_mann_kendall(
data,
dim: str,
alpha: float,
method: str,
test: str = "original", # xr.DataArray
period: Optional[int] = None,
lag: Optional[int] = None,
): # -> Dict[str, np.ndarray]
"""Use dask map_blocks for Mann-Kendall computation."""
if _normalize_test_name(test) in _GROUPED_TESTS:
raise ValueError(
"Grouped Mann-Kendall tests are currently dispatched through the NumPy "
"path. Use mann_kendall_xarray(..., group_dim=...) instead."
)
# Get time axis
time_axis = data.get_axis_num(dim)
# Convert to dask array if not already
if hasattr(data, "data") and isinstance(data.data, da.Array):
dask_data = data.data
else:
# Create dask array with reasonable chunking
chunks = list(data.shape)
chunks[time_axis] = -1 # Keep time dimension unchunked
# Chunk spatial dimensions reasonably
for i, size in enumerate(chunks):
if i != time_axis and size > 100:
chunks[i] = min(50, size // 2)
dask_data = da.from_array(data.values, chunks=chunks)
def mk_block_processor(block):
"""Process a block of data with Mann-Kendall analysis."""
# Move time axis to front
block = np.moveaxis(block, time_axis, 0)
time_steps = block.shape[0]
# Get spatial shape and flatten
spatial_shape = block.shape[1:]
spatial_size = np.prod(spatial_shape)
block_2d = block.reshape(time_steps, spatial_size)
# Initialize output arrays
output_shape = (6,) + spatial_shape # 6 output variables
output = np.full(output_shape, np.nan)
output_2d = output.reshape(6, spatial_size)
# Process each spatial point
for i in range(spatial_size):
series = block_2d[:, i]
# Skip if insufficient data
if len(series) < 3 or np.sum(np.isfinite(series)) < 3:
continue
try:
# Perform Mann-Kendall test
result = mann_kendall_test(
series,
alpha=alpha,
method=method,
test=test,
period=period,
lag=lag,
)
# Store results: [trend, h, p, z, tau, std_error]
output_2d[0, i] = result["trend"]
output_2d[1, i] = float(result["h"])
output_2d[2, i] = result["p"]
output_2d[3, i] = result["z"]
output_2d[4, i] = result["tau"]
output_2d[5, i] = result["std_error"]
except Exception:
continue # Skip problematic series
return output.reshape(output_shape)
# Determine output chunks
output_chunks = list(dask_data.chunks)
output_chunks[time_axis] = (6,) # Replace time with 6 output variables
# Apply Mann-Kendall analysis using map_blocks
with ProgressBar():
result_array = da.map_blocks(
mk_block_processor,
dask_data,
dtype=float,
chunks=output_chunks,
drop_axis=time_axis,
new_axis=time_axis,
).compute()
# Move result axis to front and extract variables
result_array = np.moveaxis(result_array, time_axis, 0)
# Create result dictionary
result_dict = {
"trend": result_array[0],
"h": result_array[1].astype(bool),
"p": result_array[2],
"z": result_array[3],
"tau": result_array[4],
"std_error": result_array[5],
}
return result_dict
# Convenience function for easy access
[docs]
def trend_analysis(
data: Any,
axis: Union[int, str] = 0,
alpha: float = 0.05,
method: str = "theilslopes",
dim: Optional[Union[int, str]] = None,
test: str = "original",
period: Optional[int] = None,
lag: Optional[int] = None,
group_axis: Optional[Union[int, str]] = None,
group_dim: Optional[str] = None,
**kwargs,
) -> Any:
"""
Unified interface for Mann-Kendall trend analysis.
Automatically chooses the best implementation based on input type.
Parameters
----------
data : np.ndarray or xr.DataArray
Input data array
axis : int or str, default 0
Axis along which to compute trends. Can be integer index or dimension name.
alpha : float, default 0.05
Significance level
method : str, default 'theilslopes'
Slope calculation method
dim : int or str, optional
Alternative name for axis parameter (XArray style). Takes precedence over axis.
test : str, default 'original'
Mann-Kendall test family to apply:
- 'original': original Mann-Kendall test
- 'yue_wang': Yue and Wang (2004) modified variance correction
- 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test
- 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test
- 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test
- 'regional': Helsel regional MK test
- 'hamed_rao': Hamed and Rao (1998) variance correction
- 'pre_whitening': Yue and Wang (2002) pre-whitening modification
- 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening
period : int, optional
Seasonal cycle length used by ``test="seasonal"`` and
``test="correlated_seasonal"``. When omitted, these seasonal test
families default to ``12``. Other test families ignore this argument.
lag : int, optional
Number of autocorrelation lags to include for ``test="yue_wang"``
and ``test="hamed_rao"``. ``None`` uses all available lags. This
argument has no effect for other test families.
group_axis : int or str, optional
Grouping axis used by ``test="multivariate"`` and ``test="regional"``
for NumPy inputs.
group_dim : str, optional
Grouping dimension used by ``test="multivariate"`` and
``test="regional"`` for xarray inputs.
**kwargs
Additional arguments passed to underlying functions
Notes
-----
Parameter priority: dim > axis
Both axis and dim support integer indices and string dimension names.
"""
# Handle parameter aliases with priority: dim > axis
if dim is not None:
actual_axis = dim
else:
actual_axis = axis
if isinstance(data, xr.DataArray):
return mann_kendall_xarray(
data,
dim=actual_axis,
alpha=alpha,
method=method,
test=test,
period=period,
lag=lag,
group_dim=group_dim,
**kwargs,
)
else:
return mann_kendall_multidim(
data,
axis=actual_axis,
alpha=alpha,
method=method,
test=test,
period=period,
lag=lag,
group_axis=group_axis,
**kwargs,
)
# Aliases for backward compatibility and convenience
mk_test = mann_kendall_test # Short alias
mk_multidim = mann_kendall_multidim # Short alias for multidimensional