"""
Emergent Constraint Methods for Climate Data Analysis.
This module implements the emergent constraint method based on Cox et al. (2013).
It provides functions for calculating probability density functions and performing
constrained projections on climate model data.
References
----------
Cox, P. M., et al. (2013). Sensitivity of tropical carbon to climate change
constrained by carbon dioxide variability. Nature, 494(7437), 341-344.
Implementation adapted from:
https://github.com/blackcata/Emergent_Constraints/tree/master
Author: KM.Noh
Date: 2023.03.15
Modified for Skyborn package with type annotations and improved naming
"""
import warnings
from typing import Tuple, Union
import numpy as np
import xarray as xr
__all__ = [
"gaussian_pdf",
"emergent_constraint_posterior",
"emergent_constraint_prior",
"calc_GAUSSIAN_PDF",
"calc_PDF_EC",
"find_std_from_PDF",
"calc_PDF_EC_PRIOR",
]
def _preferred_float_dtype(*values) -> np.dtype:
"""Pick a floating work dtype from the provided inputs."""
preferred = None
for value in values:
raw = getattr(value, "values", value)
array = np.asarray(raw)
dtype = np.dtype(array.dtype)
if not np.issubdtype(dtype, np.floating):
continue
if dtype.itemsize < np.dtype(np.float32).itemsize:
dtype = np.dtype(np.float32)
if preferred is None or dtype.itemsize > preferred.itemsize:
preferred = dtype
return preferred if preferred is not None else np.dtype(np.float32)
[docs]
def gaussian_pdf(
mu: float, sigma: float, x: Union[np.ndarray, float]
) -> Union[np.ndarray, float]:
"""
Calculate Gaussian probability density function.
Parameters
----------
mu : float
Mean of the Gaussian distribution.
sigma : float
Standard deviation of the Gaussian distribution.
x : Union[np.ndarray, float]
Input values at which to evaluate the PDF.
Returns
-------
Union[np.ndarray, float]
Probability density function values.
References
----------
Adapted from: https://github.com/blackcata/Emergent_Constraints/tree/master
"""
work_dtype = _preferred_float_dtype(x)
x_values = np.asarray(x, dtype=work_dtype)
mu_value = work_dtype.type(mu)
sigma_value = work_dtype.type(sigma)
if sigma_value < 0:
warnings.warn(
"Standard deviation must be non-negative for gaussian_pdf.",
RuntimeWarning,
stacklevel=2,
)
pdf = np.full_like(x_values, np.nan, dtype=work_dtype)
elif sigma_value == 0:
warnings.warn(
"Standard deviation is zero in gaussian_pdf; returning a degenerate limit.",
RuntimeWarning,
stacklevel=2,
)
pdf = np.where(
x_values == mu_value,
work_dtype.type(np.inf),
work_dtype.type(0.0),
)
else:
scale = work_dtype.type(1.0) / (
np.sqrt(work_dtype.type(2.0 * np.pi)) * sigma_value
)
exponent = work_dtype.type(-0.5) * ((x_values - mu_value) / sigma_value) ** 2
pdf = scale * np.exp(exponent)
if np.ndim(x_values) == 0:
return float(pdf)
return pdf
def _fit_linear_relationship(
x_models: np.ndarray, y_models: np.ndarray, work_dtype: np.dtype
) -> Tuple[np.ndarray, np.ndarray, float, float, float, float]:
"""Fit the inter-model linear relationship used by the EC formulas."""
x_models = np.asarray(x_models, dtype=work_dtype)
y_models = np.asarray(y_models, dtype=work_dtype)
valid_mask = np.isfinite(x_models) & np.isfinite(y_models)
x_models = x_models[valid_mask]
y_models = y_models[valid_mask]
if x_models.size != y_models.size or x_models.size == 0:
raise ValueError(
"constraint_data and target_data must contain valid paired data."
)
x_mean = np.mean(x_models)
y_mean = np.mean(y_models)
x_centered = x_models - x_mean
y_centered = y_models - y_mean
ss_x = np.sum(x_centered**2)
if ss_x > work_dtype.type(0.0):
slope = np.sum(x_centered * y_centered) / ss_x
else:
slope = work_dtype.type(0.0)
intercept = y_mean - slope * x_mean
residuals = y_models - (intercept + slope * x_models)
if x_models.size > 2:
prediction_error = np.sqrt(
np.sum(residuals**2) / work_dtype.type(x_models.size - 2)
)
else:
prediction_error = np.sqrt(np.mean(residuals**2))
if not np.isfinite(prediction_error):
prediction_error = work_dtype.type(0.0)
return x_models, y_models, slope, intercept, prediction_error, ss_x
def _prediction_std(
prediction_error: float,
sample_size: int,
x_values: np.ndarray,
x_mean: float,
ss_x: float,
) -> np.ndarray:
"""Return predictive standard deviation for regression-conditioned values."""
work_dtype = np.dtype(np.asarray(x_values).dtype)
if prediction_error == 0.0:
return np.zeros_like(x_values, dtype=work_dtype)
base = np.ones_like(x_values, dtype=work_dtype) + work_dtype.type(1.0 / sample_size)
if ss_x > work_dtype.type(0.0):
base = base + ((x_values - x_mean) ** 2) / ss_x
return prediction_error * np.sqrt(base)
[docs]
def emergent_constraint_posterior(
constraint_data: xr.DataArray,
target_data: xr.DataArray,
constraint_grid: np.ndarray,
target_grid: np.ndarray,
obs_pdf: np.ndarray,
) -> Tuple[np.ndarray, float, float]:
"""
Calculate posterior PDF using emergent constraint method.
This function applies the emergent constraint method to reduce uncertainty
in climate projections by utilizing observational constraints.
Parameters
----------
constraint_data : xr.DataArray
Inter-model spread data for the constraint variable (e.g., model sensitivity).
target_data : xr.DataArray
Inter-model spread data for the target variable (e.g., future projection).
constraint_grid : np.ndarray
Grid values for the constraint variable.
target_grid : np.ndarray
Grid values for the target variable.
obs_pdf : np.ndarray
Observational PDF of the constraint variable.
Returns
-------
Tuple[np.ndarray, float, float]
A tuple containing:
- posterior_pdf : np.ndarray - Posterior PDF of the target variable
- posterior_std : float - Standard deviation of the target variable
- posterior_mean : float - Mean of the target variable
References
----------
Adapted from: https://github.com/blackcata/Emergent_Constraints/tree/master
Cox, P. M., et al. (2013). Nature, 494(7437), 341-344.
"""
work_dtype = _preferred_float_dtype(constraint_data, target_data)
constraint_grid = np.asarray(constraint_grid, dtype=work_dtype)
target_grid = np.asarray(target_grid, dtype=work_dtype)
obs_pdf = np.asarray(obs_pdf, dtype=work_dtype)
dx = constraint_grid[1] - constraint_grid[0]
x_models, y_models, slope, intercept, prediction_error, ss_x = (
_fit_linear_relationship(constraint_data.values, target_data.values, work_dtype)
)
n_models = len(x_models)
regression_line = intercept + slope * constraint_grid
sigma_prediction = _prediction_std(
prediction_error,
n_models,
constraint_grid,
np.mean(x_models),
ss_x,
)
posterior_pdf = np.zeros(len(target_grid), dtype=work_dtype)
nonzero_sigma = sigma_prediction > 0.0
if np.any(nonzero_sigma):
likelihood = np.zeros(
(len(target_grid), len(constraint_grid)), dtype=work_dtype
)
sigma_used = sigma_prediction[nonzero_sigma]
centered = (
target_grid[:, np.newaxis] - regression_line[nonzero_sigma][np.newaxis, :]
)
likelihood[:, nonzero_sigma] = np.exp(
work_dtype.type(-0.5) * (centered / sigma_used[np.newaxis, :]) ** 2
) / (np.sqrt(work_dtype.type(2.0 * np.pi)) * sigma_used[np.newaxis, :])
posterior_pdf = likelihood @ (obs_pdf * dx)
else:
nearest = np.argmin(
np.abs(target_grid[:, np.newaxis] - regression_line[np.newaxis, :]), axis=0
)
np.add.at(posterior_pdf, nearest, obs_pdf * dx)
# Calculate statistics
threshold = 0.341 # For 1-sigma equivalent
posterior_std = _calculate_std_from_pdf(threshold, target_grid, posterior_pdf)
posterior_mean = target_grid[posterior_pdf.argmax()]
return posterior_pdf, posterior_std, posterior_mean
def _calculate_std_from_pdf(
threshold: float, values: np.ndarray, pdf: np.ndarray
) -> float:
"""
Calculate standard deviation from probability density function.
Parameters
----------
threshold : float
Threshold value for probability integration (e.g., 0.341 for 1-sigma).
values : np.ndarray
Grid values corresponding to the PDF.
pdf : np.ndarray
Probability density function values.
Returns
-------
float
Standard deviation of the distribution.
References
----------
Adapted from: https://github.com/blackcata/Emergent_Constraints/tree/master
"""
pdf_sum = np.sum(pdf)
if pdf_sum <= 0 or not np.isfinite(pdf_sum):
return np.nan
max_index = int(pdf.argmax())
for i in range(max_index, -1, -1):
pdf_integral = pdf[i : max_index + 1].sum() / pdf_sum
if pdf_integral >= threshold:
return float(abs(values[max_index] - values[i]))
std_dev = np.sqrt(np.average((values - values[max_index]) ** 2, weights=pdf))
return float(std_dev)
[docs]
def emergent_constraint_prior(
constraint_data: xr.DataArray,
target_data: xr.DataArray,
constraint_grid: np.ndarray,
target_grid: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate prior probability distribution for emergent constraints.
Parameters
----------
constraint_data : xr.DataArray
Inter-model spread data for the constraint variable.
target_data : xr.DataArray
Inter-model spread data for the target variable.
constraint_grid : np.ndarray
Grid values for the constraint variable.
target_grid : np.ndarray
Grid values for the target variable.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
A tuple containing:
- prior_pdf : np.ndarray - Prior PDF
- prediction_error : np.ndarray - Prediction error array
- regression_line : np.ndarray - Linear regression values
References
----------
Adapted from: https://github.com/blackcata/Emergent_Constraints/tree/master
"""
work_dtype = _preferred_float_dtype(constraint_data, target_data)
constraint_grid = np.asarray(constraint_grid, dtype=work_dtype)
target_grid = np.asarray(target_grid, dtype=work_dtype)
x_models, y_models, slope, intercept, prediction_error_base, ss_x = (
_fit_linear_relationship(constraint_data.values, target_data.values, work_dtype)
)
n_models = len(x_models)
regression_line = intercept + slope * constraint_grid
prediction_error = _prediction_std(
prediction_error_base,
n_models,
constraint_grid,
np.mean(x_models),
ss_x,
)
prior_pdf = np.zeros((len(target_grid), len(constraint_grid)), dtype=work_dtype)
nonzero_sigma = prediction_error > 0.0
if np.any(nonzero_sigma):
centered = (
target_grid[:, np.newaxis] - regression_line[nonzero_sigma][np.newaxis, :]
)
sigma_used = prediction_error[nonzero_sigma]
prior_pdf[:, nonzero_sigma] = np.exp(
work_dtype.type(-0.5) * (centered / sigma_used[np.newaxis, :]) ** 2
) / (np.sqrt(work_dtype.type(2.0 * np.pi)) * sigma_used[np.newaxis, :])
if np.any(~nonzero_sigma):
nearest = np.argmin(
np.abs(
target_grid[:, np.newaxis]
- regression_line[~nonzero_sigma][np.newaxis, :]
),
axis=0,
)
prior_pdf[nearest, np.where(~nonzero_sigma)[0]] = 1.0
return prior_pdf, prediction_error, regression_line
# Legacy function names for backward compatibility
[docs]
def calc_GAUSSIAN_PDF(
mu: float, sigma: float, x: Union[np.ndarray, float]
) -> Union[np.ndarray, float]:
"""Legacy function name. Use gaussian_pdf() instead."""
return gaussian_pdf(mu, sigma, x)
[docs]
def calc_PDF_EC(
tmp_x: xr.DataArray,
tmp_y: xr.DataArray,
x: np.ndarray,
y: np.ndarray,
PDF_x: np.ndarray,
) -> Tuple[np.ndarray, float, float]:
"""Legacy function name. Use emergent_constraint_posterior() instead."""
return emergent_constraint_posterior(tmp_x, tmp_y, x, y, PDF_x)
[docs]
def find_std_from_PDF(thres: float, y: np.ndarray, PDF: np.ndarray) -> float:
"""Legacy function name. Use _calculate_std_from_pdf() instead."""
return _calculate_std_from_pdf(thres, y, PDF)
[docs]
def calc_PDF_EC_PRIOR(
tmp_x: xr.DataArray, tmp_y: xr.DataArray, x: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Legacy function name. Use emergent_constraint_prior() instead."""
return emergent_constraint_prior(tmp_x, tmp_y, x, y)