Source code for skyborn.calc.mann_kendall.core

"""
Mann-Kendall trend analysis for Skyborn package.

This module provides optimized implementations of the Mann-Kendall test
for trend detection in time series data, supporting both xarray and numpy arrays.

Examples
--------
Basic usage with 1D time series:

>>> import numpy as np
>>> from skyborn.calc import mann_kendall_test
>>>
>>> # Create time series with trend
>>> x = np.arange(100)
>>> y = 0.5 * x + np.random.randn(100)
>>>
>>> # Perform Mann-Kendall test
>>> result = mann_kendall_test(y)
>>> print(f"Trend: {result['trend']:.3f}")
>>> print(f"Significant: {result['h']}")

Multidimensional trend analysis:

>>> import numpy as np
>>> from skyborn.calc import trend_analysis
>>>
>>> # Create 3D data (time, lat, lon)
>>> data = np.random.randn(100, 50, 80) + np.linspace(0, 2, 100)[:, None, None]
>>>
>>> # Analyze trends along time axis (axis 0)
>>> results = trend_analysis(data, axis=0)
>>> print(f"Trend shape: {results['trend'].shape}")  # (50, 80)

Flexible dimension specification with xarray:

>>> import xarray as xr
>>> from skyborn.calc import trend_analysis
>>>
>>> # Create sample data
>>> time = pd.date_range('2000', periods=120, freq='M')
>>> data = xr.DataArray(
...     np.random.randn(120, 10, 15) + np.linspace(0, 3, 120)[:, None, None],
...     coords={'time': time, 'lat': range(10), 'lon': range(15)},
...     dims=['time', 'lat', 'lon']
... )
>>>
>>> # Analyze trends along any dimension
>>> results = trend_analysis(data, dim='time')
>>> print(results.trend.values.shape)  # (10, 15)
"""

import sys
import warnings
from importlib import import_module
from importlib import util as importlib_util
from importlib.machinery import EXTENSION_SUFFIXES
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import dask.array as da
import numpy as np
import scipy.stats as stats
import xarray as xr
from dask.diagnostics import ProgressBar

from ._compiled import (
    _as_core_input_2d,
    _load_core_module_from,
    _require_kernel,
    _resolve_kernel_handles,
)
from .regression import (
    _linregress_slope_batch,
    _linregress_std_error_batch,
    _theil_std_error_batch,
)
from .variants import (
    _autocorr_lag_limit,
    _correlated_multivariate_stats_scalar,
    _correlated_seasonal_stats_scalar,
    _grouped_time_index,
    _hamed_rao_variance_batch,
    _multivariate_score_variance_batch,
    _multivariate_score_variance_scalar,
    _multivariate_sens_slope_scalar,
    _pre_whiten_score_variance_batch,
    _seasonal_score_variance_batch,
    _seasonal_score_variance_scalar,
    _seasonal_sens_slope_scalar,
    _trend_free_pre_whiten_score_variance_batch,
    _yue_wang_variance_batch,
)

__all__ = [
    "mann_kendall_test",
    "mann_kendall_multidim",
    "mann_kendall_xarray",
    "trend_analysis",
    "mk_test",
    "mk_multidim",
]


_GROUPED_TESTS = {"multivariate", "regional", "correlated_multivariate"}


def _normalize_test_name(test: str) -> str:
    """Normalize public MK test aliases to one canonical label."""
    normalized = test.lower().replace("-", "_").replace(" ", "_")
    if normalized in {"original", "mk", "default"}:
        return "original"
    if normalized in {"yue_wang", "yuewang"}:
        return "yue_wang"
    if normalized in {"seasonal", "seasonal_mk"}:
        return "seasonal"
    if normalized in {"correlated_seasonal", "correlatedseasonal"}:
        return "correlated_seasonal"
    if normalized in {"correlated_multivariate", "correlatedmultivariate"}:
        return "correlated_multivariate"
    if normalized in {"multivariate", "multivar"}:
        return "multivariate"
    if normalized in {"regional", "regional_mk"}:
        return "regional"
    if normalized in {"hamed_rao", "hamedrao"}:
        return "hamed_rao"
    if normalized in {
        "trend_free_pre_whitening",
        "trend_free_prewhitening",
        "tfpw",
    }:
        return "trend_free_pre_whitening"
    if normalized in {"pre_whitening", "prewhitening", "pre_whiten"}:
        return "pre_whitening"
    raise ValueError(
        "Unknown test: "
        f"{test}. Supported tests are 'original', 'yue_wang', 'seasonal', "
        "'correlated_seasonal', 'correlated_multivariate', "
        "'multivariate', 'regional', "
        "'hamed_rao', 'pre_whitening', and "
        "'trend_free_pre_whitening'."
    )


def _resolve_seasonal_period(test_name: str, period: Optional[int]) -> Optional[int]:
    """Return the effective seasonal cycle length for the active test family."""
    if test_name in {"seasonal", "correlated_seasonal"}:
        return 12 if period is None else int(period)
    return None


def _load_core_module():
    """Best-effort loader for the compiled Mann-Kendall core extension."""
    return _load_core_module_from(
        file_path=__file__,
        package_name=__package__,
        extension_suffixes=EXTENSION_SUFFIXES,
        path_type=Path,
        importlib_util_module=importlib_util,
        import_module_func=import_module,
        sys_modules=sys.modules,
    )


_core_module = _load_core_module()
_kernel_handles = _resolve_kernel_handles(_core_module)
_score_variance_kernel = _kernel_handles["_score_variance_kernel"]
_score_variance_slope_kernel = _kernel_handles["_score_variance_slope_kernel"]
_yue_wang_slope_kernel = _kernel_handles["_yue_wang_slope_kernel"]
_hamed_rao_slope_kernel = _kernel_handles["_hamed_rao_slope_kernel"]
_sen_slope_kernel = _kernel_handles["_sen_slope_kernel"]
_grouped_sen_slope_kernel = _kernel_handles["_grouped_sen_slope_kernel"]
_grouped_correlated_stats_kernel = _kernel_handles["_grouped_correlated_stats_kernel"]


def _kernels_ready() -> bool:
    """Return whether the compiled Mann-Kendall kernels are available."""
    return (
        _score_variance_kernel is not None
        and _score_variance_slope_kernel is not None
        and _sen_slope_kernel is not None
        and _grouped_sen_slope_kernel is not None
        and _grouped_correlated_stats_kernel is not None
    )


def _score_variance_batch(
    data_2d: np.ndarray, modified: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
    """Run the compiled 2D score/variance kernel."""
    kernel = _require_kernel(_score_variance_kernel, "mk_score_var_batch")
    s_values, var_values = kernel(_as_core_input_2d(data_2d), int(modified))
    return np.asarray(s_values, dtype=np.float64), np.asarray(
        var_values, dtype=np.float64
    )


def _sen_slope_batch(data_2d: np.ndarray) -> np.ndarray:
    """Run the compiled 2D Theil-Sen slope kernel."""
    kernel = _require_kernel(_sen_slope_kernel, "sen_slope_batch")
    slopes = kernel(_as_core_input_2d(data_2d))
    return np.asarray(slopes, dtype=np.float64)


def _grouped_sen_slope_batch(data_2d: np.ndarray, period: int) -> np.ndarray:
    """Run the compiled grouped 2D Theil-Sen slope kernel."""
    kernel = _require_kernel(_grouped_sen_slope_kernel, "grouped_sen_slope_batch")
    slopes = kernel(_as_core_input_2d(data_2d), int(period))
    return np.asarray(slopes, dtype=np.float64)


def _grouped_correlated_stats_batch(
    data_2d: np.ndarray, period: int
) -> Tuple[np.ndarray, np.ndarray, float]:
    """Run the compiled grouped correlated-statistics kernel."""
    kernel = _require_kernel(
        _grouped_correlated_stats_kernel, "grouped_correlated_stats_batch"
    )
    s_values, var_values, denom = kernel(_as_core_input_2d(data_2d), int(period))
    return (
        np.asarray(s_values, dtype=np.float64),
        np.asarray(var_values, dtype=np.float64),
        float(denom),
    )


def _score_variance_slope_batch(
    data_2d: np.ndarray, modified: bool = False
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Run the compiled 2D combined score/variance/slope kernel."""
    kernel = _require_kernel(_score_variance_slope_kernel, "mk_score_var_sen_batch")
    s_values, var_values, slopes = kernel(_as_core_input_2d(data_2d), int(modified))
    return (
        np.asarray(s_values, dtype=np.float64),
        np.asarray(var_values, dtype=np.float64),
        np.asarray(slopes, dtype=np.float64),
    )


def _yue_wang_score_variance_slope_batch(
    data_2d: np.ndarray, lag: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Run the compiled lag-aware Yue-Wang score/variance/slope kernel."""
    kernel = _require_kernel(_yue_wang_slope_kernel, "mk_yue_wang_score_var_sen_batch")
    s_values, var_values, slopes = kernel(_as_core_input_2d(data_2d), int(lag))
    return (
        np.asarray(s_values, dtype=np.float64),
        np.asarray(var_values, dtype=np.float64),
        np.asarray(slopes, dtype=np.float64),
    )


def _hamed_rao_score_variance_slope_batch(
    data_2d: np.ndarray, alpha: float, lag: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Run the compiled Hamed-Rao score/variance/slope kernel."""
    kernel = _require_kernel(
        _hamed_rao_slope_kernel, "mk_hamed_rao_score_var_sen_batch"
    )
    interval = float(stats.norm.ppf(1 - alpha / 2) / np.sqrt(data_2d.shape[0]))
    s_values, var_values, slopes = kernel(
        _as_core_input_2d(data_2d), interval, int(lag)
    )
    return (
        np.asarray(s_values, dtype=np.float64),
        np.asarray(var_values, dtype=np.float64),
        np.asarray(slopes, dtype=np.float64),
    )


[docs] def mann_kendall_test( data: Union[np.ndarray, "xr.DataArray"], alpha: float = 0.05, method: str = "theilslopes", test: str = "original", period: Optional[int] = None, lag: Optional[int] = None, ) -> Dict[str, Union[float, bool]]: """ Perform Mann-Kendall test for trend detection on 1D time series. The Mann-Kendall test is a nonparametric test for detecting monotonic trends in time series data. It makes no assumptions about the distribution of the data. Parameters ---------- data : array-like 1D time series data. Can be numpy array or xarray DataArray. alpha : float, default 0.05 Significance level for hypothesis testing (Type I error probability). method : str, default 'theilslopes' Method for calculating slope: - 'theilslopes': Theil-Sen slope estimator (robust, recommended) - 'linregress': Linear regression slope (faster but less robust) test : str, default 'original' Mann-Kendall test family to apply: - 'original': original Mann-Kendall test - 'yue_wang': Yue and Wang (2004) modified variance correction - 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test - 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test - 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test - 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test - 'regional': Helsel regional MK test - 'hamed_rao': Hamed and Rao (1998) variance correction - 'pre_whitening': Yue and Wang (2002) pre-whitening modification - 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening period : int, optional Seasonal cycle length used by ``test="seasonal"`` and ``test="correlated_seasonal"``. When omitted, these seasonal test families default to ``12``. Other test families ignore this argument. lag : int, optional Number of autocorrelation lags to include for ``test="yue_wang"`` and ``test="hamed_rao"``. ``None`` uses all available lags. This argument has no effect for other test families. Returns ------- result : dict Dictionary containing trend analysis results: - 'trend' : float Slope of the trend (units per time step) - 'h' : bool Hypothesis test result. True if significant trend exists at the specified alpha level. - 'p' : float Two-tailed p-value of the test - 'z' : float Normalized test statistic (z-score) - 'tau' : float Kendall's tau correlation coefficient - 'std_error' : float Standard error of the detrended residuals Notes ----- The Mann-Kendall test statistic S is calculated as: .. math:: S = \\sum_{i=1}^{n-1} \\sum_{j=i+1}^{n} \\text{sign}(x_j - x_i) The test assumes that: - Data are independent, unless a serial-correlation-aware `test=` family is used - Data come from the same distribution - Missing values are handled by removal References ---------- Mann, H. B. (1945). Nonparametric tests against trend. Econometrica, 13(3), 245-259. Kendall, M. G. (1948). Rank correlation methods. Griffin, London. Yue, S., & Wang, C. (2004). The Mann-Kendall test modified by effective sample size to detect trend in serially correlated hydrological series. Water resources management, 18(3), 201-218. Examples -------- >>> import numpy as np >>> from skyborn.calc import mann_kendall_test >>> >>> # Generate trend data >>> t = np.arange(50) >>> data = 0.3 * t + np.random.normal(0, 0.5, 50) >>> >>> # Test for trend >>> result = mann_kendall_test(data) >>> print(f"Trend slope: {result['trend']:.3f}") >>> print(f"P-value: {result['p']:.3f}") >>> print(f"Significant trend: {result['h']}") """ test_name = _normalize_test_name(test) effective_period = _resolve_seasonal_period(test_name, period) if hasattr(data, "values"): values = data.values else: values = np.asarray(data) if test_name in _GROUPED_TESTS: grouped_values = np.asarray(values, dtype=np.float64) if grouped_values.ndim == 1: grouped_values = grouped_values.reshape(-1, 1) elif grouped_values.ndim != 2: raise ValueError( "Grouped Mann-Kendall tests require 1D or 2D data with shape " "(time, group). Use trend_analysis for multidimensional data." ) finite_mask = np.isfinite(grouped_values) n_finite = int(np.sum(finite_mask)) if n_finite < 3: warnings.warn("Need at least 3 data points for Mann-Kendall test") return { "trend": np.nan, "h": False, "p": np.nan, "z": np.nan, "tau": np.nan, "std_error": np.nan, } if method not in {"theilslopes", "linregress"}: raise ValueError( f"Unknown method: {method}. Use 'theilslopes' or 'linregress'" ) if method == "theilslopes": slope, intercept = _multivariate_sens_slope_scalar(grouped_values) else: grouped_x = _grouped_time_index( grouped_values.shape[0], grouped_values.shape[1] ) flattened = grouped_values.reshape(-1) finite_flat = np.isfinite(flattened) slope, intercept = stats.linregress( grouped_x[finite_flat], flattened[finite_flat] )[:2] if test_name == "correlated_multivariate": S, var_s, denom = _correlated_multivariate_stats_scalar(grouped_values) with np.errstate(divide="ignore", invalid="ignore"): z = S / np.sqrt(var_s) else: S, var_s, denom = _multivariate_score_variance_scalar(grouped_values) if S > 0: z = (S - 1) / np.sqrt(var_s) elif S == 0: z = 0 else: z = (S + 1) / np.sqrt(var_s) p_value = 2 * (1 - stats.norm.cdf(abs(z))) h = abs(z) > stats.norm.ppf(1 - alpha / 2) tau = S / denom if denom != 0.0 else np.nan grouped_x = _grouped_time_index( grouped_values.shape[0], grouped_values.shape[1] ) flattened = grouped_values.reshape(-1) finite_flat = np.isfinite(flattened) fitted = grouped_x[finite_flat] * slope + intercept std_error = np.std(flattened[finite_flat] - fitted) / np.sqrt(n_finite) return { "trend": slope, "h": h, "p": p_value, "z": z, "tau": tau, "std_error": std_error, } if values.ndim > 1: raise ValueError( "mann_kendall_test only accepts 1D data. Use trend_analysis for multidimensional data." ) if test_name in {"seasonal", "correlated_seasonal"}: season_values = np.asarray(values, dtype=np.float64) finite_mask = np.isfinite(season_values) n_finite = int(np.sum(finite_mask)) if n_finite < 3: warnings.warn("Need at least 3 data points for Mann-Kendall test") return { "trend": np.nan, "h": False, "p": np.nan, "z": np.nan, "tau": np.nan, "std_error": np.nan, } if method not in {"theilslopes", "linregress"}: raise ValueError( f"Unknown method: {method}. Use 'theilslopes' or 'linregress'" ) if method == "theilslopes": slope, intercept = _seasonal_sens_slope_scalar( season_values, effective_period ) else: x_finite = np.arange(season_values.size, dtype=np.float64)[finite_mask] slope, intercept = stats.linregress(x_finite, season_values[finite_mask])[ :2 ] if test_name == "seasonal": S, var_s, denom = _seasonal_score_variance_scalar( season_values, effective_period ) if S > 0: z = (S - 1) / np.sqrt(var_s) elif S == 0: z = 0 else: z = (S + 1) / np.sqrt(var_s) else: S, var_s, denom = _correlated_seasonal_stats_scalar( season_values, effective_period ) with np.errstate(divide="ignore", invalid="ignore"): z = S / np.sqrt(var_s) p_value = 2 * (1 - stats.norm.cdf(abs(z))) h = abs(z) > stats.norm.ppf(1 - alpha / 2) tau = S / denom if denom != 0.0 else np.nan finite_index = np.arange(season_values.size, dtype=np.float64)[finite_mask] fitted = finite_index / effective_period * slope + intercept std_error = np.std(season_values[finite_mask] - fitted) / np.sqrt(n_finite) return { "trend": slope, "h": h, "p": p_value, "z": z, "tau": tau, "std_error": std_error, } valid_mask = np.isfinite(values) y = np.asarray(values[valid_mask], dtype=np.float64) x = np.arange(len(y), dtype=np.float64) n = len(y) if n < 3: warnings.warn("Need at least 3 data points for Mann-Kendall test") return { "trend": np.nan, "h": False, "p": np.nan, "z": np.nan, "tau": np.nan, "std_error": np.nan, } if method not in {"theilslopes", "linregress"}: raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'") if method == "theilslopes": slope = float(_sen_slope_batch(y.reshape(-1, 1))[0]) intercept = np.median(y) - slope * np.median(x) else: slope, intercept = stats.linregress(x, y)[:2] if test_name == "pre_whitening": stat_n = n - 1 s_values, var_values = _pre_whiten_score_variance_batch(y.reshape(-1, 1)) S = int(np.rint(s_values[0])) var_s = float(var_values[0]) elif test_name == "yue_wang": lag_limit = _autocorr_lag_limit(n, lag) if lag is None and method == "theilslopes": S_values, var_values, _ = _score_variance_slope_batch( y.reshape(-1, 1), modified=True ) var_s = float(var_values[0]) elif lag is None: S_values, var_values = _score_variance_batch( y.reshape(-1, 1), modified=True ) var_s = float(var_values[0]) else: S_values, var_values, correction_slopes = ( _yue_wang_score_variance_slope_batch(y.reshape(-1, 1), lag_limit) ) var_s = float(var_values[0]) if method == "theilslopes": slope = float(correction_slopes[0]) intercept = np.median(y) - slope * np.median(x) S = int(np.rint(S_values[0])) stat_n = n elif test_name == "trend_free_pre_whitening": stat_n = n - 1 s_values, var_values = _trend_free_pre_whiten_score_variance_batch( y.reshape(-1, 1), np.asarray([slope], dtype=np.float64) ) S = int(np.rint(s_values[0])) var_s = float(var_values[0]) elif test_name == "hamed_rao": lag_limit = _autocorr_lag_limit(n, lag) s_values, var_values, correction_slopes = _hamed_rao_score_variance_slope_batch( y.reshape(-1, 1), alpha=alpha, lag=lag_limit ) S = int(np.rint(s_values[0])) var_s = float(var_values[0]) if method == "theilslopes": slope = float(correction_slopes[0]) intercept = np.median(y) - slope * np.median(x) stat_n = n elif method == "theilslopes": S_values, var_values, _ = _score_variance_slope_batch( y.reshape(-1, 1), modified=False ) S = int(np.rint(S_values[0])) var_s = float(var_values[0]) stat_n = n else: S = _calculate_mk_score(y) var_s = _calculate_mk_variance(y, n, modified=False) stat_n = n if S > 0: z = (S - 1) / np.sqrt(var_s) elif S == 0: z = 0 else: z = (S + 1) / np.sqrt(var_s) p_value = 2 * (1 - stats.norm.cdf(abs(z))) h = abs(z) > stats.norm.ppf(1 - alpha / 2) tau = S / (0.5 * stat_n * (stat_n - 1)) y_detrend = y - (x * slope + intercept) std_error = np.std(y_detrend) / np.sqrt(n) return { "trend": slope, "h": h, "p": p_value, "z": z, "tau": tau, "std_error": std_error, }
def _calculate_mk_score(y: np.ndarray) -> int: """Calculate Mann-Kendall score S.""" s_values, _ = _score_variance_batch(np.asarray(y, dtype=np.float64).reshape(-1, 1)) return int(np.rint(s_values[0])) def _calculate_mk_variance(y: np.ndarray, n: int, modified: bool = False) -> float: """Calculate variance of Mann-Kendall score.""" _, variances = _score_variance_batch( np.asarray(y, dtype=np.float64).reshape(-1, 1), modified=modified ) return float(variances[0]) def _resolve_group_axis( data: Any, group_axis: Optional[Union[int, str]], time_axis: int, test_name: str, ) -> Optional[int]: """Resolve the grouping axis for grouped MK families.""" if test_name not in _GROUPED_TESTS: return None if group_axis is not None: if isinstance(group_axis, str): if hasattr(data, "get_axis_num"): resolved = data.get_axis_num(group_axis) else: raise ValueError( "String group_axis requires an array-like object with named dimensions." ) else: resolved = int(group_axis) else: ndim = data.ndim if hasattr(data, "ndim") else np.asarray(data).ndim candidate_axes = [axis for axis in range(ndim) if axis != time_axis] if len(candidate_axes) == 1: resolved = candidate_axes[0] else: raise ValueError( f"test='{test_name}' requires group_axis/group_dim when more than one " "non-time dimension is present." ) if resolved == time_axis: raise ValueError( "group_axis/group_dim must be different from the analyzed time axis." ) return resolved def _vectorized_grouped_mk_test( data_chunk: np.ndarray, alpha: float = 0.05, method: str = "theilslopes", test: str = "multivariate", ) -> Dict[str, np.ndarray]: """Vectorized grouped MK test for data shaped as (time, group, series).""" test_name = _normalize_test_name(test) if test_name not in _GROUPED_TESTS: raise ValueError(f"Unsupported grouped test: {test_name}") if method not in {"theilslopes", "linregress"}: raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'") time_steps, n_groups, n_series = data_chunk.shape grouped_x = _grouped_time_index(time_steps, n_groups) results = { "trend": np.full(n_series, np.nan), "h": np.zeros(n_series, dtype=bool), "p": np.full(n_series, np.nan), "z": np.full(n_series, np.nan), "tau": np.full(n_series, np.nan), "std_error": np.full(n_series, np.nan), } finite_counts = np.sum(np.isfinite(data_chunk), axis=(0, 1)) valid_series = finite_counts >= 3 if not np.any(valid_series): return results valid_indices = np.where(valid_series)[0] no_nan_mask = np.all(np.isfinite(data_chunk[:, :, valid_indices]), axis=(0, 1)) no_nan_indices = valid_indices[no_nan_mask] if no_nan_indices.size > 0: clean_data = np.asarray(data_chunk[:, :, no_nan_indices], dtype=np.float64) if method == "theilslopes": flat_clean = np.ascontiguousarray(clean_data).reshape( time_steps * n_groups, clean_data.shape[2] ) slopes = _grouped_sen_slope_batch(flat_clean, n_groups) intercepts = ( np.median(flat_clean, axis=0) - (0.5 * (flat_clean.shape[0] - 1) / n_groups) * slopes ) fitted = ( grouped_x[:, np.newaxis] * slopes[np.newaxis, :] + intercepts[np.newaxis, :] ) std_errors = np.std(flat_clean - fitted, axis=0) / np.sqrt( flat_clean.shape[0] ) else: flattened = clean_data.reshape(time_steps * n_groups, clean_data.shape[2]) slopes = _linregress_slope_batch(flattened, grouped_x) x_mean = np.mean(grouped_x) y_mean = np.mean(flattened, axis=0) intercepts = y_mean - slopes * x_mean std_errors = _linregress_std_error_batch(flattened, grouped_x, slopes) if test_name == "correlated_multivariate": S_values, var_s_values, grouped_denom = _grouped_correlated_stats_batch( flat_clean if method == "theilslopes" else flattened, n_groups ) with np.errstate(divide="ignore", invalid="ignore"): z_values = S_values / np.sqrt(var_s_values) else: S_values, var_s_values, grouped_denom = _multivariate_score_variance_batch( clean_data ) z_values = np.zeros_like(S_values, dtype=np.float64) positive_mask = S_values > 0 negative_mask = S_values < 0 z_values[positive_mask] = (S_values[positive_mask] - 1) / np.sqrt( var_s_values[positive_mask] ) z_values[negative_mask] = (S_values[negative_mask] + 1) / np.sqrt( var_s_values[negative_mask] ) p_values = 2 * (1 - stats.norm.cdf(np.abs(z_values))) h_values = np.abs(z_values) > stats.norm.ppf(1 - alpha / 2) tau_values = ( S_values / grouped_denom if grouped_denom != 0.0 else np.full_like(S_values, np.nan) ) results["trend"][no_nan_indices] = slopes results["h"][no_nan_indices] = h_values results["p"][no_nan_indices] = p_values results["z"][no_nan_indices] = z_values results["tau"][no_nan_indices] = tau_values results["std_error"][no_nan_indices] = std_errors nan_indices = valid_indices[~no_nan_mask] for series_idx in nan_indices: try: mk_result = mann_kendall_test( np.asarray(data_chunk[:, :, series_idx], dtype=np.float64), alpha=alpha, method=method, test=test_name, ) except Exception: continue for key, value in mk_result.items(): results[key][series_idx] = value return results
[docs] def mann_kendall_multidim( data: np.ndarray, axis: Union[int, str] = 0, alpha: float = 0.05, method: str = "theilslopes", chunk_size: Optional[int] = None, dim: Optional[Union[int, str]] = None, test: str = "original", period: Optional[int] = None, lag: Optional[int] = None, group_axis: Optional[Union[int, str]] = None, ) -> Dict[str, np.ndarray]: """ Optimized numpy-based Mann-Kendall test for multidimensional arrays. This implementation is typically faster than xarray-based versions for large arrays because it avoids xarray overhead and uses optimized numpy operations. Supports various input dimensions: - 1D: (time,) - single time series - 2D: (time, ensemble) or (ensemble, time) - ensemble time series - 3D: (time, lat, lon) - spatial grid - ND: arbitrary multidimensional arrays Parameters ---------- data : np.ndarray Input array with time series along one axis axis : int or str, default 0 Axis along which to compute trends. Can be integer index or dimension name. For numpy arrays with string names, the array must have named axes. alpha : float, default 0.05 Significance level method : str, default 'theilslopes' Slope calculation method ('theilslopes' or 'linregress') chunk_size : int, optional Process data in chunks to manage memory usage dim : int or str, optional Alternative name for axis parameter (XArray style). Takes precedence over axis. test : str, default 'original' Mann-Kendall test family to apply: - 'original': original Mann-Kendall test - 'yue_wang': Yue and Wang (2004) modified variance correction - 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test - 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test - 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test - 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test - 'regional': Helsel regional MK test - 'hamed_rao': Hamed and Rao (1998) variance correction - 'pre_whitening': Yue and Wang (2002) pre-whitening modification - 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening period : int, optional Seasonal cycle length used by ``test="seasonal"`` and ``test="correlated_seasonal"``. When omitted, these seasonal test families default to ``12``. Other test families ignore this argument. lag : int, optional Number of autocorrelation lags to include for ``test="yue_wang"`` and ``test="hamed_rao"``. ``None`` uses all available lags. This argument has no effect for other test families. group_axis : int or str, optional Grouping axis used by grouped test families such as ``test="multivariate"``, ``test="regional"``, and ``test="correlated_multivariate"``. If omitted, it is inferred only when there is exactly one non-time axis. Returns ------- result : dict Dictionary with result arrays for each statistic: - 'trend': slope values - 'h': significance test results (boolean) - 'p': p-values - 'z': z-scores - 'tau': Kendall's tau values - 'std_error': standard errors Notes ----- Parameter priority: dim > axis Both axis and dim can accept integer indices or string dimension names. For numpy arrays, string names require the array to have a 'dims' attribute or similar. """ test_name = _normalize_test_name(test) effective_period = _resolve_seasonal_period(test_name, period) def _resolve_axis(arr: Any, axis_param: Union[int, str]) -> int: """Resolve axis parameter to integer index.""" if isinstance(axis_param, str): # For numpy arrays, we need to check if it has dimension names if hasattr(arr, "dims"): # XArray-like object return arr.get_axis_num(axis_param) elif hasattr(arr, "axis_names") and arr.axis_names: # Custom numpy array with axis names return arr.axis_names.index(axis_param) else: # Plain numpy array - assume common dimension names common_time_names = [ "time", "t", "year", "years", "month", "months", "day", "days", "hour", "hours", ] if axis_param.lower() in common_time_names: # Assume time is the first dimension by default return 0 else: raise ValueError( f"Cannot resolve string axis '{axis_param}' for numpy array without dimension names. " f"Use integer index or ensure array has 'dims' attribute." ) else: return axis_param # Handle parameter aliases with priority: dim > axis if dim is not None: actual_time_axis = _resolve_axis(data, dim) else: actual_time_axis = _resolve_axis(data, axis) actual_group_axis = _resolve_group_axis( data, group_axis=group_axis, time_axis=actual_time_axis, test_name=test_name ) # Handle 1D input if data.ndim == 1: return mann_kendall_test( data, alpha=alpha, method=method, test=test_name, period=effective_period, lag=lag, ) if actual_group_axis is not None: grouped = np.moveaxis(data, (actual_time_axis, actual_group_axis), (0, 1)) time_steps = grouped.shape[0] group_size = grouped.shape[1] spatial_shape = grouped.shape[2:] data_3d = grouped.reshape(time_steps, group_size, -1) else: data = np.moveaxis(data, actual_time_axis, 0) time_steps = data.shape[0] if data.ndim == 2: data_2d = data spatial_shape = (data.shape[1],) else: spatial_shape = data.shape[1:] data_2d = data.reshape(time_steps, -1) n_points = data_3d.shape[2] if actual_group_axis is not None else data_2d.shape[1] # Initialize output arrays results = { "trend": np.full(spatial_shape, np.nan), "h": np.zeros(spatial_shape, dtype=bool), "p": np.full(spatial_shape, np.nan), "z": np.full(spatial_shape, np.nan), "tau": np.full(spatial_shape, np.nan), "std_error": np.full(spatial_shape, np.nan), } # Flatten result arrays for efficient assignment results_flat = {key: val.ravel() for key, val in results.items()} # Determine chunk size if chunk_size is None: # Estimate memory usage and set reasonable chunk size max_memory_mb = 200 # 200MB limit bytes_per_element = 8 max_chunk_size = (max_memory_mb * 1024 * 1024) // ( time_steps * time_steps * bytes_per_element ) chunk_size = min(max_chunk_size, n_points, 10000) chunk_size = max(chunk_size, 100) # Minimum chunk size # Process in chunks for start_idx in range(0, n_points, chunk_size): end_idx = min(start_idx + chunk_size, n_points) if actual_group_axis is not None: chunk_data = data_3d[:, :, start_idx:end_idx] chunk_results = _vectorized_grouped_mk_test( chunk_data, alpha=alpha, method=method, test=test_name, ) else: chunk_data = data_2d[:, start_idx:end_idx] chunk_results = _vectorized_mk_test( chunk_data, alpha=alpha, method=method, test=test_name, period=effective_period, lag=lag, ) # Store results for key, values in chunk_results.items(): results_flat[key][start_idx:end_idx] = values # Reshape results back to original spatial shape for key in results: results[key] = results_flat[key].reshape(spatial_shape) return results
def _vectorized_mk_test( data_chunk: np.ndarray, alpha: float = 0.05, method: str = "theilslopes", test: str = "original", period: Optional[int] = None, lag: Optional[int] = None, ) -> Dict[str, np.ndarray]: """ Truly vectorized Mann-Kendall test for a chunk of time series. Uses optimized numpy operations to process multiple time series simultaneously. This is a major performance improvement over the previous loop-based implementation. Handles missing data (NaN) gracefully by: 1. Full vectorization for series without NaN 2. Individual processing for series with NaN 3. Automatic skipping of series with insufficient data """ test_name = _normalize_test_name(test) effective_period = _resolve_seasonal_period(test_name, period) if method not in {"theilslopes", "linregress"}: raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'") time_steps, n_series = data_chunk.shape x = np.arange(time_steps, dtype=np.float64) # Initialize result arrays results = { "trend": np.full(n_series, np.nan), "h": np.zeros(n_series, dtype=bool), "p": np.full(n_series, np.nan), "z": np.full(n_series, np.nan), "tau": np.full(n_series, np.nan), "std_error": np.full(n_series, np.nan), } # Find valid time series (with enough non-NaN values) valid_counts = np.sum(np.isfinite(data_chunk), axis=0) valid_series = valid_counts >= 3 if not np.any(valid_series): return results # Get valid indices for efficient assignment valid_indices = np.where(valid_series)[0] # Separate series with complete data from those with missing values no_nan_mask = valid_counts[valid_series] == time_steps no_nan_indices = valid_indices[no_nan_mask] # Process series without NaN using full vectorization. # NumPy returns a scalar directly; Dask returns a deferred scalar. n_clean_series = no_nan_mask.sum() if hasattr(n_clean_series, "compute"): n_clean_series = n_clean_series.compute() if n_clean_series > 0: if hasattr(data_chunk, "compute"): clean_data_computed = data_chunk[:, no_nan_indices].compute() else: clean_data_computed = np.asarray(data_chunk[:, no_nan_indices]) clean_data_computed = np.asarray(clean_data_computed, dtype=np.float64) if test_name in {"seasonal", "correlated_seasonal"}: core_input = _as_core_input_2d(clean_data_computed) if method == "theilslopes": slopes = _grouped_sen_slope_batch(core_input, effective_period) intercepts = ( np.median(clean_data_computed, axis=0) - (0.5 * (clean_data_computed.shape[0] - 1) / effective_period) * slopes ) else: slopes = _linregress_slope_batch(clean_data_computed, x) x_mean = np.mean(x) y_mean = np.mean(clean_data_computed, axis=0) intercepts = y_mean - slopes * x_mean if test_name == "seasonal": S_values, var_s_values, seasonal_denom = _seasonal_score_variance_batch( core_input, effective_period ) else: S_values, var_s_values, seasonal_denom = ( _grouped_correlated_stats_batch(core_input, effective_period) ) stat_n = time_steps elif test_name == "pre_whitening": core_input = _as_core_input_2d(clean_data_computed) if method == "theilslopes": slopes = _sen_slope_batch(core_input) else: slopes = _linregress_slope_batch(clean_data_computed, x) stat_n = core_input.shape[0] - 1 S_values, var_s_values = _pre_whiten_score_variance_batch(core_input) elif test_name == "trend_free_pre_whitening": core_input = _as_core_input_2d(clean_data_computed) if method == "theilslopes": slopes = _sen_slope_batch(core_input) else: slopes = _linregress_slope_batch(clean_data_computed, x) stat_n = core_input.shape[0] - 1 S_values, var_s_values = _trend_free_pre_whiten_score_variance_batch( core_input, slopes ) elif test_name == "hamed_rao": if method == "theilslopes": S_values, base_var_values, slopes = _score_variance_slope_batch( clean_data_computed, modified=False ) correction_slopes = slopes else: S_values, base_var_values = _score_variance_batch( clean_data_computed, modified=False ) slopes = _linregress_slope_batch(clean_data_computed, x) correction_slopes = _sen_slope_batch(clean_data_computed) var_s_values = _hamed_rao_variance_batch( clean_data_computed, base_var_values, correction_slopes, alpha=alpha, lag=lag, ) stat_n = time_steps elif test_name == "yue_wang": if lag is None and method == "theilslopes": S_values, var_s_values, slopes = _score_variance_slope_batch( clean_data_computed, modified=True ) elif lag is None: S_values, var_s_values = _score_variance_batch( clean_data_computed, modified=True ) slopes = _linregress_slope_batch(clean_data_computed, x) else: if method == "theilslopes": S_values, base_var_values, slopes = _score_variance_slope_batch( clean_data_computed, modified=False ) correction_slopes = slopes else: S_values, base_var_values = _score_variance_batch( clean_data_computed, modified=False ) slopes = _linregress_slope_batch(clean_data_computed, x) correction_slopes = _sen_slope_batch(clean_data_computed) var_s_values = _yue_wang_variance_batch( clean_data_computed, base_var_values, correction_slopes, lag=lag, ) stat_n = time_steps elif method == "theilslopes": S_values, var_s_values, slopes = _score_variance_slope_batch( clean_data_computed, modified=False ) stat_n = time_steps else: S_values, var_s_values = _score_variance_batch( clean_data_computed, modified=False ) stat_n = time_steps # Vectorized z-score calculation z_values = np.zeros_like(S_values, dtype=np.float64) if test_name == "correlated_seasonal": with np.errstate(divide="ignore", invalid="ignore"): z_values = S_values / np.sqrt(var_s_values) else: positive_mask = S_values > 0 negative_mask = S_values < 0 z_values[positive_mask] = (S_values[positive_mask] - 1) / np.sqrt( var_s_values[positive_mask] ) z_values[negative_mask] = (S_values[negative_mask] + 1) / np.sqrt( var_s_values[negative_mask] ) p_values = 2 * (1 - stats.norm.cdf(np.abs(z_values))) h_values = np.abs(z_values) > stats.norm.ppf(1 - alpha / 2) if ( test_name not in { "pre_whitening", "trend_free_pre_whitening", "hamed_rao", "yue_wang", } and method != "theilslopes" ): slopes = _linregress_slope_batch(clean_data_computed, x) if test_name in {"seasonal", "correlated_seasonal"}: tau_values = ( S_values / seasonal_denom if seasonal_denom != 0.0 else np.full_like(S_values, np.nan) ) else: tau_values = S_values / (0.5 * stat_n * (stat_n - 1)) if test_name in {"seasonal", "correlated_seasonal"} and method == "theilslopes": seasonal_x = x[:, np.newaxis] / effective_period fitted = seasonal_x * slopes[np.newaxis, :] + intercepts[np.newaxis, :] std_errors = np.std(clean_data_computed - fitted, axis=0) / np.sqrt( clean_data_computed.shape[0] ) elif method == "theilslopes": std_errors = _theil_std_error_batch(clean_data_computed, x, slopes) else: std_errors = _linregress_std_error_batch(clean_data_computed, x, slopes) results["trend"][no_nan_indices] = slopes results["h"][no_nan_indices] = h_values results["p"][no_nan_indices] = p_values results["z"][no_nan_indices] = z_values results["tau"][no_nan_indices] = tau_values results["std_error"][no_nan_indices] = std_errors # Handle series with NaN individually (but batch them for efficiency) nan_mask = valid_counts[valid_indices] != time_steps nan_indices = valid_indices[nan_mask] # Convert to regular numpy if it's a Dask array to avoid indexing issues if hasattr(data_chunk, "compute"): data_for_nan = data_chunk.compute() # Also convert indices to regular numpy for iteration if hasattr(nan_indices, "compute"): nan_indices = nan_indices.compute() else: data_for_nan = data_chunk for series_idx in nan_indices: y = data_for_nan[:, series_idx] finite_mask = np.isfinite(y) y_clean = y[finite_mask] # Mann-Kendall test for individual series with NaN try: mk_result = mann_kendall_test( y if test_name == "seasonal" else y_clean, alpha=alpha, method=method, test=test_name, period=effective_period, lag=lag, ) # Store results for key, value in mk_result.items(): results[key][series_idx] = value except Exception: # Skip problematic series - results remain NaN continue return results
[docs] def mann_kendall_xarray( data, # xr.DataArray dim: str = "time", alpha: float = 0.05, method: str = "theilslopes", use_dask: bool = True, test: str = "original", period: Optional[int] = None, lag: Optional[int] = None, group_dim: Optional[str] = None, ): # -> xr.Dataset """ Mann-Kendall test for xarray DataArray with intelligent dimension handling. Automatically handles: - 1D time series (pure time dimension) - Multi-dimensional data with preserved dimension order - Scalar dimensions (size=1) from sel() operations - Correct dimension ordering in output Parameters ---------- data : xr.DataArray Input data array dim : str, default 'time' Time dimension name to analyze along alpha : float, default 0.05 Significance level method : str, default 'theilslopes' Slope calculation method use_dask : bool, default True Use dask for computation if available test : str, default 'original' Mann-Kendall test family to apply: - 'original': original Mann-Kendall test - 'yue_wang': Yue and Wang (2004) modified variance correction - 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test - 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test - 'correlated_multivariate': Libiseller-Grimvall correlated multivariate MK test - 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test - 'regional': Helsel regional MK test - 'hamed_rao': Hamed and Rao (1998) variance correction - 'pre_whitening': Yue and Wang (2002) pre-whitening modification - 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening period : int, optional Seasonal cycle length used by ``test="seasonal"`` and ``test="correlated_seasonal"``. When omitted, these seasonal test families default to ``12``. Other test families ignore this argument. lag : int, optional Number of autocorrelation lags to include for ``test="yue_wang"`` and ``test="hamed_rao"``. ``None`` uses all available lags. This argument has no effect for other test families. group_dim : str, optional Grouping dimension used by ``test="multivariate"``, ``test="regional"``, and ``test="correlated_multivariate"``. If omitted, it is inferred only when there is exactly one non-time dimension. Returns ------- result : xr.Dataset Dataset containing trend analysis results with preserved dimension order Examples -------- >>> # 3D data (time, lat, lon) -> (lat, lon) >>> ds = mann_kendall_xarray(data_3d, dim='time') >>> >>> # 3D data (year, ensemble, lat) -> (ensemble, lat) >>> ds = mann_kendall_xarray(data_3d, dim='year') >>> >>> # Handle scalar dimension from sel >>> subset = data.sel(lat=0) # lat becomes scalar >>> ds = mann_kendall_xarray(subset, dim='time') # Works correctly """ # Get time axis time_axis = data.get_axis_num(dim) test_name = _normalize_test_name(test) effective_period = _resolve_seasonal_period(test_name, period) group_axis = _resolve_group_axis( data, group_axis=group_dim, time_axis=time_axis, test_name=test_name ) if use_dask and hasattr(data.data, "chunks") and group_axis is None: # Use dask for computation results = _dask_mann_kendall( data, dim, alpha, method, test, effective_period, lag ) else: # Use numpy implementation results = mann_kendall_multidim( data.values, axis=time_axis, alpha=alpha, method=method, test=test, period=effective_period, lag=lag, group_axis=group_axis, ) # Smart dimension handling: preserve order and filter scalar dimensions # Get all dimensions except the analyzed dimension, in original order output_dims = [] output_coords = {} for d in data.dims: if d == dim: # Skip the analyzed dimension continue if group_dim is not None and d == group_dim: continue if group_axis is not None and d == data.dims[group_axis]: continue # Check if this is a true dimension (size > 1) or scalar (size == 1) if data.sizes[d] > 1: output_dims.append(d) output_coords[d] = data.coords[d] # Scalar dimensions (size == 1) are excluded from output # Handle different cases based on output dimensions if not output_dims: # Pure scalar output: no non-time dimensions or all are size=1 # Return scalar dataset ds = xr.Dataset( data_vars={ "trend": ( [], ( results["trend"] if np.isscalar(results["trend"]) else results["trend"].item() ), ), "h": ( [], results["h"] if np.isscalar(results["h"]) else results["h"].item(), ), "p": ( [], results["p"] if np.isscalar(results["p"]) else results["p"].item(), ), "z": ( [], results["z"] if np.isscalar(results["z"]) else results["z"].item(), ), "tau": ( [], ( results["tau"] if np.isscalar(results["tau"]) else results["tau"].item() ), ), "std_error": ( [], ( results["std_error"] if np.isscalar(results["std_error"]) else results["std_error"].item() ), ), }, coords={}, attrs={ "title": "Mann-Kendall Trend Analysis", "alpha": alpha, "method": method, "test": test_name, "input_dims": str(data.dims), "analyzed_dim": dim, "group_dim": data.dims[group_axis] if group_axis is not None else "", }, ) else: # Multi-dimensional output: preserve original dimension order ds = xr.Dataset( data_vars={ "trend": (output_dims, results["trend"]), "h": (output_dims, results["h"]), "p": (output_dims, results["p"]), "z": (output_dims, results["z"]), "tau": (output_dims, results["tau"]), "std_error": (output_dims, results["std_error"]), }, coords=output_coords, attrs={ "title": "Mann-Kendall Trend Analysis", "alpha": alpha, "method": method, "test": test_name, "input_dims": str(data.dims), "analyzed_dim": dim, "group_dim": data.dims[group_axis] if group_axis is not None else "", }, ) return ds
def _dask_mann_kendall( data, dim: str, alpha: float, method: str, test: str = "original", # xr.DataArray period: Optional[int] = None, lag: Optional[int] = None, ): # -> Dict[str, np.ndarray] """Use dask map_blocks for Mann-Kendall computation.""" if _normalize_test_name(test) in _GROUPED_TESTS: raise ValueError( "Grouped Mann-Kendall tests are currently dispatched through the NumPy " "path. Use mann_kendall_xarray(..., group_dim=...) instead." ) # Get time axis time_axis = data.get_axis_num(dim) # Convert to dask array if not already if hasattr(data, "data") and isinstance(data.data, da.Array): dask_data = data.data else: # Create dask array with reasonable chunking chunks = list(data.shape) chunks[time_axis] = -1 # Keep time dimension unchunked # Chunk spatial dimensions reasonably for i, size in enumerate(chunks): if i != time_axis and size > 100: chunks[i] = min(50, size // 2) dask_data = da.from_array(data.values, chunks=chunks) def mk_block_processor(block): """Process a block of data with Mann-Kendall analysis.""" # Move time axis to front block = np.moveaxis(block, time_axis, 0) time_steps = block.shape[0] # Get spatial shape and flatten spatial_shape = block.shape[1:] spatial_size = np.prod(spatial_shape) block_2d = block.reshape(time_steps, spatial_size) # Initialize output arrays output_shape = (6,) + spatial_shape # 6 output variables output = np.full(output_shape, np.nan) output_2d = output.reshape(6, spatial_size) # Process each spatial point for i in range(spatial_size): series = block_2d[:, i] # Skip if insufficient data if len(series) < 3 or np.sum(np.isfinite(series)) < 3: continue try: # Perform Mann-Kendall test result = mann_kendall_test( series, alpha=alpha, method=method, test=test, period=period, lag=lag, ) # Store results: [trend, h, p, z, tau, std_error] output_2d[0, i] = result["trend"] output_2d[1, i] = float(result["h"]) output_2d[2, i] = result["p"] output_2d[3, i] = result["z"] output_2d[4, i] = result["tau"] output_2d[5, i] = result["std_error"] except Exception: continue # Skip problematic series return output.reshape(output_shape) # Determine output chunks output_chunks = list(dask_data.chunks) output_chunks[time_axis] = (6,) # Replace time with 6 output variables # Apply Mann-Kendall analysis using map_blocks with ProgressBar(): result_array = da.map_blocks( mk_block_processor, dask_data, dtype=float, chunks=output_chunks, drop_axis=time_axis, new_axis=time_axis, ).compute() # Move result axis to front and extract variables result_array = np.moveaxis(result_array, time_axis, 0) # Create result dictionary result_dict = { "trend": result_array[0], "h": result_array[1].astype(bool), "p": result_array[2], "z": result_array[3], "tau": result_array[4], "std_error": result_array[5], } return result_dict # Convenience function for easy access
[docs] def trend_analysis( data: Any, axis: Union[int, str] = 0, alpha: float = 0.05, method: str = "theilslopes", dim: Optional[Union[int, str]] = None, test: str = "original", period: Optional[int] = None, lag: Optional[int] = None, group_axis: Optional[Union[int, str]] = None, group_dim: Optional[str] = None, **kwargs, ) -> Any: """ Unified interface for Mann-Kendall trend analysis. Automatically chooses the best implementation based on input type. Parameters ---------- data : np.ndarray or xr.DataArray Input data array axis : int or str, default 0 Axis along which to compute trends. Can be integer index or dimension name. alpha : float, default 0.05 Significance level method : str, default 'theilslopes' Slope calculation method dim : int or str, optional Alternative name for axis parameter (XArray style). Takes precedence over axis. test : str, default 'original' Mann-Kendall test family to apply: - 'original': original Mann-Kendall test - 'yue_wang': Yue and Wang (2004) modified variance correction - 'seasonal': Hirsch-Slack (1984) seasonal Mann-Kendall test - 'correlated_seasonal': Hipel (1994) correlated seasonal Mann-Kendall test - 'multivariate': Hirsch-Slack / Helsel grouped multivariate MK test - 'regional': Helsel regional MK test - 'hamed_rao': Hamed and Rao (1998) variance correction - 'pre_whitening': Yue and Wang (2002) pre-whitening modification - 'trend_free_pre_whitening': Yue and Wang (2002) trend-free pre-whitening period : int, optional Seasonal cycle length used by ``test="seasonal"`` and ``test="correlated_seasonal"``. When omitted, these seasonal test families default to ``12``. Other test families ignore this argument. lag : int, optional Number of autocorrelation lags to include for ``test="yue_wang"`` and ``test="hamed_rao"``. ``None`` uses all available lags. This argument has no effect for other test families. group_axis : int or str, optional Grouping axis used by ``test="multivariate"`` and ``test="regional"`` for NumPy inputs. group_dim : str, optional Grouping dimension used by ``test="multivariate"`` and ``test="regional"`` for xarray inputs. **kwargs Additional arguments passed to underlying functions Notes ----- Parameter priority: dim > axis Both axis and dim support integer indices and string dimension names. """ # Handle parameter aliases with priority: dim > axis if dim is not None: actual_axis = dim else: actual_axis = axis if isinstance(data, xr.DataArray): return mann_kendall_xarray( data, dim=actual_axis, alpha=alpha, method=method, test=test, period=period, lag=lag, group_dim=group_dim, **kwargs, ) else: return mann_kendall_multidim( data, axis=actual_axis, alpha=alpha, method=method, test=test, period=period, lag=lag, group_axis=group_axis, **kwargs, )
# Aliases for backward compatibility and convenience mk_test = mann_kendall_test # Short alias mk_multidim = mann_kendall_multidim # Short alias for multidimensional