Source code for skyborn.calc.mann_kendall.partial

"""Independent partial Mann-Kendall interfaces.

This module intentionally stays separate from ``core.py`` because partial
Mann-Kendall uses two input series: a response variable and a covariate.
That changes the missing-value semantics and the broadcast rules compared with
the single-series MK families implemented in the core module.
"""

import warnings
from typing import Any, Dict, Optional, Union

import numpy as np
import scipy.stats as stats
import xarray as xr

from .bindings import _partial_stats_batch, _partial_stats_sen_batch, _sen_slope_batch
from .regression import (
    _linregress_slope_batch,
    _linregress_std_error_batch,
    _theil_std_error_batch,
)

__all__ = [
    "partial_mann_kendall_test",
    "partial_mann_kendall_multidim",
    "partial_mann_kendall_xarray",
    "partial_test",
    "partial_multidim",
]


def _validate_partial_method(method: str) -> str:
    """Validate and normalize the response-slope method."""
    normalized = method.lower()
    if normalized not in {"theilslopes", "linregress"}:
        raise ValueError(f"Unknown method: {method}. Use 'theilslopes' or 'linregress'")
    return normalized


def _empty_partial_result(n_total: int, n_valid: int) -> Dict[str, Union[float, bool]]:
    """Return the partial MK NaN result payload for insufficient samples."""
    return {
        "trend": np.nan,
        "h": False,
        "p": np.nan,
        "z": np.nan,
        "tau": np.nan,
        "s": np.nan,
        "var_s": np.nan,
        "intercept": np.nan,
        "std_error": np.nan,
        "n": float(n_valid),
        "n_dropped": float(n_total - n_valid),
    }


def _partial_statistics_batch(
    response_2d: np.ndarray, covariate_2d: np.ndarray
) -> Dict[str, np.ndarray]:
    """Return partial-MK statistics for a clean dense 2D batch."""
    response = np.asarray(response_2d, dtype=np.float64)
    covariate = np.asarray(covariate_2d, dtype=np.float64)

    if response.ndim != 2 or covariate.ndim != 2:
        raise ValueError("Partial MK batch helpers expect two 2D arrays.")
    if response.shape != covariate.shape:
        raise ValueError("Response and covariate batches must have the same shape.")

    if response.shape[0] < 3:
        raise ValueError("Partial MK batch helpers need at least 3 time steps.")

    s_values, var_values, tau_values = _partial_stats_batch(response, covariate)

    return {
        "s": np.asarray(s_values, dtype=np.float64),
        "var_s": np.asarray(var_values, dtype=np.float64),
        "tau": np.asarray(tau_values, dtype=np.float64),
    }


def _theil_slope_intercept_scalar(
    response_1d: np.ndarray,
) -> tuple[float, float, np.ndarray, np.ndarray]:
    """Return Theil-Sen slope/intercept using the response's original time index."""
    response = np.asarray(response_1d, dtype=np.float64)
    valid = np.isfinite(response)
    x = np.arange(response.size, dtype=np.float64)[valid]
    y = response[valid]
    n = y.size

    if n < 2:
        return np.nan, np.nan, x, y

    slopes = np.empty(n * (n - 1) // 2, dtype=np.float64)
    offset = 0
    for start in range(n - 1):
        count = n - start - 1
        slopes[offset : offset + count] = (y[start + 1 :] - y[start]) / (
            x[start + 1 :] - x[start]
        )
        offset += count

    slope = float(np.nanmedian(slopes))
    intercept = float(np.nanmedian(y) - np.median(x) * slope)
    return slope, intercept, x, y


def _response_trend_scalar(
    response_1d: np.ndarray, method: str
) -> tuple[float, float, float]:
    """Return response-only slope, intercept, and std_error for one 1D series."""
    normalized_method = _validate_partial_method(method)
    response = np.asarray(response_1d, dtype=np.float64)
    valid = np.isfinite(response)
    x = np.arange(response.size, dtype=np.float64)[valid]
    y = response[valid]

    if y.size < 2:
        return np.nan, np.nan, np.nan

    if normalized_method == "theilslopes":
        slope, intercept, _, _ = _theil_slope_intercept_scalar(response)
        std_error = float(
            _theil_std_error_batch(
                y.reshape(-1, 1), x, np.asarray([slope], dtype=np.float64)
            )[0]
        )
        return slope, intercept, std_error

    slope, intercept = stats.linregress(x, y)[:2]
    std_error = float(
        _linregress_std_error_batch(
            y.reshape(-1, 1), x, np.asarray([slope], dtype=np.float64)
        )[0]
    )
    return float(slope), float(intercept), std_error


def _response_trend_batch(
    response_2d: np.ndarray, method: str
) -> Dict[str, np.ndarray]:
    """Return response-only slope, intercept, and std_error for a clean 2D batch."""
    normalized_method = _validate_partial_method(method)
    response = np.asarray(response_2d, dtype=np.float64)
    if response.ndim != 2:
        raise ValueError("Expected a 2D array for response trend batch estimation.")

    x = np.arange(response.shape[0], dtype=np.float64)

    if normalized_method == "theilslopes":
        slopes = _sen_slope_batch(response)
        intercepts = np.median(response, axis=0) - np.median(x) * slopes
        std_errors = _theil_std_error_batch(response, x, slopes)
    else:
        slopes = _linregress_slope_batch(response, x)
        intercepts = np.mean(response, axis=0) - slopes * np.mean(x)
        std_errors = _linregress_std_error_batch(response, x, slopes)

    return {
        "trend": np.asarray(slopes, dtype=np.float64),
        "intercept": np.asarray(intercepts, dtype=np.float64),
        "std_error": np.asarray(std_errors, dtype=np.float64),
    }


[docs] def partial_mann_kendall_test( data: np.ndarray, covariate: np.ndarray, alpha: float = 0.05, method: str = "theilslopes", ) -> Dict[str, Union[float, bool]]: """Perform the partial Mann-Kendall test for one response/covariate pair.""" normalized_method = _validate_partial_method(method) response = np.asarray(data, dtype=np.float64) cov = np.asarray(covariate, dtype=np.float64) if response.ndim != 1 or cov.ndim != 1: raise ValueError("partial_mann_kendall_test expects two 1D arrays.") if response.shape != cov.shape: raise ValueError("Response and covariate must have the same shape.") joint_valid = np.isfinite(response) & np.isfinite(cov) n_valid = int(np.sum(joint_valid)) if n_valid < 3: warnings.warn("Need at least 3 joint-valid data points for partial MK test") return _empty_partial_result(response.size, n_valid) stats_values = _partial_statistics_batch( response[joint_valid].reshape(-1, 1), cov[joint_valid].reshape(-1, 1), ) s_value = float(stats_values["s"][0]) var_s = float(stats_values["var_s"][0]) tau = float(stats_values["tau"][0]) with np.errstate(divide="ignore", invalid="ignore"): z = s_value / np.sqrt(var_s) p_value = 2.0 * (1.0 - stats.norm.cdf(abs(z))) h = bool(abs(z) > stats.norm.ppf(1.0 - alpha / 2.0)) if np.isfinite(z) else False slope, intercept, std_error = _response_trend_scalar(response, normalized_method) return { "trend": slope, "h": h, "p": float(p_value), "z": float(z), "tau": tau, "s": s_value, "var_s": var_s, "intercept": intercept, "std_error": std_error, "n": float(n_valid), "n_dropped": float(response.size - n_valid), }
def _broadcast_numpy_covariate( data: np.ndarray, covariate: np.ndarray, axis: int ) -> tuple[np.ndarray, np.ndarray]: """Broadcast NumPy covariate input onto the response array.""" response = np.asarray(data, dtype=np.float64) cov = np.asarray(covariate, dtype=np.float64) time_axis = int(axis) response_time_first = np.moveaxis(response, time_axis, 0) if cov.ndim == 1: if cov.shape[0] != response.shape[time_axis]: raise ValueError( "1D covariate must have the same length as the response time axis." ) cov_time_first = cov.reshape((cov.shape[0],) + (1,) * (response.ndim - 1)) elif cov.shape == response.shape: cov_time_first = np.moveaxis(cov, time_axis, 0) else: raise ValueError( "For NumPy inputs, covariate must be 1D with the same time length or " "have the same shape as data." ) return response_time_first, np.broadcast_to( cov_time_first, response_time_first.shape )
[docs] def partial_mann_kendall_multidim( data: np.ndarray, covariate: np.ndarray, axis: Union[int, str] = 0, alpha: float = 0.05, method: str = "theilslopes", chunk_size: Optional[int] = None, dim: Optional[Union[int, str]] = None, ) -> Dict[str, np.ndarray]: """Apply the partial Mann-Kendall test along one axis of a NumPy array.""" if dim is not None: axis = dim if isinstance(axis, str): raise ValueError("String axis names are only supported for xarray inputs.") normalized_method = _validate_partial_method(method) response_time_first, cov_time_first = _broadcast_numpy_covariate( np.asarray(data, dtype=np.float64), np.asarray(covariate, dtype=np.float64), int(axis), ) if response_time_first.ndim == 1: return partial_mann_kendall_test( response_time_first, cov_time_first, alpha=alpha, method=normalized_method, ) time_steps = response_time_first.shape[0] spatial_shape = response_time_first.shape[1:] n_points = int(np.prod(spatial_shape)) response_2d = response_time_first.reshape(time_steps, n_points) cov_2d = cov_time_first.reshape(time_steps, n_points) results_flat = { "trend": np.full(n_points, np.nan, dtype=np.float64), "h": np.zeros(n_points, dtype=bool), "p": np.full(n_points, np.nan, dtype=np.float64), "z": np.full(n_points, np.nan, dtype=np.float64), "tau": np.full(n_points, np.nan, dtype=np.float64), "s": np.full(n_points, np.nan, dtype=np.float64), "var_s": np.full(n_points, np.nan, dtype=np.float64), "intercept": np.full(n_points, np.nan, dtype=np.float64), "std_error": np.full(n_points, np.nan, dtype=np.float64), "n": np.zeros(n_points, dtype=np.float64), "n_dropped": np.full(n_points, float(time_steps), dtype=np.float64), } if chunk_size is None: max_memory_mb = 200 bytes_per_series = max(1, time_steps) * 8 * 6 chunk_size = max( 100, min(n_points, (max_memory_mb * 1024 * 1024) // bytes_per_series) ) for start_idx in range(0, n_points, chunk_size): end_idx = min(start_idx + chunk_size, n_points) response_chunk = response_2d[:, start_idx:end_idx] cov_chunk = cov_2d[:, start_idx:end_idx] joint_valid_counts = np.sum( np.isfinite(response_chunk) & np.isfinite(cov_chunk), axis=0 ) valid_series = joint_valid_counts >= 3 if not np.any(valid_series): continue local_indices = np.where(valid_series)[0] clean_local = local_indices[joint_valid_counts[local_indices] == time_steps] if clean_local.size > 0: response_clean = np.asarray( response_chunk[:, clean_local], dtype=np.float64 ) cov_clean = np.asarray(cov_chunk[:, clean_local], dtype=np.float64) if normalized_method == "theilslopes": s_values, var_values, tau_values, slopes = _partial_stats_sen_batch( response_clean, cov_clean ) partial_stats = { "s": s_values, "var_s": var_values, "tau": tau_values, } x = np.arange(response_clean.shape[0], dtype=np.float64) response_trend = { "trend": slopes, "intercept": np.median(response_clean, axis=0) - np.median(x) * slopes, "std_error": _theil_std_error_batch(response_clean, x, slopes), } else: partial_stats = _partial_statistics_batch(response_clean, cov_clean) response_trend = _response_trend_batch( response_clean, normalized_method ) with np.errstate(divide="ignore", invalid="ignore"): z_values = partial_stats["s"] / np.sqrt(partial_stats["var_s"]) p_values = 2.0 * (1.0 - stats.norm.cdf(np.abs(z_values))) h_values = np.abs(z_values) > stats.norm.ppf(1.0 - alpha / 2.0) global_clean = start_idx + clean_local results_flat["trend"][global_clean] = response_trend["trend"] results_flat["h"][global_clean] = h_values results_flat["p"][global_clean] = p_values results_flat["z"][global_clean] = z_values results_flat["tau"][global_clean] = partial_stats["tau"] results_flat["s"][global_clean] = partial_stats["s"] results_flat["var_s"][global_clean] = partial_stats["var_s"] results_flat["intercept"][global_clean] = response_trend["intercept"] results_flat["std_error"][global_clean] = response_trend["std_error"] results_flat["n"][global_clean] = float(time_steps) results_flat["n_dropped"][global_clean] = 0.0 fallback_local = local_indices[joint_valid_counts[local_indices] != time_steps] for offset in fallback_local: result = partial_mann_kendall_test( response_chunk[:, offset], cov_chunk[:, offset], alpha=alpha, method=normalized_method, ) global_idx = start_idx + int(offset) for key, value in result.items(): results_flat[key][global_idx] = value return { key: ( values.reshape(spatial_shape) if key != "h" else values.reshape(spatial_shape) ) for key, values in results_flat.items() }
[docs] def partial_mann_kendall_xarray( data: xr.DataArray, covariate: xr.DataArray, dim: str = "time", alpha: float = 0.05, method: str = "theilslopes", ) -> xr.Dataset: """Apply the partial Mann-Kendall test to xarray data.""" data_aligned, covariate_aligned = xr.align( data, covariate, join="exact", copy=False ) covariate_broadcast = covariate_aligned.broadcast_like(data_aligned) time_axis = data_aligned.get_axis_num(dim) results = partial_mann_kendall_multidim( data_aligned.values, covariate_broadcast.values, axis=time_axis, alpha=alpha, method=method, ) output_dims = [name for name in data_aligned.dims if name != dim] output_coords = {name: data_aligned.coords[name] for name in output_dims} if not output_dims: data_vars = {} for key, value in results.items(): scalar_value = value if np.isscalar(value) else np.asarray(value).item() data_vars[key] = ([], scalar_value) else: data_vars = { key: (output_dims, np.asarray(value)) for key, value in results.items() } return xr.Dataset( data_vars=data_vars, coords=output_coords, attrs={ "title": "Partial Mann-Kendall Trend Analysis", "alpha": alpha, "method": method, "analyzed_dim": dim, "covariate_dims": str(covariate_aligned.dims), }, )
partial_test = partial_mann_kendall_test partial_multidim = partial_mann_kendall_multidim