Source code for skyborn.calc.growth_rate.core

"""Chemke-style baroclinic and barotropic growth-rate diagnostics.

This module prepares one-dimensional atmospheric profiles in Python and
dispatches the expensive normal-mode solves to the compiled Fortran kernels in
``skyborn.calc.growth_rate.growth_rate_kernels``.

References
----------
Chemke, R., and Ming, Y. (2020):
    Large Atmospheric Waves Will Get Stronger, While Small Waves Will Get
    Weaker by the End of the 21st Century. Geophysical Research Letters, 47,
    e2020GL090441.
    https://doi.org/10.1029/2020GL090441

Chemke, R., Zanna, L., Orbe, C., Sentman, L. T., and Polvani, L. M. (2022):
    The Future Intensification of the North Atlantic Winter Storm Track: The
    Key Role of Dynamic Ocean Coupling. Journal of Climate, 35(8), 2407-2421.
    https://doi.org/10.1175/JCLI-D-21-0407.1

Chemke, R. (2022):
    The future poleward shift of Southern Hemisphere summer mid-latitude
    storm tracks stems from ocean coupling. Nature Communications, 13, 2531.
    https://doi.org/10.1038/s41467-022-29392-4

Chemke, R., Ming, Y., and Yuval, J. (2022):
    The intensification of winter mid-latitude storm tracks in the Southern
    Hemisphere. Nature Climate Change, 12, 553-557.
    https://doi.org/10.1038/s41558-022-01368-8
"""

from __future__ import annotations

import warnings
from typing import Optional, Tuple, Union

import numpy as np
import xarray as xr

from skyborn.interp.interpolation import interp_pressure_1d

from ..troposphere import trop_wmo as _trop_wmo
from ..troposphere import trop_wmo_profile as _trop_wmo_profile
from .growth_rate_kernels import dbaroc_growth_rate_1d as _dbaroc_growth_rate_1d
from .growth_rate_kernels import (
    dbaroc_growth_rate_profiles as _dbaroc_growth_rate_profiles,
)
from .growth_rate_kernels import dbarot_growth_rate_1d as _dbarot_growth_rate_1d

GAS_CONSTANT_DRY = 287.04
HEAT_CAPACITY = 1004.7
KAPPA = GAS_CONSTANT_DRY / HEAT_CAPACITY
REFERENCE_PRESSURE_PA = 100000.0
DEFAULT_SOLVER_LEVELS = 45
DEFAULT_SMOOTH_WINDOW = 1
DEFAULT_WAVENUMBER_MODE = "high"
DEFAULT_HIGH_RES_WAVENUMBER_COUNT = 200
DEFAULT_HIGH_RES_WAVENUMBER_STEP = 1.0e-7
DEFAULT_LOW_RES_LONGITUDE = np.arange(0.0, 361.0, 1.5, dtype=np.float64)

ProfileInput = Union[xr.DataArray, np.ndarray]
LatitudeInput = Union[ProfileInput, float, int, np.floating, np.integer]
PressureInput = Union[ProfileInput, float, int, np.floating, np.integer]
GrowthRateOutput = Union[float, np.ndarray, xr.DataArray]

__all__ = [
    "baroc_growth_rate",
    "barot_growth_rate",
]


def _as_1d_float64(values: ProfileInput, name: str) -> np.ndarray:
    """Convert one-dimensional eager inputs to ``float64`` NumPy arrays."""

    array = np.asarray(getattr(values, "data", values), dtype=np.float64)
    if array.ndim != 1:
        raise ValueError(f"`{name}` must be one-dimensional")
    return np.array(array, dtype=np.float64, copy=True)


def _coerce_profile_matrix(
    values: ProfileInput,
    name: str,
    pressure_size: int,
    pressure_dim: Optional[str],
) -> np.ndarray:
    """Normalize profile inputs to a ``(nprofile, nlev)`` float64 matrix."""

    array = np.asarray(getattr(values, "data", values), dtype=np.float64)
    if array.ndim not in {1, 2}:
        raise ValueError(f"`{name}` must be one- or two-dimensional")
    array = np.array(array, dtype=np.float64, copy=True)
    if array.ndim == 1:
        if array.shape[0] != pressure_size:
            raise ValueError(f"`{name}` length must match the pressure coordinate")
        return array[np.newaxis, :]

    if isinstance(values, xr.DataArray) and pressure_dim in values.dims:
        vertical_axis = values.dims.index(pressure_dim)  # type: ignore[arg-type]
    else:
        candidate_axes = [
            axis for axis, size in enumerate(array.shape) if size == pressure_size
        ]
        if len(candidate_axes) != 1:
            raise ValueError(
                f"`{name}` must have exactly one axis matching the pressure length"
            )
        vertical_axis = candidate_axes[0]

    if array.shape[vertical_axis] != pressure_size:
        raise ValueError(f"`{name}` vertical axis must match the pressure coordinate")

    return np.array(np.moveaxis(array, vertical_axis, -1), dtype=np.float64, copy=True)


def _broadcast_profile_matrix(
    matrix: np.ndarray,
    nprofile: int,
    name: str,
) -> np.ndarray:
    """Broadcast a ``(1, nlev)`` climatology to ``nprofile`` profiles."""

    if matrix.shape[0] == nprofile:
        return np.array(matrix, dtype=np.float64, copy=True)
    if matrix.shape[0] == 1:
        return np.array(
            np.broadcast_to(matrix, (nprofile, matrix.shape[1])),
            dtype=np.float64,
            copy=True,
        )
    raise ValueError(
        f"`{name}` must either have one profile or match the requested profile count"
    )


def _as_profile_vector(
    values: PressureInput,
    name: str,
    nprofile: int,
) -> np.ndarray:
    """Normalize scalar-or-vector inputs to a length-``nprofile`` array."""

    array = np.asarray(getattr(values, "data", values), dtype=np.float64)
    if array.ndim == 0:
        return np.full(nprofile, float(array), dtype=np.float64)
    if array.ndim != 1:
        raise ValueError(f"`{name}` must be scalar or one-dimensional")
    if array.size == nprofile:
        return np.array(array, dtype=np.float64, copy=True)
    if array.size == 1:
        return np.full(nprofile, float(array[0]), dtype=np.float64)
    raise ValueError(f"`{name}` must have length 1 or match the profile count")


def _same_shape_or_raise(*named_arrays: tuple[str, np.ndarray]) -> None:
    """Validate that all named one-dimensional arrays have identical shapes."""

    names = [name for name, _ in named_arrays]
    arrays = [array for _, array in named_arrays]
    reference_shape = arrays[0].shape
    for name, array in zip(names[1:], arrays[1:]):
        if array.shape != reference_shape:
            raise ValueError(
                "dimension mismatch for growth-rate inputs: "
                + ", ".join(
                    f"{item_name}={item_array.shape}"
                    for item_name, item_array in zip(names, arrays)
                )
            )


def _require_monotonic(values: np.ndarray, name: str) -> None:
    """Require a strictly monotonic one-dimensional coordinate."""

    diffs = np.diff(values)
    if not (np.all(diffs > 0.0) or np.all(diffs < 0.0)):
        raise ValueError(f"`{name}` must be strictly monotonic")


def _f_beta_from_lat_bounds(lat_bounds: Tuple[float, float]) -> Tuple[float, float]:
    """Return Chemke-style band-mean Coriolis terms for a latitude range."""

    lat0, lat1 = float(lat_bounds[0]), float(lat_bounds[1])
    sin_avg = (np.sin(np.deg2rad(lat0)) + np.sin(np.deg2rad(lat1))) / 2.0
    cos_avg = (np.cos(np.deg2rad(lat0)) + np.cos(np.deg2rad(lat1))) / 2.0
    f_cor = 2.0 * 7.292e-5 * sin_avg
    beta = 2.0 * 7.292e-5 * cos_avg / 6_371_000.0
    return float(f_cor), float(beta)


def _prepare_wavenumber_inputs(
    wavenumber_mode: str,
    lon: Optional[ProfileInput],
) -> Tuple[int, int, Optional[float]]:
    """Return normalized wavenumber-mode inputs for the compiled backend."""

    normalized_mode = wavenumber_mode.strip().lower().replace("_", "").replace("-", "")
    if normalized_mode in {"highres", "highresolution"}:
        normalized_mode = "high"
    elif normalized_mode in {"lowres", "lowresolution"}:
        normalized_mode = "low"
    elif normalized_mode not in {"high", "low"}:
        raise ValueError("`wavenumber_mode` must be 'high' or 'low'")
    if normalized_mode == "high":
        return 1, DEFAULT_HIGH_RES_WAVENUMBER_COUNT, None

    lon_values = (
        np.array(DEFAULT_LOW_RES_LONGITUDE, dtype=np.float64, copy=True)
        if lon is None
        else _as_1d_float64(lon, "lon")
    )
    _require_monotonic(lon_values, "lon")
    lon_span_degrees = abs(float(lon_values[0]) - float(lon_values[-1]))
    if lon_span_degrees <= 0.0:
        raise ValueError("`lon` must span a nonzero zonal distance")

    wavenumber_count = int(round(lon_values.size / 3.0))
    if wavenumber_count < 2:
        raise ValueError(
            "`lon` must contain enough points to build at least two low-resolution "
            "zonal wavenumbers"
        )

    return 2, wavenumber_count, lon_span_degrees * np.pi / 180.0


def _reduce_latitude_band_inputs(
    u: ProfileInput,
    temperature: ProfileInput,
    lat: Optional[LatitudeInput],
    lat_bounds: Tuple[float, float],
    pressure_pa: np.ndarray,
    pressure_dim: Optional[str],
) -> Tuple[ProfileInput, ProfileInput, Optional[LatitudeInput]]:
    """Collapse explicit latitude axes using a cosine-weighted band mean."""

    lat_min, lat_max = sorted((float(lat_bounds[0]), float(lat_bounds[1])))
    latitude_dim = None

    for values in (u, temperature):
        if not isinstance(values, xr.DataArray):
            continue

        candidate_dims = [
            dim
            for dim in values.dims
            if dim != pressure_dim and dim.lower() in {"lat", "latitude"}
        ]
        if len(candidate_dims) > 1:
            raise ValueError(
                "Latitude-band growth-rate inputs must have at most one "
                "latitude dimension per DataArray"
            )
        if not candidate_dims:
            continue
        if latitude_dim is None:
            latitude_dim = candidate_dims[0]
            continue
        if candidate_dims[0] != latitude_dim:
            raise ValueError(
                "Latitude-band xarray inputs must share the same latitude " "dimension"
            )

    if latitude_dim is not None:
        if lat is not None:
            raise ValueError(
                "`lat` cannot be provided together with xarray latitude-band "
                "inputs; the latitude coordinate is taken from the DataArray "
                "dimensions"
            )

        reduced_values = []
        for name, values in (("u", u), ("temperature", temperature)):
            if not isinstance(values, xr.DataArray) or latitude_dim not in values.dims:
                reduced_values.append(values)
                continue

            latitude_coord = values[latitude_dim]
            latitude_slice = (
                slice(lat_min, lat_max)
                if float(latitude_coord[0]) <= float(latitude_coord[-1])
                else slice(lat_max, lat_min)
            )
            subset = values.sel({latitude_dim: latitude_slice})
            if subset.sizes[latitude_dim] == 0:
                raise ValueError(
                    f"`lat_bounds` does not select any latitude points for `{name}`"
                )

            subset = subset.where(np.isfinite(subset))

            weights = xr.DataArray(
                np.cos(np.deg2rad(np.asarray(subset[latitude_dim], dtype=np.float64))),
                coords={latitude_dim: subset[latitude_dim]},
                dims=(latitude_dim,),
            )
            reduced_values.append(subset.weighted(weights).mean(latitude_dim))

        return reduced_values[0], reduced_values[1], None

    if (
        max(
            np.asarray(getattr(u, "data", u)).ndim,
            np.asarray(getattr(temperature, "data", temperature)).ndim,
        )
        < 3
    ):
        return u, temperature, lat

    if lat is None:
        raise ValueError(
            "NumPy latitude-band inputs with an explicit latitude axis "
            "require `lat` to provide the one-dimensional latitude "
            "coordinate"
        )

    latitude_values = np.asarray(getattr(lat, "data", lat), dtype=np.float64)
    if latitude_values.ndim != 1:
        raise ValueError(
            "`lat` must be one-dimensional when used as the latitude "
            "coordinate for NumPy latitude-band inputs"
        )

    latitude_mask = (latitude_values >= lat_min) & (latitude_values <= lat_max)
    if not np.any(latitude_mask):
        raise ValueError("`lat_bounds` does not select any latitude points")

    latitude_weights = np.cos(np.deg2rad(latitude_values[latitude_mask])).astype(
        np.float64,
        copy=False,
    )
    reduced_arrays = []
    for name, values in (("u", u), ("temperature", temperature)):
        array = np.asarray(getattr(values, "data", values), dtype=np.float64)
        if array.ndim not in {2, 3}:
            raise ValueError(
                f"`{name}` must be two- or three-dimensional when using "
                "NumPy latitude-band inputs"
            )

        pressure_axes = [
            axis for axis, size in enumerate(array.shape) if size == pressure_pa.size
        ]
        if len(pressure_axes) != 1:
            raise ValueError(
                f"`{name}` must have exactly one axis matching the "
                "pressure coordinate"
            )

        pressure_axis = pressure_axes[0]
        latitude_axes = [
            axis
            for axis, size in enumerate(array.shape)
            if axis != pressure_axis and size == latitude_values.size
        ]
        if len(latitude_axes) != 1:
            raise ValueError(
                f"`{name}` must have exactly one latitude axis matching "
                "`lat` for NumPy latitude-band inputs"
            )

        subset = np.take(array, np.flatnonzero(latitude_mask), axis=latitude_axes[0])
        subset = np.moveaxis(subset, latitude_axes[0], -1)
        subset = np.where(np.isfinite(subset), subset, np.nan)
        weight_view = latitude_weights.reshape(
            (1,) * (subset.ndim - 1) + (latitude_weights.size,)
        )
        masked_weights = np.where(np.isnan(subset), np.nan, weight_view)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            numerator = np.nanmean(subset * weight_view, axis=-1)
            denominator = np.nanmean(masked_weights, axis=-1)
        reduced_arrays.append(
            np.divide(
                numerator,
                denominator,
                out=np.full(numerator.shape, np.nan, dtype=np.float64),
                where=np.isfinite(denominator) & (denominator != 0.0),
            )
        )

    return reduced_arrays[0], reduced_arrays[1], None


def _infer_tropopause_pressure_pa(
    temperature: np.ndarray,
    pressure_pa: np.ndarray,
) -> float:
    """Diagnose tropopause pressure from the input sounding via WMO criteria."""

    pressure_for_tropopause = np.array(pressure_pa, dtype=np.float64, copy=True)
    temperature_for_tropopause = np.array(temperature, dtype=np.float64, copy=True)
    if pressure_for_tropopause[0] > pressure_for_tropopause[-1]:
        pressure_for_tropopause = pressure_for_tropopause[::-1].copy()
        temperature_for_tropopause = temperature_for_tropopause[::-1].copy()

    result = _trop_wmo_profile(
        temperature_for_tropopause,
        pressure_for_tropopause,
        pressure_unit="Pa",
    )
    tropopause_pressure_hpa = float(result["pressure"])
    success = bool(result["success"])
    if (
        not success
        or not np.isfinite(tropopause_pressure_hpa)
        or tropopause_pressure_hpa <= 0.0
    ):
        raise RuntimeError(
            "Automatic WMO tropopause detection did not return a usable "
            "tropopause pressure for this profile."
        )

    return tropopause_pressure_hpa * 100.0


def _wrap_batch_baroc_output(
    values: np.ndarray,
    profile_coord: Optional[xr.DataArray],
) -> Union[np.ndarray, xr.DataArray]:
    """Wrap batched baroclinic growth rates back into NumPy or xarray output."""

    converted = np.array(values, dtype=np.float64, copy=True)
    if profile_coord is None:
        return converted
    return xr.DataArray(
        converted,
        dims=profile_coord.dims,
        coords={profile_coord.dims[0]: profile_coord},
        name="baroc_growth_rate",
        attrs={"units": "s^-1"},
    )


[docs] def barot_growth_rate( u_barotropic: ProfileInput, lat: ProfileInput, *, radius: float = 6_371_000.0, omega: float = 7.292e-5, ) -> float: r"""Compute the maximum barotropic normal-mode growth rate from ``U(lat)``. Parameters ---------- u_barotropic : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional vertically averaged zonal-wind profile in ``m s^-1``. lat : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional latitude coordinate in degrees north. Values must be strictly monotonic. radius : float, optional Earth radius in meters. Only the default value is currently supported by the compiled kernel. omega : float, optional Planetary rotation rate in ``s^-1``. Only the default value is currently supported by the compiled kernel. Returns ------- float Maximum barotropic growth rate in ``s^-1``. Missing or unusable profiles return ``NaN``. Notes ----- This diagnostic solves the finite-difference generalized eigenproblem .. math:: A(k)\psi = \lambda B(k)\psi for each zonal wavenumber :math:`k`, where the continuous barotropic operator is .. math:: \left[k\beta - k^3 U + kU\partial_{yy} - kU_{yy}\right]\psi = \lambda \left(\partial_{yy} - k^2\right)\psi, with .. math:: \beta = \frac{2\Omega\cos(\phi)}{a}. The returned value is :math:`\max_k \operatorname{Im}(\lambda_k)`. References ---------- Chemke, R., Ming, Y., and Yuval, J. (2022): The intensification of winter mid-latitude storm tracks in the Southern Hemisphere. Nature Climate Change, 12, 553-557. https://doi.org/10.1038/s41558-022-01368-8 """ if radius != 6_371_000.0 or omega != 7.292e-5: raise NotImplementedError( "Custom `radius` and `omega` values are not implemented yet for " "the compiled barotropic growth-rate kernel." ) u_values = _as_1d_float64(u_barotropic, "u_barotropic") lat_values = _as_1d_float64(lat, "lat") _same_shape_or_raise(("u_barotropic", u_values), ("lat", lat_values)) if np.any(~np.isfinite(u_values)) or np.any(~np.isfinite(lat_values)): return float("nan") _require_monotonic(lat_values, "lat") if lat_values[0] > lat_values[-1]: lat_values = lat_values[::-1].copy() u_values = u_values[::-1].copy() max_growth, ier = _dbarot_growth_rate_1d(lat_values, u_values) if ier != 0: raise RuntimeError( f"barot_growth_rate Fortran backend returned ier={ier} for the " "requested latitude profile." ) return float(max_growth)
[docs] def baroc_growth_rate( u: ProfileInput, temperature: ProfileInput, pressure: ProfileInput, *, lat: Optional[LatitudeInput] = None, lat_bounds: Optional[Tuple[float, float]] = None, lon: Optional[ProfileInput] = None, wavenumber_mode: str = DEFAULT_WAVENUMBER_MODE, method: str = "log", tropopause_pressure: Optional[PressureInput] = None, solver_levels: int = DEFAULT_SOLVER_LEVELS, smooth_window: int = DEFAULT_SMOOTH_WINDOW, ) -> GrowthRateOutput: r"""Compute the maximum baroclinic normal-mode growth rate. Parameters ---------- u : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional zonal-wind profile in ``m s^-1`` or a two-dimensional stack of profiles that shares one vertical axis with ``pressure``. temperature : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional temperature profile in Kelvin or a two-dimensional stack of profiles that shares one vertical axis with ``pressure``. pressure : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional pressure coordinate in Pa or hPa. Values must be strictly monotonic and strictly positive. lat : scalar, :class:`xarray.DataArray`, :class:`numpy.ndarray`, optional Latitude in degrees north. This may be a scalar or, for batched profile input, a one-dimensional vector matching the profile dimension. The compiled solver uses this latitude to build the local Coriolis parameter and meridional gradient of planetary vorticity, unless ``lat_bounds`` is provided. When ``lat_bounds`` is given and raw NumPy inputs still contain an explicit latitude axis, ``lat`` may instead be the one-dimensional latitude coordinate used to select and cosine-weight that latitude band before the remaining profile solve. lat_bounds : tuple of float, optional Two-element latitude range in degrees north. When provided, the baroclinic solver uses the Chemke-style band-mean definitions .. math:: \sin_{\mathrm{avg}} = \frac{\sin(\phi_1) + \sin(\phi_2)}{2}, \qquad \cos_{\mathrm{avg}} = \frac{\cos(\phi_1) + \cos(\phi_2)}{2}, together with .. math:: f = 2\Omega \sin_{\mathrm{avg}}, \qquad \beta = \frac{2\Omega\cos_{\mathrm{avg}}}{a}, to match the latitude-band formulation used in the reference Chemke scripts. If ``u`` and ``temperature`` still carry a latitude axis, this option also reduces that axis with cosine-latitude weighting before the vertical-profile solve. For xarray inputs, the latitude coordinate is taken from the ``lat`` or ``latitude`` dimension. For NumPy inputs with three dimensions, pass the one-dimensional latitude coordinate through ``lat=...`` so the requested latitude band can be selected explicitly. The recommended NumPy layout is ``(time, level, lat)``. More generally, the wrapper requires exactly one axis matching the pressure-coordinate length and one non-pressure axis matching the latitude-coordinate length. lon : :class:`xarray.DataArray`, :class:`numpy.ndarray`, optional One-dimensional longitude coordinate in degrees used only when ``wavenumber_mode="low"``. The coordinate must be strictly monotonic. If omitted in low-resolution mode, the wrapper uses the original Chemke Python-share default ``np.arange(0.0, 361.0, 1.5)``. wavenumber_mode : {"high", "low"}, optional Zonal-wavenumber grid used by the compiled instability solver. ``"high"`` uses the fixed high-resolution Chemke/MATLAB-style grid ``k = 0, 1\times10^{-7}, \dots, 1.99\times10^{-5}\,\mathrm{m}^{-1}``. ``"low"`` uses the lower-resolution Chemke Python-share formula .. math:: k = \frac{2\pi w}{\left|\lambda_0 - \lambda_1\right|\pi a \cos\phi / 180}, where :math:`w = 0, 1, \dots, \mathrm{round}(N_{\lambda}/3)-1`. When ``lon`` is omitted, the low-resolution mode uses the original Chemke Python-share longitude grid ``np.arange(0.0, 361.0, 1.5)``. Defaults to ``"high"``. method : {"log", "linear"}, optional Vertical interpolation method used to remap the input profiles onto the solver pressure grid. ``"log"`` means linear interpolation in log-pressure, not logarithmic interpolation of the field itself. Defaults to ``"log"``. The legacy alias ``"logp"`` is still accepted. tropopause_pressure : scalar, :class:`xarray.DataArray`, :class:`numpy.ndarray`, optional Explicit tropopause pressure in Pa or hPa. If provided, the automatic WMO tropopause diagnosis is skipped and the solver grid is built from this pressure to the lower-tropospheric bound. For batched profile input, this may be a scalar climatology or a one-dimensional vector matching the profile dimension. solver_levels : int, optional Number of pressure levels used when the solver grid is built automatically from the diagnosed or explicit tropopause pressure. Defaults to ``DEFAULT_SOLVER_LEVELS``. smooth_window : int, optional Optional centered running-mean window applied to the growth-rate spectrum over zonal wavenumber inside the compiled Fortran backend before the final maximum-growth diagnostic is taken. ``1`` disables smoothing. If given, this must be a positive odd integer. Returns ------- float, :class:`numpy.ndarray`, :class:`xarray.DataArray` Maximum baroclinic growth rate in ``s^-1``. One-dimensional input returns a scalar float. Batched profile input returns a one-dimensional NumPy array or DataArray over the non-pressure dimension. Missing or unusable profiles return ``NaN``. Notes ----- This function diagnoses the WMO tropopause pressure by default, or uses an explicit ``tropopause_pressure`` when provided, and then interpolates ``u`` and ``temperature`` to a fixed pressure grid between that tropopause and the lower troposphere. The thermodynamic profiles passed to the compiled kernel are .. math:: \theta = T\left(\frac{p_0}{p}\right)^{\kappa}, \qquad \rho = \frac{p}{R_d T}, \qquad N_z = -\frac{1}{\rho\theta}\frac{\partial \theta}{\partial p}. The compiled solver then forms the finite-difference generalized eigenproblem .. math:: A(k)\psi = \lambda B(k)\psi, where the vertical quasi-geostrophic operator is represented in pressure coordinates by .. math:: A(k) \sim k\beta - k^3 U + kUf^2 \partial_p \left(N_z^{-1}\partial_p\right) - kf^2 \left(\partial_p U\right) N_z^{-1}\partial_p, and .. math:: B(k) \sim f^2 \partial_p \left(N_z^{-1}\partial_p\right) - k^2, and the returned value is :math:`\max_k \operatorname{Im}(\lambda_k)`. If ``smooth_window`` is greater than 1, the compiled backend first applies a centered running mean along the discrete zonal-wavenumber spectrum and then evaluates the final Chemke-style turning-point maximum on that smoothed spectrum. When ``u`` or ``temperature`` contains multiple profiles, one-dimensional climatologies are broadcast across the profile dimension in Python, while the per-profile pressure interpolation and normal-mode solve loop run in the compiled Fortran backend. The default ``DEFAULT_SOLVER_LEVELS = 45`` is a practical resolution choice, not a formal optimum. Increasing ``solver_levels`` can improve vertical-grid convergence up to a point, but it also increases the per-profile eigenvalue cost and does not guarantee a better diagnostic once the solution is already converged on the chosen pressure interval. References ---------- Chemke, R., and Ming, Y. (2020): Large Atmospheric Waves Will Get Stronger, While Small Waves Will Get Weaker by the End of the 21st Century. Geophysical Research Letters, 47, e2020GL090441. https://doi.org/10.1029/2020GL090441 Chemke, R., Zanna, L., Orbe, C., Sentman, L. T., and Polvani, L. M. (2022): The Future Intensification of the North Atlantic Winter Storm Track: The Key Role of Dynamic Ocean Coupling. Journal of Climate, 35(8), 2407-2421. https://doi.org/10.1175/JCLI-D-21-0407.1 Chemke, R. (2022): The future poleward shift of Southern Hemisphere summer mid-latitude storm tracks stems from ocean coupling. Nature Communications, 13, 2531. https://doi.org/10.1038/s41467-022-29392-4 Chemke, R., Ming, Y., and Yuval, J. (2022): The intensification of winter mid-latitude storm tracks in the Southern Hemisphere. Nature Climate Change, 12, 553-557. https://doi.org/10.1038/s41558-022-01368-8 """ if lat_bounds is not None and len(lat_bounds) != 2: raise ValueError("`lat_bounds` must contain exactly two latitude values") pressure_pa = _as_1d_float64(pressure, "pressure") if np.any(pressure_pa <= 0.0): raise ValueError("pressure values must be strictly positive") if np.nanmax(pressure_pa) <= 2000.0: pressure_pa = pressure_pa * 100.0 _require_monotonic(pressure_pa, "pressure") bottom_pressure_pa = min(REFERENCE_PRESSURE_PA, float(np.max(pressure_pa))) if isinstance(solver_levels, bool) or not isinstance( solver_levels, (int, np.integer) ): raise TypeError("`solver_levels` must be an integer >= 2") solver_levels = int(solver_levels) if solver_levels < 2: raise ValueError("`solver_levels` must be at least 2") if isinstance(smooth_window, bool) or not isinstance( smooth_window, (int, np.integer) ): raise TypeError("`smooth_window` must be a positive odd integer") smooth_window = int(smooth_window) if smooth_window < 1: raise ValueError("`smooth_window` must be at least 1") if smooth_window % 2 == 0: raise ValueError("`smooth_window` must be an odd integer") interp_method = method.strip().lower().replace("_", "") if interp_method in {"logp", "log"}: interp_method = "log" elif interp_method in {"linear", "pressure"}: interp_method = "linear" else: raise ValueError( "`method` must be 'log' or 'linear' " "(the alias 'logp' is also accepted)" ) wavenumber_mode_id, wavenumber_count, lon_span_radians = _prepare_wavenumber_inputs( wavenumber_mode, lon ) pressure_dim = pressure.dims[0] if isinstance(pressure, xr.DataArray) else None if lat_bounds is not None: u, temperature, lat = _reduce_latitude_band_inputs( u, temperature, lat, lat_bounds, pressure_pa, pressure_dim, ) lat_bounds_cos_avg = ( np.cos(np.deg2rad(float(lat_bounds[0]))) + np.cos(np.deg2rad(float(lat_bounds[1]))) ) / 2.0 else: lat_bounds_cos_avg = None if lat is None and lat_bounds is None: raise ValueError("`lat` or `lat_bounds` is required for baroc_growth_rate") if lat is not None and lat_bounds is not None: raise ValueError("`lat` and `lat_bounds` cannot be provided together") u_matrix_raw = _coerce_profile_matrix(u, "u", pressure_pa.size, pressure_dim) temperature_matrix_raw = _coerce_profile_matrix( temperature, "temperature", pressure_pa.size, pressure_dim, ) if ( u_matrix_raw.shape[0] == 1 and temperature_matrix_raw.shape[0] == 1 and np.asarray(getattr(u, "data", u)).ndim == 1 and np.asarray(getattr(temperature, "data", temperature)).ndim == 1 ): u_values = u_matrix_raw[0] temperature_values = temperature_matrix_raw[0] _same_shape_or_raise( ("u", u_values), ("temperature", temperature_values), ("pressure", pressure_pa), ) if ( np.any(~np.isfinite(u_values)) or np.any(~np.isfinite(temperature_values)) or np.any(~np.isfinite(pressure_pa)) ): return float("nan") if np.any(temperature_values <= 0.0): raise ValueError("temperature values must be strictly positive") if tropopause_pressure is None: tropopause_pressure_pa = _infer_tropopause_pressure_pa( temperature_values, pressure_pa, ) else: tropopause_values = _as_profile_vector( tropopause_pressure, "tropopause_pressure", 1, ) tropopause_pressure_pa = float(tropopause_values[0]) if np.isfinite(tropopause_pressure_pa) and tropopause_pressure_pa <= 2000.0: tropopause_pressure_pa *= 100.0 if tropopause_pressure_pa >= bottom_pressure_pa: raise ValueError( "The diagnosed tropopause pressure must be lower than the lower-" "tropospheric pressure bound used by the solver grid." ) solver_pressure_pa = np.linspace( tropopause_pressure_pa, bottom_pressure_pa, solver_levels, dtype=np.float64, ) u_solver = np.asarray( interp_pressure_1d( u_values, pressure_pa, solver_pressure_pa, method=interp_method, extrapolate=False, missing_value=np.nan, ), dtype=np.float64, ) temperature_solver = np.asarray( interp_pressure_1d( temperature_values, pressure_pa, solver_pressure_pa, method=interp_method, extrapolate=False, missing_value=np.nan, ), dtype=np.float64, ) if np.any(~np.isfinite(u_solver)) or np.any(~np.isfinite(temperature_solver)): return float("nan") theta_solver = ( temperature_solver * (REFERENCE_PRESSURE_PA / solver_pressure_pa) ** KAPPA ) if lat_bounds is None: latitude_value = float(_as_profile_vector(lat, "lat", 1)[0]) latitude_radians = np.deg2rad(latitude_value) f_cor = float(2.0 * 7.292e-5 * np.sin(latitude_radians)) beta = float(2.0 * 7.292e-5 * np.cos(latitude_radians) / 6_371_000.0) zonal_length = 1.0 if wavenumber_mode_id == 2: zonal_length = float( lon_span_radians * 6_371_000.0 * np.cos(latitude_radians) ) else: f_cor, beta = _f_beta_from_lat_bounds(lat_bounds) zonal_length = 1.0 if wavenumber_mode_id == 2: zonal_length = float( lon_span_radians * 6_371_000.0 * lat_bounds_cos_avg ) if wavenumber_mode_id == 2 and ( not np.isfinite(zonal_length) or zonal_length <= 0.0 ): raise ValueError( "The low-resolution wavenumber grid requires a positive zonal " "length from `lon` and the selected latitude geometry." ) max_growth, ier = _dbaroc_growth_rate_1d( u_solver, theta_solver, solver_pressure_pa, temperature_solver, f_cor, beta, smooth_window, wavenumber_mode_id, wavenumber_count, zonal_length, ) if ier != 0: raise RuntimeError( f"baroc_growth_rate Fortran backend returned ier={ier} for the " "requested atmospheric column." ) return float(max_growth) nprofile = max(u_matrix_raw.shape[0], temperature_matrix_raw.shape[0]) profile_coord = None for values in (u, temperature): if ( not isinstance(values, xr.DataArray) or values.ndim != 2 or pressure_dim not in values.dims ): continue candidate_dim = ( values.dims[0] if values.dims[1] == pressure_dim else values.dims[1] ) candidate_coord = values[candidate_dim] if profile_coord is None: profile_coord = candidate_coord elif candidate_dim != profile_coord.dims[0]: raise ValueError( "Batched xarray inputs must share the same non-pressure dimension" ) u_matrix = _broadcast_profile_matrix( u_matrix_raw, nprofile, "u", ) temperature_matrix = _broadcast_profile_matrix( temperature_matrix_raw, nprofile, "temperature", ) output = np.full(nprofile, np.nan, dtype=np.float64) valid_profile_mask = ~( np.any(~np.isfinite(u_matrix), axis=1) | np.any(~np.isfinite(temperature_matrix), axis=1) ) if lat_bounds is None: lat_values = _as_profile_vector(lat, "lat", nprofile) valid_profile_mask &= np.isfinite(lat_values) f_values = np.full(nprofile, np.nan, dtype=np.float64) beta_values = np.full(nprofile, np.nan, dtype=np.float64) valid_lat_mask = valid_profile_mask & np.isfinite(lat_values) f_values[valid_lat_mask] = ( 2.0 * 7.292e-5 * np.sin(np.deg2rad(lat_values[valid_lat_mask])) ) beta_values[valid_lat_mask] = ( 2.0 * 7.292e-5 * np.cos(np.deg2rad(lat_values[valid_lat_mask])) / 6_371_000.0 ) else: f_cor, beta = _f_beta_from_lat_bounds(lat_bounds) f_values = np.full(nprofile, f_cor, dtype=np.float64) beta_values = np.full(nprofile, beta, dtype=np.float64) tropopause_pressure_pa = None if tropopause_pressure is not None: tropopause_values = _as_profile_vector( tropopause_pressure, "tropopause_pressure", nprofile, ) tropopause_missing_mask = ~np.isfinite(tropopause_values) tropopause_pressure_pa = np.array( tropopause_values, dtype=np.float64, copy=True ) finite_tropopause_values = tropopause_pressure_pa[ ~tropopause_missing_mask & np.isfinite(tropopause_pressure_pa) ] if ( finite_tropopause_values.size > 0 and np.nanmax(finite_tropopause_values) <= 2000.0 ): tropopause_pressure_pa[~tropopause_missing_mask] *= 100.0 valid_profile_mask &= ~tropopause_missing_mask finite_tropopause_mask = valid_profile_mask & np.isfinite( tropopause_pressure_pa ) if np.any(finite_tropopause_mask & (tropopause_pressure_pa <= 0.0)): raise ValueError("tropopause pressure values must be strictly positive") if np.any( finite_tropopause_mask & (tropopause_pressure_pa >= bottom_pressure_pa) ): raise ValueError( "Each explicit tropopause pressure must be lower than the " "lower-tropospheric pressure bound used by the solver grid." ) if np.any(valid_profile_mask & np.any(temperature_matrix <= 0.0, axis=1)): raise ValueError("temperature values must be strictly positive") if tropopause_pressure_pa is None: tropopause_result = _trop_wmo( temperature_matrix[valid_profile_mask], pressure_pa, xdim=-1, ydim=0, levdim=1, timedim=None, pressure_unit="Pa", missing_value=-999.0, ) tropopause_success = np.asarray( tropopause_result["success"], dtype=bool, ) tropopause_pressure_pa_valid = ( np.asarray(tropopause_result["pressure"], dtype=np.float64) * 100.0 ) tropopause_pressure_pa = np.full(nprofile, np.nan, dtype=np.float64) valid_indices = np.flatnonzero(valid_profile_mask) tropopause_pressure_pa[valid_indices[tropopause_success]] = ( tropopause_pressure_pa_valid[tropopause_success] ) valid_profile_mask[valid_indices[~tropopause_success]] = False if not np.any(valid_profile_mask): return _wrap_batch_baroc_output( output, profile_coord, ) ramp = np.linspace(0.0, 1.0, solver_levels, dtype=np.float64) solver_pressure_matrix_pa = ( tropopause_pressure_pa[:, np.newaxis] + (bottom_pressure_pa - tropopause_pressure_pa)[:, np.newaxis] * ramp[np.newaxis, :] ) if not np.any(valid_profile_mask): return _wrap_batch_baroc_output( output, profile_coord, ) zonal_length_values = np.full(nprofile, 1.0, dtype=np.float64) if wavenumber_mode_id == 2: if lat_bounds is None: zonal_length_values[valid_profile_mask] = ( lon_span_radians * 6_371_000.0 * np.cos(np.deg2rad(lat_values[valid_profile_mask])) ) else: zonal_length_values[valid_profile_mask] = ( lon_span_radians * 6_371_000.0 * lat_bounds_cos_avg ) if np.any( valid_profile_mask & (~np.isfinite(zonal_length_values) | (zonal_length_values <= 0.0)) ): raise ValueError( "The low-resolution wavenumber grid requires a positive zonal " "length from `lon` and the selected latitude geometry." ) interp_kind = 1 if interp_method == "linear" else 2 growth_valid, ier_valid = _dbaroc_growth_rate_profiles( np.asfortranarray(u_matrix[valid_profile_mask].T), np.asfortranarray(temperature_matrix[valid_profile_mask].T), pressure_pa, np.asfortranarray(solver_pressure_matrix_pa[valid_profile_mask].T), np.asfortranarray(f_values[valid_profile_mask]), np.asfortranarray(beta_values[valid_profile_mask]), interp_kind, smooth_window, wavenumber_mode_id, wavenumber_count, np.asfortranarray(zonal_length_values[valid_profile_mask]), ) growth_valid = np.asarray(growth_valid, dtype=np.float64) ier_valid = np.asarray(ier_valid, dtype=np.int64) valid_indices = np.flatnonzero(valid_profile_mask) output[valid_indices[ier_valid == 0]] = growth_valid[ier_valid == 0] if np.any((ier_valid != 0) & (ier_valid != 100)): failed = valid_indices[(ier_valid != 0) & (ier_valid != 100)] failed_codes = ier_valid[(ier_valid != 0) & (ier_valid != 100)] raise RuntimeError( "baroc_growth_rate Fortran backend returned nonzero ier values for " f"profile indices {failed.tolist()} with ier={failed_codes.tolist()}." ) return _wrap_batch_baroc_output( output, profile_coord, )