Source code for skyborn.interp.interpolation

"""
Interpolation functions for hybrid-sigma and multidimensional data.

This module provides advanced interpolation capabilities for atmospheric science data,
including hybrid-sigma to pressure level interpolation and multidimensional spatial
interpolation with optional extrapolation.

References
----------
General interpolation formulations adapted for atmospheric datasets.
"""

from __future__ import annotations

import typing
import warnings

import numpy as np
import xarray as xr

from .fortran.int2p_kernels import dinterp_pressure_1d as _dinterp_pressure_1d
from .fortran.isentropic_kernels import (
    dinterp_to_isentropic_corder_into as _dinterp_to_isentropic_corder_into,
)
from .fortran.vinth2p_kernels import (
    ddelta_pressure_hybrid_pa as _ddelta_pressure_hybrid_pa,
)
from .fortran.vinth2p_kernels import (
    ddelta_pressure_hybrid_pa_into as _ddelta_pressure_hybrid_pa_into,
)
from .fortran.vinth2p_kernels import (
    dgeopotential_height_hybrid_corder_pa_into as _dgeopotential_height_hybrid_corder_pa_into,
)
from .fortran.vinth2p_kernels import (
    dpressure_at_hybrid_levels_pa as _dpressure_at_hybrid_levels_pa,
)
from .fortran.vinth2p_kernels import (
    dpressure_at_hybrid_levels_pa_into as _dpressure_at_hybrid_levels_pa_into,
)
from .fortran.vinth2p_kernels import dsigma2hybrid_nodes as _dsigma2hybrid_nodes
from .fortran.vinth2p_kernels import (
    dsigma2hybrid_nodes_corder_into as _dsigma2hybrid_nodes_corder_into,
)
from .fortran.vinth2p_kernels import (
    dsigma2hybrid_nodes_into as _dsigma2hybrid_nodes_into,
)
from .fortran.vinth2p_kernels import (
    dvinth2p_ecmwf_nodes_corder_pa_into as _dvinth2p_ecmwf_nodes_corder_pa_into,
)
from .fortran.vinth2p_kernels import dvinth2p_ecmwf_nodes_pa as _dvinth2p_ecmwf_nodes_pa
from .fortran.vinth2p_kernels import (
    dvinth2p_ecmwf_nodes_pa_into as _dvinth2p_ecmwf_nodes_pa_into,
)
from .fortran.vinth2p_kernels import (
    dvinth2p_nodes_corder_pa_into as _dvinth2p_nodes_corder_pa_into,
)
from .fortran.vinth2p_kernels import dvinth2p_nodes_pa as _dvinth2p_nodes_pa
from .fortran.vinth2p_kernels import dvinth2p_nodes_pa_into as _dvinth2p_nodes_pa_into

__all__ = [
    "interp_pressure_1d",
    "pressure_at_hybrid_levels",
    "delta_pressure_hybrid",
    "geopotential_height_at_hybrid_levels",
    "interp_hybrid_to_pressure",
    "interp_to_isentropic",
    "interp_sigma_to_hybrid",
    "interp_multidim",
]

supported_types = typing.Union[xr.DataArray, np.ndarray]
coordinate_types = typing.Union[xr.DataArray, np.ndarray, typing.Sequence[float]]
_PRESSURE_INTERP_SPVL = np.float64(np.finfo(np.float64).max)
_SPVL = np.float64(-9.96921e36)
_RD_OVER_CP = 0.2854

__pres_lev_mandatory__ = np.array(
    [
        1000,
        925,
        850,
        700,
        500,
        400,
        300,
        250,
        200,
        150,
        100,
        70,
        50,
        30,
        20,
        10,
        7,
        5,
        3,
        2,
        1,
    ]
).astype(
    np.float32
)  # Mandatory pressure levels (mb)
__pres_lev_mandatory__ = __pres_lev_mandatory__ * 100.0  # Convert mb to Pa


def _normalize_interp_method(method: str) -> str:
    """Normalize public interpolation method aliases to one canonical label.

    In particular, accept ``"loglog"`` from user-facing calls and normalize it
    to the internal ``"log-log"`` spelling used by the vinth2p kernels.
    """

    normalized = method.lower().replace("_", "-")
    if normalized == "loglog":
        return "log-log"
    return normalized


def _rename_colliding_coeff_dim(target, hya, hyb):
    """Avoid accidental xarray alignment when hybrid coeff dims collide."""

    if not (
        isinstance(target, xr.DataArray)
        and isinstance(hya, xr.DataArray)
        and isinstance(hyb, xr.DataArray)
    ):
        return hya, hyb

    coeff_dim = hya.dims[0]
    if coeff_dim in target.dims and target.sizes[coeff_dim] != hya.sizes[coeff_dim]:
        new_dim = "lev"
        if new_dim in target.dims:
            new_dim = "__hybrid_lev__"
        hya = hya.rename({coeff_dim: new_dim})
        hyb = hyb.rename({hyb.dims[0]: new_dim})

    return hya, hyb


def _with_dataarray_metadata(template, data, coords=None, dims=None):
    """Build a new DataArray while preserving the template metadata."""

    return xr.DataArray(
        data,
        dims=template.dims if dims is None else dims,
        coords=template.coords if coords is None else coords,
        name=template.name,
        attrs=template.attrs.copy(),
    )


def _dimension_coord_or_default(array, dim, *, output_dim=None, size=None):
    """Return a stable 1D coordinate for ``dim`` or a default integer index."""

    output_dim = dim if output_dim is None else output_dim
    coord = array.coords.get(dim)
    if coord is None:
        length = array.sizes[dim] if size is None else size
        return xr.DataArray(np.arange(length), dims=(output_dim,))

    if size is not None:
        coord = coord.isel({dim: slice(None, size)})

    return xr.DataArray(
        np.asarray(coord.data),
        dims=(output_dim,),
        attrs=coord.attrs.copy(),
    )


def _finalize_hybrid_level_output(
    result: xr.DataArray,
    *,
    lev_name: str,
    lev_coord: xr.DataArray,
    lev_dim: str | None,
    output_dims,
) -> xr.DataArray:
    """Apply optional public xarray output naming and ordering controls."""

    target_lev_dim = lev_name if lev_dim is None else lev_dim

    target_lev_coord = xr.DataArray(
        np.asarray(lev_coord.data),
        dims=(target_lev_dim,),
        attrs=lev_coord.attrs.copy(),
    )

    if target_lev_dim != lev_name:
        result = result.rename({lev_name: target_lev_dim})

    result = result.assign_coords({target_lev_dim: target_lev_coord})

    if output_dims is not None:
        requested_dims = tuple(output_dims)
        output_dims = tuple(dim for dim in requested_dims if dim in result.dims)
        output_dims += tuple(dim for dim in result.dims if dim not in output_dims)
        if len(output_dims) != result.ndim or set(output_dims) != set(result.dims):
            raise ValueError(
                "`output_dims` must contain each output dimension exactly once: "
                f"expected a permutation of {result.dims}, got {requested_dims}"
            )
        result = result.transpose(*output_dims)

    return result


def _target_level_dataarray(
    target_levels: coordinate_types,
    *,
    default_dim: str,
    units: str,
    long_name: str,
) -> xr.DataArray:
    """Return target levels as a one-dimensional DataArray."""

    if isinstance(target_levels, xr.DataArray):
        if target_levels.ndim != 1:
            raise ValueError("target levels must be one-dimensional")
        result = target_levels.copy()
        attrs = result.attrs.copy()
        attrs.setdefault("units", units)
        attrs.setdefault("long_name", long_name)
        return result.assign_attrs(attrs)

    values = np.asarray(target_levels, dtype=np.float64)
    if values.ndim != 1:
        raise ValueError("target levels must be one-dimensional")
    return xr.DataArray(
        values,
        dims=(default_dim,),
        coords={default_dim: values},
        attrs={"units": units, "long_name": long_name},
    )


def _interp_to_isentropic_fortran_corder(
    data: xr.DataArray,
    temperature: xr.DataArray,
    pressure: xr.DataArray,
    theta_levels: np.ndarray,
    *,
    lev_dim: str,
    p0: float,
    kappa: float,
    spvl: float,
    extrapolate: bool,
) -> xr.DataArray:
    """Run isentropic interpolation through the flat C-order Fortran kernel."""

    if _dinterp_to_isentropic_corder_into is None:
        raise RuntimeError("isentropic interpolation backend is not available")

    interp_axis = data.dims.index(lev_dim)
    shape_before = data.shape[:interp_axis]
    shape_after = data.shape[interp_axis + 1 :]
    nouter = int(np.prod(shape_before, dtype=np.int64)) if shape_before else 1
    ninner = int(np.prod(shape_after, dtype=np.int64)) if shape_after else 1
    nlev = data.shape[interp_axis]
    ntheta = theta_levels.size

    output_values = _compiled_float64_output(
        (*shape_before, ntheta, *shape_after), order="C"
    )
    _dinterp_to_isentropic_corder_into(
        _compiled_float64_flat_with_missing(data.data, spvl),
        _compiled_float64_flat_with_missing(temperature.data, spvl),
        _compiled_float64_flat_with_missing(pressure.data, spvl),
        _compiled_float64_vector(theta_levels),
        output_values.reshape(-1),
        float(p0),
        float(kappa),
        float(spvl),
        int(bool(extrapolate)),
        nouter,
        nlev,
        ninner,
    )
    output_values[output_values == spvl] = np.nan
    output_values = _restore_compiled_interp_output_dtype(output_values, data)

    dims = tuple(
        dim if idx != interp_axis else "theta" for idx, dim in enumerate(data.dims)
    )
    coords = {
        dim: coord
        for dim, coord in data.coords.items()
        if dim in dims and dim != lev_dim
    }
    return xr.DataArray(
        output_values,
        dims=dims,
        coords=coords,
        name=data.name,
        attrs=data.attrs.copy(),
    )


def _interp_to_isentropic(
    data: xr.DataArray,
    temperature: xr.DataArray,
    pressure: xr.DataArray,
    theta_levels: np.ndarray,
    *,
    lev_dim: str,
    p0: float,
    kappa: float,
    spvl: float,
    extrapolate: bool,
) -> xr.DataArray:
    """Interpolate to isentropic levels through the compiled Fortran backend."""

    return _interp_to_isentropic_fortran_corder(
        data,
        temperature,
        pressure,
        theta_levels,
        lev_dim=lev_dim,
        p0=p0,
        kappa=kappa,
        spvl=spvl,
        extrapolate=extrapolate,
    )


[docs] def interp_pressure_1d( data: supported_types | None = None, source_pressure: supported_types | None = None, target_pressure: supported_types | None = None, *, method: str = "log", extrapolate: bool = False, missing_value: object = np.nan, **legacy_kwargs: supported_types, ) -> supported_types: """Interpolate a one-dimensional profile between pressure coordinates. Parameters ---------- data : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional field values defined on ``source_pressure``. source_pressure : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional source pressure levels. Values must be strictly monotonic after missing levels are removed. target_pressure : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional target pressure levels. Values may be increasing or decreasing. method : {"linear", "log"}, optional Interpolate linearly in pressure or in log-pressure. Defaults to ``"log"``. extrapolate : bool, optional If True, use the end slopes to extrapolate outside the valid source pressure range. Defaults to False. missing_value : scalar, optional Public missing-value marker. Defaults to ``np.nan``. Returns ------- :class:`xarray.DataArray`, :class:`numpy.ndarray` Interpolated values on ``target_pressure``. Xarray inputs return a one-dimensional DataArray with the target pressure coordinate. Notes ----- The legacy keyword aliases ``values``, ``x``, ``p_in``, and ``p_out`` are still accepted for backward compatibility, but the preferred public parameter names are now ``data``, ``source_pressure``, and ``target_pressure``. """ legacy_name_map = { "values": "data", "x": "data", "p_in": "source_pressure", "p_out": "target_pressure", } for legacy_name, canonical_name in legacy_name_map.items(): if legacy_name not in legacy_kwargs: continue legacy_value = legacy_kwargs.pop(legacy_name) if canonical_name == "data": if data is not None: raise TypeError( "interp_pressure_1d() received both `data` and legacy " f"`{legacy_name}`" ) data = legacy_value elif canonical_name == "source_pressure": if source_pressure is not None: raise TypeError( "interp_pressure_1d() received both `source_pressure` and " f"legacy `{legacy_name}`" ) source_pressure = legacy_value else: if target_pressure is not None: raise TypeError( "interp_pressure_1d() received both `target_pressure` and " f"legacy `{legacy_name}`" ) target_pressure = legacy_value if legacy_kwargs: unexpected = ", ".join(f"`{name}`" for name in sorted(legacy_kwargs)) raise TypeError( "interp_pressure_1d() got unexpected keyword argument(s): " f"{unexpected}" ) missing_arguments = [] if data is None: missing_arguments.append("data") if source_pressure is None: missing_arguments.append("source_pressure") if target_pressure is None: missing_arguments.append("target_pressure") if missing_arguments: missing_text = ", ".join(f"`{name}`" for name in missing_arguments) raise TypeError( "interp_pressure_1d() missing required argument(s): " f"{missing_text}" ) _require_compiled_interp("interp_pressure_1d", _dinterp_pressure_1d) _reject_lazy_or_unit_backed_inputs( "interp_pressure_1d", data, source_pressure, target_pressure, ) def is_nan_missing_value(value) -> bool: if value is None: return True try: return bool(np.isnan(value)) except TypeError: return False def pressure_interp_missing_mask(values: np.ndarray) -> np.ndarray: mask = ~np.isfinite(values) if missing_value is not None and not is_nan_missing_value(missing_value): mask |= values == float(missing_value) return mask def require_strict_monotonic_pressure(name: str, values: np.ndarray) -> None: if values.size < 2: return diffs = np.diff(values) if not (np.all(diffs > 0.0) or np.all(diffs < 0.0)): raise ValueError( f"{name} must be strictly monotonic after missing values are removed" ) def wrap_pressure_interp_output(output_values): if not isinstance(data, xr.DataArray): return output_values if isinstance(target_pressure, xr.DataArray): output_dim = target_pressure.dims[0] output_coord = xr.DataArray( np.asarray(target_pressure.data), dims=(output_dim,), attrs=target_pressure.attrs.copy(), ) else: output_dim = data.dims[0] output_coord = xr.DataArray(np.asarray(target_pressure), dims=(output_dim,)) return xr.DataArray( np.asarray(output_values), dims=(output_dim,), coords={output_dim: output_coord}, name=data.name, attrs=data.attrs.copy(), ) normalized_method = method.lower() if normalized_method == "linear": linlog = -1 if extrapolate else 1 elif normalized_method == "log": linlog = -2 if extrapolate else 2 else: raise ValueError("`method` must be either 'linear' or 'log'") data_array = np.asarray( data.data if isinstance(data, xr.DataArray) else data, dtype=np.float64, ) source_pressure_array = np.asarray( ( source_pressure.data if isinstance(source_pressure, xr.DataArray) else source_pressure ), dtype=np.float64, ) target_pressure_array = np.asarray( ( target_pressure.data if isinstance(target_pressure, xr.DataArray) else target_pressure ), dtype=np.float64, ) if ( data_array.ndim != 1 or source_pressure_array.ndim != 1 or target_pressure_array.ndim != 1 ): raise ValueError( "`data`, `source_pressure`, and `target_pressure` must each be " "one-dimensional" ) if data_array.shape != source_pressure_array.shape: raise ValueError( "`data` and `source_pressure` must have the same length for " "pressure interpolation" ) output_missing = ( np.nan if is_nan_missing_value(missing_value) else float(missing_value) ) result = np.full(target_pressure_array.shape, output_missing, dtype=np.float64) if target_pressure_array.size == 0: return wrap_pressure_interp_output(result) input_missing_mask = pressure_interp_missing_mask( data_array ) | pressure_interp_missing_mask(source_pressure_array) valid_source_pressure = source_pressure_array[~input_missing_mask] if valid_source_pressure.size < 2: raise ValueError( "interp_pressure_1d requires at least two valid input levels after " "missing values are removed" ) require_strict_monotonic_pressure("`source_pressure`", valid_source_pressure) output_missing_mask = pressure_interp_missing_mask(target_pressure_array) valid_target_pressure = target_pressure_array[~output_missing_mask] require_strict_monotonic_pressure("`target_pressure`", valid_target_pressure) if abs(linlog) != 1: if np.any(valid_source_pressure <= 0.0) or np.any(valid_target_pressure <= 0.0): raise ValueError( "log-pressure interpolation requires strictly positive pressures" ) if valid_target_pressure.size == 0: return wrap_pressure_interp_output(result) source_pressure_work = np.array(source_pressure_array, dtype=np.float64, copy=True) data_work = np.array(data_array, dtype=np.float64, copy=True) target_pressure_work = np.array(valid_target_pressure, dtype=np.float64, copy=True) source_pressure_work[pressure_interp_missing_mask(source_pressure_work)] = ( _PRESSURE_INTERP_SPVL ) data_work[pressure_interp_missing_mask(data_work)] = _PRESSURE_INTERP_SPVL output_valid, ier = _dinterp_pressure_1d( source_pressure_work, data_work, target_pressure_work, linlog, _PRESSURE_INTERP_SPVL, ) output_valid = np.asarray(output_valid, dtype=np.float64) output_valid[output_valid == _PRESSURE_INTERP_SPVL] = output_missing if ier != 0: raise RuntimeError( f"interp_pressure_1d Fortran backend returned ier={ier} for the " "requested pressure interpolation" ) result[~output_missing_mask] = output_valid return wrap_pressure_interp_output(result)
[docs] def pressure_at_hybrid_levels( psfc: supported_types, hya: supported_types, hyb: supported_types, p0: float = 100000.0, lev_dim: str = "lev", output_dims: tuple[str, ...] = ("time", "lev", "lat", "lon"), ) -> supported_types: """Calculate pressure at hybrid levels. Parameters ---------- psfc : :class:`xarray.DataArray`, :class:`numpy.ndarray` Surface pressure in Pascals. hya, hyb : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional hybrid coefficients. p0 : float, optional Reference pressure in Pascals. lev_dim : str, optional Output vertical dimension name for xarray inputs. Defaults to ``"lev"``. output_dims : sequence of str, optional Preferred output dimension order for xarray inputs. Defaults to ``("time", "lev", "lat", "lon")``; any names not present in the output are ignored, and remaining dimensions keep their relative order. Returns ------- :class:`xarray.DataArray`, :class:`numpy.ndarray` Pressure at hybrid levels in Pascals. The output shape is ``(len(hya), *psfc.shape)`` for eager NumPy inputs, or ``(lev, *psfc.dims)`` for eager xarray inputs. """ if not all(isinstance(x, (xr.DataArray, np.ndarray)) for x in (psfc, hya, hyb)): raise TypeError("psfc, hya, and hyb must be xarray DataArrays or numpy arrays") if not ( all(isinstance(x, np.ndarray) for x in (psfc, hya, hyb)) or all(isinstance(x, xr.DataArray) for x in (psfc, hya, hyb)) ): raise TypeError( "psfc, hya, and hyb must all be the same type (all numpy arrays or all xarray DataArrays)" ) if hya.shape != hyb.shape: raise ValueError(f"dimension mismatch: hya: {hya.shape} hyb: {hyb.shape}") _require_compiled_interp( "pressure_at_hybrid_levels", _dpressure_at_hybrid_levels_pa, _dpressure_at_hybrid_levels_pa_into, ) if isinstance(hya, np.ndarray): if hya.ndim != 1: raise ValueError( f"hya and hyb must be 1-dimensional if numpy inputs: {hya.shape}" ) default_output_dims = ("time", "lev", "lat", "lon") if lev_dim != "lev" or tuple(output_dims) != default_output_dims: raise TypeError( "`lev_dim` and `output_dims` are supported only for xarray inputs" ) output_columns = _pressure_at_hybrid_levels_flat(psfc, hya, hyb, p0) return output_columns.reshape((hya.shape[0],) + psfc.shape, order="C") _reject_lazy_or_unit_backed_inputs("pressure_at_hybrid_levels", psfc, hya, hyb) if hya.dims != hyb.dims: warnings.warn( "hya and hyb have different dimension names, attempting rename", stacklevel=2, ) hyb = hyb.rename({b: a for a, b in zip(hya.dims, hyb.dims)}) hya, hyb = _rename_colliding_coeff_dim(psfc, hya, hyb) lev_name = hya.dims[0] lev_coord = _dimension_coord_or_default(hya, lev_name) output_columns = _pressure_at_hybrid_levels_flat(psfc.data, hya.data, hyb.data, p0) output_values = output_columns.reshape((hya.shape[0],) + psfc.shape, order="C") coords = {name: coord for name, coord in psfc.coords.items()} coords[lev_name] = lev_coord pressure = xr.DataArray( output_values, dims=(lev_name, *psfc.dims), coords=coords, ) return _finalize_hybrid_level_output( pressure, lev_name=lev_name, lev_coord=lev_coord, lev_dim=lev_dim, output_dims=output_dims, )
[docs] def delta_pressure_hybrid( ps: supported_types, hya: supported_types, hyb: supported_types, p0: float = 100000.0, lev_dim: str = "lev", output_dims: tuple[str, ...] = ("time", "lev", "lat", "lon"), ) -> supported_types: """Calculate pressure layer thickness for hybrid coordinates. Parameters ---------- ps : :class:`xarray.DataArray`, :class:`numpy.ndarray` Surface pressure in Pascals. hya, hyb : :class:`xarray.DataArray`, :class:`numpy.ndarray` One-dimensional hybrid coefficients defined at layer interfaces. If ``hya`` and ``hyb`` have length ``nlev + 1``, the result contains ``nlev`` layer-thickness values. p0 : float, optional Reference pressure in Pascals. Use ``p0=1`` when ``hya`` already has pressure units, for example ERA5 ``hyai`` / ``hyam`` coefficients. lev_dim : str, optional Output vertical dimension name for xarray inputs. Defaults to ``"lev"``. output_dims : sequence of str, optional Preferred output dimension order for xarray inputs. Defaults to ``("time", "lev", "lat", "lon")``; any names not present in the output are ignored, and remaining dimensions keep their relative order. Returns ------- :class:`xarray.DataArray`, :class:`numpy.ndarray` Pressure layer thickness in Pascals for each hybrid layer. The output shape is ``(len(hya) - 1, *ps.shape)`` for eager NumPy inputs, or ``(lev, *ps.dims)`` for eager xarray inputs. Notes ----- This function expects hybrid coefficients defined at layer interfaces, not layer midpoints. For datasets such as ERA5, use ``hyai`` / ``hybi`` to compute layer-thickness values. If you need pressure at the layer midpoints instead, use :func:`pressure_at_hybrid_levels` with ``hyam`` / ``hybm``. For CESM-family data it is often convenient to keep using ``hyai`` / ``hybi`` for the calculation while leaving the default ``lev_dim="lev"`` and ``output_dims=("time", "lev", "lat", "lon")`` so the result lines up naturally with variables on model midpoints such as ``V(time, lev, lat, lon)``. With xarray inputs, the default behavior is therefore to expose the output vertical dimension as ``"lev"`` and to prefer the common ``("time", "lev", "lat", "lon")`` ordering. If one or more of these dimension names are not present in the output, they are ignored and the remaining dimensions keep their relative order. """ if not all(isinstance(x, (xr.DataArray, np.ndarray)) for x in (ps, hya, hyb)): raise TypeError("Inputs must be xarray DataArrays or numpy arrays") if not isinstance(p0, (float, int, np.floating, np.integer)): raise TypeError(f"p0 must be a scalar numeric value, received {type(p0)}") if hya.shape != hyb.shape: raise ValueError(f"dimension mismatch: hya: {hya.shape} hyb: {hyb.shape}") if np.ndim(hya) != 1: raise ValueError(f"hya and hyb must be 1-dimensional: {hya.shape}") _require_compiled_interp( "delta_pressure_hybrid", _ddelta_pressure_hybrid_pa, _ddelta_pressure_hybrid_pa_into, ) if isinstance(ps, np.ndarray): default_output_dims = ("time", "lev", "lat", "lon") if lev_dim != "lev" or tuple(output_dims) != default_output_dims: raise TypeError( "`lev_dim` and `output_dims` are supported only for xarray inputs" ) hya_values = np.asarray(hya.data if isinstance(hya, xr.DataArray) else hya) hyb_values = np.asarray(hyb.data if isinstance(hyb, xr.DataArray) else hyb) output_columns = _delta_pressure_hybrid_flat(ps, hya_values, hyb_values, p0) return output_columns.reshape((hya_values.shape[0] - 1,) + ps.shape, order="C") _reject_lazy_or_unit_backed_inputs("delta_pressure_hybrid", ps, hya, hyb) if isinstance(hya, np.ndarray): hya = xr.DataArray(hya, dims=("lev",)) hyb = xr.DataArray(hyb, dims=("lev",)) else: hya = _with_dataarray_metadata(hya, np.asarray(hya.data)) hyb = _with_dataarray_metadata(hyb, np.asarray(hyb.data)) if hya.dims != hyb.dims: warnings.warn( "hya and hyb have different dimension names, attempting rename", stacklevel=2, ) hyb = hyb.rename({b: a for a, b in zip(hya.dims, hyb.dims)}) hya, hyb = _rename_colliding_coeff_dim(ps, hya, hyb) lev_name = hya.dims[0] lev_coord = _dimension_coord_or_default(hya, lev_name, size=hya.shape[0] - 1) output_columns = _delta_pressure_hybrid_flat(ps.data, hya.data, hyb.data, p0) output_values = output_columns.reshape((hya.shape[0] - 1,) + ps.shape, order="C") coords = {name: coord for name, coord in ps.coords.items()} coords[lev_name] = lev_coord dph = xr.DataArray( output_values, dims=(lev_name, *ps.dims), coords=coords, ) dph.name = "dph" dph.attrs = { "long_name": "pressure layer thickness", "units": "Pa", } return _finalize_hybrid_level_output( dph, lev_name=lev_name, lev_coord=lev_coord, lev_dim=lev_dim, output_dims=output_dims, )
def geopotential_height_at_hybrid_levels( temperature: xr.DataArray, q: xr.DataArray, ps: xr.DataArray, phis: xr.DataArray, hyai: xr.DataArray, hybi: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float = 100000.0, lev_dim: str = "lev", output_dims: tuple[str, ...] = ("time", "lev", "lat", "lon"), ) -> xr.DataArray: """Compute geopotential height on hybrid model levels. Parameters ---------- temperature : :class:`xarray.DataArray` Air temperature on hybrid full levels. q : :class:`xarray.DataArray` Specific humidity on hybrid full levels. Used to form virtual temperature in the hypsometric integration. ps : :class:`xarray.DataArray` Surface pressure in Pascals. phis : :class:`xarray.DataArray` Surface geopotential in ``m^2 s^-2``. hyai, hybi : :class:`xarray.DataArray` One-dimensional hybrid A/B coefficients at layer interfaces. If the top interface pressure is exactly zero, the calculation keeps the ECMWF/IFS top-level shortcut. Otherwise it uses the general hypsometric layer integral at the top model level as well. hyam, hybm : :class:`xarray.DataArray` One-dimensional hybrid A/B coefficients at layer midpoints. p0 : float, optional Reference pressure in Pascals. Defaults to ``100000.0``. Use ``p0=1.0`` with ERA5's native ``a``/``b`` coefficients because the ERA5 half-level relation is ``p_half = a + b * sp`` and the ERA5 ``a`` coefficients are already stored in Pascals. Use ``p0=100000.0`` only when your hybrid ``A`` coefficients are normalized by a 100000 Pa reference pressure, as in many CESM-style files. lev_dim : str, optional Output vertical dimension name. Defaults to ``"lev"``. output_dims : sequence of str, optional Preferred output dimension order. Defaults to ``("time", "lev", "lat", "lon")``; missing names are ignored. Returns ------- :class:`xarray.DataArray` Geopotential height on model full levels in meters. Notes ----- The vertical integration follows the ECMWF ERA5 model-level geopotential recipe for virtual temperature and top-layer handling when the top interface pressure is zero. See the ECMWF Copernicus Knowledge Base: https://confluence.ecmwf.int/display/CKB/ERA5%3A+compute+pressure+and+geopotential+on+model+levels%2C+geopotential+height+and+geometric+height """ if not all( isinstance(x, xr.DataArray) for x in (temperature, q, ps, phis, hyai, hybi, hyam, hybm) ): raise TypeError( "temperature, q, ps, phis, hyai, hybi, hyam, and hybm must be xarray DataArray objects" ) if not isinstance(p0, (float, int, np.floating, np.integer)): raise TypeError(f"p0 must be a scalar numeric value, received {type(p0)}") _require_compiled_interp( "geopotential_height_at_hybrid_levels", _dgeopotential_height_hybrid_corder_pa_into, ) _reject_lazy_or_unit_backed_inputs( "geopotential_height_at_hybrid_levels", temperature, q, ps, phis, hyai, hybi, hyam, hybm, ) if lev_dim is None: try: if hasattr(temperature.cf, "guess_coord_axis"): temperature = temperature.cf.guess_coord_axis() lev_dim = temperature.cf["vertical"].name except Exception as exc: raise ValueError( "Unable to determine vertical dimension name. Please specify the name via `lev_dim` argument." ) from exc hyam, hybm = _align_hybrid_level_dimension(hyam, hybm, lev_dim) if hyai.shape != hybi.shape: raise ValueError( f"dimension mismatch between `hyai` and `hybi`: {hyai.shape} vs {hybi.shape}" ) if hyai.ndim != 1 or hybi.ndim != 1: raise ValueError("`hyai` and `hybi` must be one-dimensional arrays") if hyai.shape[0] != hyam.shape[0] + 1: raise ValueError( "`hyai`/`hybi` must be exactly one element longer than `hyam`/`hybm`" ) if hyai.dims != hybi.dims: warnings.warn( "hyai and hybi have different dimension names, attempting rename", stacklevel=2, ) hybi = hybi.rename( {source: target for target, source in zip(hyai.dims, hybi.dims)} ) if tuple(q.dims) != tuple(temperature.dims) or tuple(q.shape) != tuple( temperature.shape ): raise ValueError("`q` must have the same dims and shape as `temperature`") return _geopotential_height_hybrid_corder( temperature=temperature, q=q, ps=ps, phis=phis, hyai=hyai, hybi=hybi, hyam=hyam, p0=float(p0), lev_dim=lev_dim, output_dims=output_dims, ) def _geopotential_height_hybrid_corder( temperature: xr.DataArray, q: xr.DataArray, ps: xr.DataArray, phis: xr.DataArray, hyai: xr.DataArray, hybi: xr.DataArray, hyam: xr.DataArray, p0: float, lev_dim: str, output_dims, ) -> xr.DataArray | None: """Run geopotential-height calculation through the C-order backend.""" if _dgeopotential_height_hybrid_corder_pa_into is None: return None interp_axis = temperature.dims.index(lev_dim) shape_before = temperature.shape[:interp_axis] shape_after = temperature.shape[interp_axis + 1 :] lead_dims = temperature.dims[:interp_axis] + temperature.dims[interp_axis + 1 :] lead_shape = shape_before + shape_after nouter = int(np.prod(shape_before, dtype=np.int64)) if shape_before else 1 ninner = int(np.prod(shape_after, dtype=np.int64)) if shape_after else 1 nlev = temperature.shape[interp_axis] base_template = temperature.isel({lev_dim: 0}, drop=True) raw_temperature = temperature.data raw_q = q.data if ( isinstance(raw_temperature, np.ndarray) and raw_temperature.flags.c_contiguous and raw_temperature.dtype == np.float64 ): temp_flat = raw_temperature.reshape(-1) else: temp_flat = _compiled_float64_flat( temperature.transpose(*temperature.dims).data ) if ( isinstance(raw_q, np.ndarray) and raw_q.flags.c_contiguous and raw_q.dtype == np.float64 ): q_flat = raw_q.reshape(-1) else: q_flat = _compiled_float64_flat(q.transpose(*q.dims).data) ps_flat = _as_c_contiguous_compiled_flat(ps, lead_dims, lead_shape) if ps_flat is None: ps_flat = _as_broadcast_float64_flat(ps, base_template) phis_flat = _as_c_contiguous_compiled_flat(phis, lead_dims, lead_shape) if phis_flat is None: phis_flat = _as_broadcast_float64_flat(phis, base_template) if ps_flat is None or phis_flat is None: return None if ps_flat.size != nouter * ninner or phis_flat.size != nouter * ninner: return None hyai_values = _compiled_float64_vector(hyai.data) hybi_values = _compiled_float64_vector(hybi.data) output_values = _compiled_float64_output(temperature.shape, order="C") output_flat = output_values.reshape(-1) _dgeopotential_height_hybrid_corder_pa_into( temp_flat, q_flat, output_flat, ps_flat, phis_flat, hyai_values, hybi_values, float(p0), nouter=nouter, nlev=nlev, ninner=ninner, ) output_values = _restore_compiled_interp_output_dtype(output_values, temperature) coords = {k: v for k, v in base_template.coords.items()} lev_coord = _dimension_coord_or_default(hyam, hyam.dims[0], output_dim=lev_dim) coords[lev_dim] = lev_coord z3 = xr.DataArray( output_values, dims=tuple( dim if idx != interp_axis else lev_dim for idx, dim in enumerate(temperature.dims) ), coords=coords, name="z3", attrs={ "long_name": "geopotential height on hybrid model levels", "units": "m", }, ) return _finalize_hybrid_level_output( z3, lev_name=lev_dim, lev_coord=lev_coord, lev_dim=lev_dim, output_dims=output_dims, ) def _pressure_from_hybrid(psfc, hya, hyb, p0=100000.0): """Backward-compatible wrapper for :func:`pressure_at_hybrid_levels`.""" return pressure_at_hybrid_levels( psfc, hya, hyb, p0, lev_dim=None, output_dims=None, ) def _pre_interp_multidim( data_in: xr.DataArray, cyclic: bool, missing_val, ): """Helper Function: Handling missing data functionality and adding cyclic point if required. Parameters ---------- data_in : :class:`xarray.DataArray` The data on which to operate cyclic : :class:`bool` Determines if cyclic point should be added or not. If true then add point, else do nothing. missing_val : int, float, optional Provides an alternative to NaN Returns ------- data_in : :class:`xarray.DataArray` The data input with cyclic points added (if cyclic is true) and missing_val values replaced with np.nan Notes ------- """ # replace missing_val with np.nan if missing_val is not None: data_in = _with_dataarray_metadata( data_in, np.where(data_in.values == missing_val, np.nan, data_in.values) ) # add cyclic points and create new data array if cyclic: padded_data = np.pad(data_in.values, ((0, 0), (1, 1)), mode="wrap") padded_longitudes = np.pad( data_in.coords[data_in.dims[-1]], (1, 1), mode="wrap" ) padded_longitudes[0] -= 360 padded_longitudes[-1] += 360 data_in = _with_dataarray_metadata( data_in, padded_data, coords={ data_in.dims[-2]: data_in.coords[data_in.dims[-2]].values, data_in.dims[-1]: padded_longitudes, }, ) return data_in def _post_interp_multidim(data_in, missing_val): """Helper Function: Handling missing data functionality. Parameters ---------- data_in : :class:`xarray.DataArray` The data on which to operate missing_val : int, float, optional Provides an alternative to NaN Returns ------- data_in : :class:`xarray.DataArray` The data input with np.nan values replaced with missing_val """ if missing_val is not None: data_in = _with_dataarray_metadata( data_in, np.where(np.isnan(data_in.values), missing_val, data_in.values) ) return data_in def _sigma_from_hybrid(psfc, hya, hyb, p0=100000.0): """Calculate sigma at the hybrid levels.""" if isinstance(hya, xr.DataArray) and isinstance(hyb, xr.DataArray): if hya.dims != hyb.dims: warnings.warn( "hya and hyb have different dimension names, attempting rename", stacklevel=2, ) hyb = hyb.rename({b: a for a, b in zip(hya.dims, hyb.dims)}) hya, hyb = _rename_colliding_coeff_dim(psfc, hya, hyb) # sig(k) = hya(k) * p0 / psfc + hyb(k) # This will be in Pa return hya * p0 / psfc + hyb def _is_dask_backed(array): """Return True when an xarray object is backed by a dask array.""" return ( isinstance(array, xr.DataArray) and getattr(array.data, "chunks", None) is not None ) def _is_pint_backed(array): """Return True when an xarray object is backed by pint quantities.""" if not isinstance(array, xr.DataArray): return False module_name = getattr(array.data, "__module__", "") return module_name.startswith("pint") or ( hasattr(array.data, "magnitude") and hasattr(array.data, "units") ) def _require_compiled_interp(function_name: str, *handles) -> None: """Raise a clear error when a compiled interpolation backend is unavailable.""" if not any(handle is not None for handle in handles): raise RuntimeError( f"{function_name} requires the compiled Fortran backend in " "`skyborn.interp.fortran`." ) def _reject_lazy_or_unit_backed_inputs(function_name: str, *arrays) -> None: """Reject lazy or pint-backed arrays for compiled-only interpolation APIs.""" active_arrays = [array for array in arrays if array is not None] if any(_is_dask_backed(array) for array in active_arrays): raise NotImplementedError( f"{function_name} no longer provides a Dask fallback path. " "Use eager NumPy/xarray inputs backed by the compiled Fortran kernels." ) if any(_is_pint_backed(array) for array in active_arrays): raise NotImplementedError( f"{function_name} no longer provides a Pint/MetPy fallback path. " "Use eager NumPy/xarray inputs backed by the compiled Fortran kernels." ) def _align_hybrid_level_dimension(hyam, hybm, lev_dim): """Rename hybrid coefficient dimensions so they align with ``lev_dim``.""" if hyam.shape != hybm.shape: raise ValueError( f"dimension mismatch between `hyam` and `hybm`: {hyam.shape} vs {hybm.shape}" ) if hyam.ndim != 1 or hybm.ndim != 1: raise ValueError("`hyam` and `hybm` must be one-dimensional arrays") if hyam.dims != hybm.dims: warnings.warn( "hyam and hybm have different dimension names, attempting rename", stacklevel=2, ) hybm = hybm.rename( {source: target for target, source in zip(hyam.dims, hybm.dims)} ) coeff_dim = hyam.dims[0] if coeff_dim != lev_dim: hyam = hyam.rename({coeff_dim: lev_dim}) hybm = hybm.rename({hybm.dims[0]: lev_dim}) return hyam, hybm def _vinth2p_intyp(method: str) -> int: """Translate public interpolation methods to legacy vinth2p flags.""" method = _normalize_interp_method(method) if method == "linear": return 1 if method == "log": return 2 if method == "log-log": return 3 raise ValueError( f"Unknown interpolation method: {method}. " f'Supported methods are: "linear", "log", and "log-log".' ) def _vinth2p_varflg(variable: str) -> int: """Translate public extrapolation variable labels to ECMWF vinth flags.""" if variable == "temperature": return 1 if variable == "geopotential": return -1 return 0 def _compiled_interp_output_dtype(data: xr.DataArray) -> np.dtype: """Return the public result dtype while the compiled kernels stay float64.""" dtype = np.dtype(np.asarray(data.data).dtype) if np.issubdtype(dtype, np.floating): if dtype.itemsize <= np.dtype(np.float32).itemsize: return np.dtype(np.float32) return np.dtype(np.float64) return np.dtype(np.float64) def _restore_compiled_interp_output_dtype( output_values: np.ndarray, data: xr.DataArray ) -> np.ndarray: """Cast compiled float64 output back to the public result precision.""" output_dtype = _compiled_interp_output_dtype(data) if output_values.dtype == output_dtype: return output_values return output_values.astype(output_dtype, copy=False) def _compiled_float64_vector(values) -> np.ndarray: """Materialize a 1D float64 array for the compiled interpolation boundary.""" return np.asarray(values, dtype=np.float64).reshape(-1) def _compiled_float64_flat(values) -> np.ndarray: """Materialize a contiguous 1D float64 buffer for the compiled boundary.""" return np.ascontiguousarray(np.asarray(values, dtype=np.float64)).reshape(-1) def _compiled_float64_flat_with_missing(values, spvl: float) -> np.ndarray: """Materialize a C-order float64 buffer with NaNs converted to ``spvl``.""" flat = _compiled_float64_flat(values) if np.isnan(flat).any(): flat = flat.copy() flat[np.isnan(flat)] = spvl return flat def _compiled_float64_columns(values, ncol: int, nlev: int) -> np.ndarray: """Materialize a Fortran-ordered float64 (nlev, ncol) matrix for kernels.""" return np.asfortranarray(np.asarray(values, dtype=np.float64).reshape(ncol, nlev).T) def _compiled_float64_output(shape, order: str = "F") -> np.ndarray: """Allocate a float64 output buffer for compiled interpolation kernels.""" return np.empty(shape, dtype=np.float64, order=order) def _delta_pressure_hybrid_flat(ps_values, hya_values, hyb_values, p0) -> np.ndarray: """Run eager hybrid layer-thickness calculation through the compiled backend.""" if _ddelta_pressure_hybrid_pa is None: raise RuntimeError("delta-pressure backend is not available") ps_flat = _compiled_float64_flat(ps_values) hya_vector = _compiled_float64_vector(hya_values) hyb_vector = _compiled_float64_vector(hyb_values) ncol = ps_flat.size nlev = hya_vector.size nlevo = nlev - 1 output_columns = _compiled_float64_output((nlevo, ncol), order="F") if _ddelta_pressure_hybrid_pa_into is not None: try: _ddelta_pressure_hybrid_pa_into( ps_flat, output_columns, hya_vector, hyb_vector, float(p0), nlevo=nlevo, ncol=ncol, nlev=nlev, ) except Exception as exc: if "ddelta_pressure_hybrid_pa_into" not in str(exc): raise _ddelta_pressure_hybrid_pa_into( ps_flat, output_columns, hya_vector, hyb_vector, float(p0), nlevo, ) return output_columns try: return _ddelta_pressure_hybrid_pa( ps_flat, hya_vector, hyb_vector, float(p0), nlevo=nlevo, ncol=ncol, nlev=nlev, ) except Exception as exc: if "ddelta_pressure_hybrid_pa" not in str(exc): raise return _ddelta_pressure_hybrid_pa( ps_flat, hya_vector, hyb_vector, float(p0), nlevo, ) def _pressure_at_hybrid_levels_flat( ps_values, hya_values, hyb_values, p0 ) -> np.ndarray: """Run eager hybrid-pressure calculation through the compiled backend.""" if _dpressure_at_hybrid_levels_pa is None: raise RuntimeError("pressure-at-hybrid-levels backend is not available") ps_flat = _compiled_float64_flat(ps_values) hya_vector = _compiled_float64_vector(hya_values) hyb_vector = _compiled_float64_vector(hyb_values) ncol = ps_flat.size nlev = hya_vector.size output_columns = _compiled_float64_output((nlev, ncol), order="F") if _dpressure_at_hybrid_levels_pa_into is not None: try: _dpressure_at_hybrid_levels_pa_into( ps_flat, output_columns, hya_vector, hyb_vector, float(p0), ncol=ncol, nlev=nlev, ) except Exception as exc: if "dpressure_at_hybrid_levels_pa_into" not in str(exc): raise _dpressure_at_hybrid_levels_pa_into( ps_flat, output_columns, hya_vector, hyb_vector, float(p0), ) return output_columns try: return _dpressure_at_hybrid_levels_pa( ps_flat, hya_vector, hyb_vector, float(p0), ncol=ncol, nlev=nlev, ) except Exception as exc: if "dpressure_at_hybrid_levels_pa" not in str(exc): raise return _dpressure_at_hybrid_levels_pa( ps_flat, hya_vector, hyb_vector, float(p0), ) def _as_c_contiguous_compiled_flat(array: xr.DataArray, dims, shape): """Return a float64 flat buffer for aligned C-order floating inputs.""" if array is None or not isinstance(array, xr.DataArray): return None if tuple(array.dims) != tuple(dims) or tuple(array.shape) != tuple(shape): return None values = array.data if not isinstance(values, np.ndarray): return None if not values.flags.c_contiguous or not np.issubdtype(values.dtype, np.floating): return None if values.dtype == np.float64: return values.reshape(-1) return _compiled_float64_flat(values) def _as_broadcast_float64_flat(array: xr.DataArray, template: xr.DataArray): """Return a flattened float64 array after lightweight xarray broadcasting.""" if array is None or not isinstance(array, xr.DataArray): return None try: broadcast = array.broadcast_like(template) except Exception: return None return _compiled_float64_flat(broadcast.data) def _build_vinth2p_output(data, interp_axis, new_levels, output_values, base_template): """Wrap a NumPy output array in the public xarray result.""" output_values = _restore_compiled_interp_output_dtype(output_values, data) coords = {k: v for k, v in base_template.coords.items()} coords["plev"] = new_levels output_dims = tuple( dim if idx != interp_axis else "plev" for idx, dim in enumerate(data.dims) ) return xr.DataArray( output_values, dims=output_dims, coords=coords, name=data.name, attrs=data.attrs, ) def _interp_hybrid_to_pressure_corder( data: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float, new_levels: np.ndarray, lev_dim: str, method: str, extrapolate: bool, variable: str, t_bot: xr.DataArray, phi_sfc: xr.DataArray, ) -> xr.DataArray | None: """Use a no-transpose fast path for NumPy C-order arrays with aligned dims.""" if _dvinth2p_nodes_corder_pa_into is None: return None interp_axis = data.dims.index(lev_dim) raw_data = data.data if not isinstance(raw_data, np.ndarray): return None if not raw_data.flags.c_contiguous or not np.issubdtype( raw_data.dtype, np.floating ): return None shape_before = data.shape[:interp_axis] shape_after = data.shape[interp_axis + 1 :] lead_dims = data.dims[:interp_axis] + data.dims[interp_axis + 1 :] lead_shape = shape_before + shape_after nouter = int(np.prod(shape_before, dtype=np.int64)) if shape_before else 1 ninner = int(np.prod(shape_after, dtype=np.int64)) if shape_after else 1 nlevi = data.shape[interp_axis] nlevo = new_levels.size raw_data_flat = ( raw_data.reshape(-1) if raw_data.dtype == np.float64 else _compiled_float64_flat(raw_data) ) ps_flat = _as_c_contiguous_compiled_flat(ps, lead_dims, lead_shape) if ps_flat is None or not np.isfinite(ps_flat).all(): return None hyam_values = _compiled_float64_vector(hyam.data) hybm_values = _compiled_float64_vector(hybm.data) new_level_values = _compiled_float64_vector(new_levels) intyp = _vinth2p_intyp(method) base_template = data.isel({lev_dim: 0}, drop=True) output_values = _compiled_float64_output( (*shape_before, nlevo, *shape_after), order="C" ) output_flat = output_values.reshape(-1) if extrapolate: varflg = _vinth2p_varflg(variable) if _dvinth2p_ecmwf_nodes_corder_pa_into is None: return None if varflg == 0: tbot_flat = np.zeros(nouter * ninner, dtype=np.float64) phi_flat = np.zeros(nouter * ninner, dtype=np.float64) else: tbot_flat = _as_c_contiguous_compiled_flat(t_bot, lead_dims, lead_shape) if tbot_flat is None: tbot_flat = _as_broadcast_float64_flat(t_bot, base_template) phi_flat = _as_c_contiguous_compiled_flat(phi_sfc, lead_dims, lead_shape) if phi_flat is None: phi_flat = _as_broadcast_float64_flat(phi_sfc, base_template) if tbot_flat is None or phi_flat is None: return None _dvinth2p_ecmwf_nodes_corder_pa_into( raw_data_flat, output_flat, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_flat, _SPVL, 1, nouter, ninner, varflg, tbot_flat, phi_flat, ) else: _dvinth2p_nodes_corder_pa_into( raw_data_flat, output_flat, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_flat, _SPVL, 0, nouter, ninner, ) output_values[output_values == _SPVL] = np.nan return _build_vinth2p_output( data=data, interp_axis=interp_axis, new_levels=new_levels, output_values=output_values, base_template=base_template, ) def _interp_hybrid_to_pressure( data: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float, new_levels: np.ndarray, lev_dim: str, method: str, extrapolate: bool, variable: str, t_bot: xr.DataArray, phi_sfc: xr.DataArray, ) -> xr.DataArray: """Run eager hybrid-to-pressure interpolation through the compiled vinth2p backend.""" if _dvinth2p_nodes_pa is None: raise RuntimeError("vinth2p backend is not available") corder_output = _interp_hybrid_to_pressure_corder( data=data, ps=ps, hyam=hyam, hybm=hybm, p0=p0, new_levels=new_levels, lev_dim=lev_dim, method=method, extrapolate=extrapolate, variable=variable, t_bot=t_bot, phi_sfc=phi_sfc, ) if corder_output is not None: return corder_output base_template = data.isel({lev_dim: 0}, drop=True) lead_dims = base_template.dims interp_axis = data.dims.index(lev_dim) data_view = data.transpose(*lead_dims, lev_dim) nlevi = data_view.shape[-1] nlevo = new_levels.size lead_shape = data_view.shape[:-1] ncol = int(np.prod(lead_shape, dtype=np.int64)) if lead_shape else 1 data_columns = _compiled_float64_columns(data_view.data, ncol=ncol, nlev=nlevi) ps_columns = _compiled_float64_flat( ps.broadcast_like(base_template).transpose(*lead_dims).data ) ps_columns = np.where(np.isfinite(ps_columns), ps_columns, _SPVL) hyam_values = _compiled_float64_vector(hyam.data) hybm_values = _compiled_float64_vector(hybm.data) new_level_values = _compiled_float64_vector(new_levels) intyp = _vinth2p_intyp(method) output_columns = _compiled_float64_output((nlevo, ncol), order="F") if extrapolate: varflg = _vinth2p_varflg(variable) if varflg == 0: t_bot_columns = np.zeros(ncol, dtype=np.float64) phi_columns = np.zeros(ncol, dtype=np.float64) else: t_bot_columns = _compiled_float64_flat( t_bot.broadcast_like(base_template).transpose(*lead_dims).data ) phi_columns = _compiled_float64_flat( phi_sfc.broadcast_like(base_template).transpose(*lead_dims).data ) if _dvinth2p_ecmwf_nodes_pa_into is not None: _dvinth2p_ecmwf_nodes_pa_into( data_columns, output_columns, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_columns, _SPVL, 1, varflg, t_bot_columns, phi_columns, ) else: output_columns = _dvinth2p_ecmwf_nodes_pa( data_columns, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_columns, _SPVL, 1, varflg, t_bot_columns, phi_columns, ) else: if _dvinth2p_nodes_pa_into is not None: _dvinth2p_nodes_pa_into( data_columns, output_columns, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_columns, _SPVL, 0, ) else: output_columns = _dvinth2p_nodes_pa( data_columns, hyam_values, hybm_values, float(p0), new_level_values, intyp, ps_columns, _SPVL, 0, ) output_values = output_columns.T.reshape((*lead_shape, nlevo)) output_values[output_values == _SPVL] = np.nan output_values = _restore_compiled_interp_output_dtype(output_values, data) coords = {k: v for k, v in base_template.coords.items()} coords["plev"] = new_levels output = xr.DataArray( output_values, dims=(*lead_dims, "plev"), coords=coords, name=data.name, attrs=data.attrs, ) dims = [data.dims[i] if i != interp_axis else "plev" for i in range(data.ndim)] return output.transpose(*dims) def _interp_sigma_to_hybrid( data: xr.DataArray, sig_coords: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float, lev_dim: str, method: str, ) -> xr.DataArray: """Run eager sigma-to-hybrid interpolation through the compiled backend.""" if _dsigma2hybrid_nodes is None: raise RuntimeError("sigma2hybrid backend is not available") sigma_source_values = _compiled_float64_vector( sig_coords.data if isinstance(sig_coords, xr.DataArray) else sig_coords ) sigma_diffs = np.diff(sigma_source_values) if not ( np.all(sigma_diffs >= 0.0) or np.all(sigma_diffs <= 0.0) or sigma_source_values.size <= 1 ): raise ValueError("sigma2hybrid backend requires monotonic sigma coordinates") corder_output = _interp_sigma_to_hybrid_corder( data=data, ps=ps, hyam=hyam, hybm=hybm, p0=p0, lev_dim=lev_dim, method=method, sigma_source_values=sigma_source_values, ) if corder_output is not None: return corder_output sigma_target = _sigma_from_hybrid(ps, hyam, hybm, p0) target_dim = sigma_target.dims[0] base_template = data.isel({lev_dim: 0}, drop=True) lead_dims = base_template.dims interp_axis = data.dims.index(lev_dim) data_view = data.transpose(*lead_dims, lev_dim) sigma_target_view = sigma_target.transpose(target_dim, *lead_dims) nlevi = data_view.shape[-1] nlevo = sigma_target_view.shape[0] lead_shape = data_view.shape[:-1] ncol = int(np.prod(lead_shape, dtype=np.int64)) if lead_shape else 1 data_columns = _compiled_float64_columns(data_view.data, ncol=ncol, nlev=nlevi) sigma_target_columns = np.asfortranarray( np.asarray(sigma_target_view.data, dtype=np.float64).reshape(nlevo, ncol) ) intyp = _vinth2p_intyp(method) output_columns = _compiled_float64_output((nlevo, ncol), order="F") if _dsigma2hybrid_nodes_into is not None: _dsigma2hybrid_nodes_into( data_columns, output_columns, sigma_source_values, sigma_target_columns, intyp, _SPVL, nlevo, nlevi=nlevi, ncol=ncol, ) else: output_columns = _dsigma2hybrid_nodes( data_columns, sigma_source_values, sigma_target_columns, intyp, _SPVL, nlevo, nlevi=nlevi, ncol=ncol, ) output_values = output_columns.T.reshape((*lead_shape, nlevo)) output_values[output_values == _SPVL] = np.nan output_values = _restore_compiled_interp_output_dtype(output_values, data) coords = {k: v for k, v in base_template.coords.items()} coords["hlev"] = _dimension_coord_or_default(hyam, hyam.dims[0], output_dim="hlev") output = xr.DataArray( output_values, dims=(*lead_dims, "hlev"), coords=coords, name=data.name, attrs=data.attrs, ) dims = [data.dims[i] if i != interp_axis else "hlev" for i in range(data.ndim)] return output.transpose(*dims) def _interp_sigma_to_hybrid_corder( data: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float, lev_dim: str, method: str, sigma_source_values: np.ndarray, ) -> xr.DataArray | None: """Use a no-transpose fast path for NumPy C-order sigma->hybrid inputs.""" if _dsigma2hybrid_nodes_corder_into is None: return None interp_axis = data.dims.index(lev_dim) raw_data = data.data if not isinstance(raw_data, np.ndarray): return None if not raw_data.flags.c_contiguous or not np.issubdtype( raw_data.dtype, np.floating ): return None shape_before = data.shape[:interp_axis] shape_after = data.shape[interp_axis + 1 :] lead_dims = data.dims[:interp_axis] + data.dims[interp_axis + 1 :] lead_shape = shape_before + shape_after nouter = int(np.prod(shape_before, dtype=np.int64)) if shape_before else 1 ninner = int(np.prod(shape_after, dtype=np.int64)) if shape_after else 1 nlevi = data.shape[interp_axis] nlevo = hyam.shape[0] base_template = data.isel({lev_dim: 0}, drop=True) raw_data_flat = ( raw_data.reshape(-1) if raw_data.dtype == np.float64 else _compiled_float64_flat(raw_data) ) ps_flat = _as_c_contiguous_compiled_flat(ps, lead_dims, lead_shape) if ps_flat is None: ps_flat = _as_broadcast_float64_flat(ps, base_template) if ps_flat is None or not np.isfinite(ps_flat).all(): return None hyam_values = _compiled_float64_vector(hyam.data) hybm_values = _compiled_float64_vector(hybm.data) intyp = _vinth2p_intyp(method) output_values = _compiled_float64_output( (*shape_before, nlevo, *shape_after), order="C" ) output_flat = output_values.reshape(-1) _dsigma2hybrid_nodes_corder_into( raw_data_flat, output_flat, sigma_source_values, hyam_values, hybm_values, float(p0), ps_flat, intyp, _SPVL, nouter, ninner, nlevo, nlevi=nlevi, ) output_values[output_values == _SPVL] = np.nan output_values = _restore_compiled_interp_output_dtype(output_values, data) coords = {k: v for k, v in base_template.coords.items()} coords["hlev"] = _dimension_coord_or_default(hyam, hyam.dims[0], output_dim="hlev") output = xr.DataArray( output_values, dims=tuple( dim if idx != interp_axis else "hlev" for idx, dim in enumerate(data.dims) ), coords=coords, name=data.name, attrs=data.attrs, ) return output
[docs] def interp_hybrid_to_pressure( data: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float = 100000.0, new_levels: coordinate_types = __pres_lev_mandatory__, lev_dim: str | None = None, method: str = "linear", extrapolate: bool = False, variable: str | None = None, t_bot: xr.DataArray | None = None, phi_sfc: xr.DataArray | None = None, ) -> xr.DataArray: """Interpolate and extrapolate data from hybrid-sigma levels to isobaric levels. This function interpolates atmospheric data from hybrid-sigma coordinate levels to constant pressure levels, with optional extrapolation below ground using ECMWF formulations. Preserves all metadata from the input data. Notes ----- Atmosphere hybrid-sigma pressure coordinates are commonly defined in two different ways as described below and in CF Conventions. This particular function expects the first formulation. However, with some minor adjustments on the user side it can support datasets leveraging the second formulation as well. In this case, you can set the input parameters p0=1 and hyam=ap to adapt the function to meet your needs. Formulation 1: p(n,k,j,i) = a(k)*p0 + b(k)*ps(n,j,i) Formulation 2: p(n,k,j,i) = ap(k) + b(k)*ps(n,j,i) Parameters ---------- data : :class:`xarray.DataArray` Multidimensional data array with hybrid-sigma levels and a ``lev_dim`` coordinate. ps : :class:`xarray.DataArray` A multi-dimensional array of surface pressures (Pa), same time/space shape as data. hyam, hybm : :class:`xarray.DataArray` One-dimensional arrays containing the hybrid A and B coefficients. Must have the same dimension size as the ``lev_dim`` dimension of data. p0 : float, optional Scalar numeric value equal to surface reference pressure (Pa). Defaults to 100000 Pa. new_levels : ndarray, optional A one-dimensional array of output pressure levels (Pa). If not given, the mandatory list of 21 pressure levels is used. lev_dim : str, optional String that is the name of level dimension in data. If None, attempts to detect automatically using CF conventions. method : str, optional Interpolation method passed to the compiled vinth2p kernels. Supported values are ``"linear"``, ``"log"``, and ``"log-log"`` (or ``"loglog"``). Defaults to ``"linear"``. extrapolate : bool, optional If True, below ground extrapolation for ``variable`` will be done using an `ECMWF formulation <https://dx.doi.org/10.5065/D6HX19NH>`__. Defaults to False. variable : str, optional String representing what variable is extrapolated below surface level. Temperature extrapolation = "temperature". Geopotential height extrapolation = "geopotential". All other variables = "other". If "other", the value of ``data`` at the lowest model level will be used as the below ground fill value. Required if extrapolate is True. t_bot : :class:`xarray.DataArray`, optional Temperature in Kelvin at the lowest layer of the model. Not necessarily the same as surface temperature. Required if ``extrapolate`` is True and ``variable`` is not ``'other'`` phi_sfc: :class:`xarray.DataArray`, optional Geopotential in J/kg at the lowest layer of the model. Not necessarily the same as surface geopotential. Required if ``extrapolate`` is True and ``variable`` is not ``'other'``. Returns ------- output : :class:`xarray.DataArray` Interpolated data with isobaric levels as the new vertical coordinate Examples -------- Basic interpolation from hybrid-sigma to pressure levels: >>> import skyborn.interp as si >>> import xarray as xr >>> import numpy as np >>> >>> # Interpolate temperature to standard pressure levels >>> temp_p = si.interp_hybrid_to_pressure( ... data=temperature, ... ps=surface_pressure, ... hyam=hybrid_a, ... hybm=hybrid_b, ... new_levels=np.array([100000, 85000, 70000]) # Pa ... ) See Also -------- interp_sigma_to_hybrid : Interpolate from sigma to hybrid coordinates Related NCL Functions: `vinth2p <https://www.ncl.ucar.edu/Document/Functions/Built-in/vinth2p.shtml>`__, `vinth2p_ecmwf <https://www.ncl.ucar.edu/Document/Functions/Built-in/vinth2p_ecmwf.shtml>`__ """ if not all(isinstance(x, xr.DataArray) for x in (data, ps, hyam, hybm)): raise TypeError("data, ps, hyam, and hybm must be xarray DataArray objects") _require_compiled_interp( "interp_hybrid_to_pressure", _dvinth2p_nodes_pa, _dvinth2p_nodes_pa_into, _dvinth2p_nodes_corder_pa_into, _dvinth2p_ecmwf_nodes_pa, _dvinth2p_ecmwf_nodes_pa_into, _dvinth2p_ecmwf_nodes_corder_pa_into, ) _reject_lazy_or_unit_backed_inputs( "interp_hybrid_to_pressure", data, ps, hyam, hybm, t_bot, phi_sfc ) new_levels = np.asarray(new_levels) # Check inputs if extrapolate and (variable is None): raise ValueError("If `extrapolate` is True, `variable` must be provided.") if variable in ["geopotential", "temperature"] and ( t_bot is None or phi_sfc is None ): raise ValueError( "If `variable` is 'geopotential' or 'temperature', both `t_bot` and `phi_sfc` must be provided" ) if variable not in ["geopotential", "temperature", "other", None]: raise ValueError( "The value of `variable` is " + variable + ", but the accepted values are 'temperature', 'geopotential', 'other', or None." ) # Determine the level dimension and then the interpolation axis if lev_dim is None: try: if hasattr(data.cf, "guess_coord_axis"): data = data.cf.guess_coord_axis() lev_dim = data.cf["vertical"].name except Exception as exc: raise ValueError( "Unable to determine vertical dimension name. Please specify the name via `lev_dim` argument." ) from exc hyam, hybm = _align_hybrid_level_dimension(hyam, hybm, lev_dim) method = _normalize_interp_method(method) return _interp_hybrid_to_pressure( data=data, ps=ps, hyam=hyam, hybm=hybm, p0=p0, new_levels=new_levels, lev_dim=lev_dim, method=method, extrapolate=extrapolate, variable=variable, t_bot=t_bot, phi_sfc=phi_sfc, )
def interp_to_isentropic( data: xr.DataArray, temperature: xr.DataArray, pressure: xr.DataArray, theta_levels: coordinate_types, *, lev_dim: str | None = None, p0: float = 100000.0, kappa: float = _RD_OVER_CP, extrapolate: bool = False, missing_value: object = np.nan, output_dim: str = "theta", ) -> xr.DataArray: """Interpolate data from model or pressure levels to isentropic levels. Parameters ---------- data, temperature, pressure : :class:`xarray.DataArray` Arrays sharing the same vertical dimension. ``temperature`` is in Kelvin and ``pressure`` is in Pa. theta_levels : array-like or :class:`xarray.DataArray` Target potential-temperature levels in Kelvin. lev_dim : str, optional Name of the input vertical dimension. If omitted, common names are tried. p0 : float, optional Reference pressure in Pa. Defaults to 100000 Pa. kappa : float, optional Poisson exponent ``R_d / c_p``. Defaults to 0.2854. extrapolate : bool, optional If True, extend end slopes as constant-endpoint extrapolation through :func:`numpy.interp`. Defaults to False. missing_value : scalar, optional Optional finite missing-value marker in addition to NaN. output_dim : str, optional Name of the output isentropic-level dimension when ``theta_levels`` is not an xarray object. Defaults to ``"theta"``. Returns ------- :class:`xarray.DataArray` ``data`` interpolated to ``theta_levels`` with metadata preserved. """ if not all(isinstance(x, xr.DataArray) for x in (data, temperature, pressure)): raise TypeError( "data, temperature, and pressure must be xarray DataArray objects" ) if lev_dim is None: candidates = ("lev", "level", "hybrid", "plev", "isobaricInhPa") lev_dim = next((dim for dim in candidates if dim in data.dims), None) if lev_dim is None: raise ValueError( "Unable to determine vertical dimension name. Please specify " "the name via `lev_dim` argument." ) if lev_dim not in data.dims: raise ValueError(f"`lev_dim` {lev_dim!r} is not a dimension of data") if lev_dim not in temperature.dims: raise ValueError(f"`lev_dim` {lev_dim!r} is not a dimension of temperature") if lev_dim not in pressure.dims: raise ValueError(f"`lev_dim` {lev_dim!r} is not a dimension of pressure") theta_coord = _target_level_dataarray( theta_levels, default_dim=output_dim, units="K", long_name="potential temperature", ) theta_dim = theta_coord.dims[0] target_theta = np.asarray(theta_coord.data, dtype=np.float64) if not np.isfinite(target_theta).all(): raise ValueError("theta_levels must be finite") _require_compiled_interp( "interp_to_isentropic", _dinterp_to_isentropic_corder_into, ) _reject_lazy_or_unit_backed_inputs( "interp_to_isentropic", data, temperature, pressure ) temperature, pressure = xr.align(temperature, pressure, join="exact") data, temperature = xr.align(data, temperature, join="exact") spvl = float(missing_value) if not np.isnan(missing_value) else float(_SPVL) result = _interp_to_isentropic( data, temperature, pressure, target_theta, lev_dim=lev_dim, p0=p0, kappa=kappa, spvl=spvl, extrapolate=extrapolate, ) if result.dims[data.dims.index(lev_dim)] != theta_dim: result = result.rename({"theta": theta_dim}) result = result.assign_coords({theta_dim: theta_coord}) result[theta_dim].attrs.update(theta_coord.attrs) return result
[docs] def interp_sigma_to_hybrid( data: xr.DataArray, sig_coords: xr.DataArray, ps: xr.DataArray, hyam: xr.DataArray, hybm: xr.DataArray, p0: float = 100000.0, lev_dim: str | None = None, method: str = "linear", ) -> xr.DataArray: """Interpolate data from sigma to hybrid coordinates. This function interpolates atmospheric data from sigma coordinate levels to hybrid-sigma coordinate levels. Preserves all metadata from the input data. Parameters ---------- data : :class:`xarray.DataArray` Multidimensional data array with sigma levels and a ``lev_dim`` coordinate. sig_coords : :class:`xarray.DataArray` A one-dimensional array of sigma coordinates corresponding to the ``lev_dim`` dimension of ``data``. ps : :class:`xarray.DataArray` A multi-dimensional array of surface pressures (Pa), same time/space shape as data. hyam, hybm : :class:`xarray.DataArray` One-dimensional arrays containing the hybrid A and B coefficients. Must have the same dimension as the desired output hybrid levels. p0 : float, optional Scalar numeric value equal to surface reference pressure (Pa). Defaults to 100000 Pa. lev_dim : str, optional String that is the name of level dimension in data. If None, attempts to detect automatically using CF conventions. method : str, optional Interpolation method passed to the compiled sigma2hybrid kernels. Supported values are ``"linear"``, ``"log"``, and ``"log-log"`` (or ``"loglog"``). Defaults to ``"linear"``. Returns ------- output : :class:`xarray.DataArray` Interpolated data with hybrid levels as the new vertical coordinate Examples -------- Basic interpolation from sigma to hybrid coordinates: >>> import skyborn.interp as si >>> import xarray as xr >>> import numpy as np >>> >>> # Interpolate data from sigma to hybrid levels >>> data_hybrid = si.interp_sigma_to_hybrid( ... data=sigma_data, ... sig_coords=sigma_levels, ... ps=surface_pressure, ... hyam=hybrid_a, ... hybm=hybrid_b ... ) See Also -------- interp_hybrid_to_pressure : Interpolate from hybrid to pressure coordinates Related NCL Function: `sigma2hybrid <https://www.ncl.ucar.edu/Document/Functions/Built-in/sigma2hybrid.shtml>`__ """ _require_compiled_interp( "interp_sigma_to_hybrid", _dsigma2hybrid_nodes, _dsigma2hybrid_nodes_into, _dsigma2hybrid_nodes_corder_into, ) _reject_lazy_or_unit_backed_inputs( "interp_sigma_to_hybrid", data, sig_coords, ps, hyam, hybm ) # Determine the level dimension and then the interpolation axis if lev_dim is None: try: lev_dim = data.cf["vertical"].name except Exception: raise ValueError( "Unable to determine vertical dimension name. Please specify the name via `lev_dim` argument.'" ) method = _normalize_interp_method(method) return _interp_sigma_to_hybrid( data=data, sig_coords=sig_coords, ps=ps, hyam=hyam, hybm=hybm, p0=p0, lev_dim=lev_dim, method=method, )
[docs] def interp_multidim( data_in: supported_types, lat_out: coordinate_types, lon_out: coordinate_types, lat_in: coordinate_types | None = None, lon_in: coordinate_types | None = None, cyclic: bool = False, missing_val: np.number | None = None, method: str = "linear", fill_value: str | np.number = np.nan, ) -> supported_types: """Multidimensional interpolation of variables. Uses ``xarray.interp`` to perform interpolation. Will not perform extrapolation by default, returns missing values if any surrounding points contain missing values. .. warning:: The output data type may be promoted to that of the coordinate data. Parameters ---------- data_in : :class:`xarray.DataArray`, ndarray Data array with data to be interpolated and associated coords. If it is a np array, then ``lat_in`` and ``lon_in`` must be provided. Length must be coordinated with given coordinates. lat_out: ndarray List of latitude coordinates to be interpolated to. lon_out: ndarray List of longitude coordinates to be interpolated to. lat_in: ndarray List of latitude coordinates corresponding to ``data_in``. Must be given if ``data_in`` is not an xarray. lon_in: ndarray List of longitude coordinates corresponding to ``data_in``. Must be given if ``data_in`` is not an xarray. cyclic: bool, optional Set as true if lon values are cyclical but do not fully wrap around the globe (0, 1.5, 3, ..., 354, 355.5) Default is false missing_val : :class:`np.number`, optional Provide a number to represent missing data. Alternative to using ``np.nan`` method: str, optional Provide specific method of interpolation. Default is "linear" “linear” or “nearest” for multidimensional array fill_value: str, optional Set as 'extrapolate' to allow extrapolation of data. Default is no extrapolation. Returns ------- data_out : ndarray, :class:`xarray.DataArray` Returns the same type of object as input ``data_in``. However, the type of the data in the array may be promoted to that of the coordinates. Shape will be the same as input array except for last two dimensions which will be equal to the coordinates given in ``data_out``. Examples -------- Basic multidimensional interpolation: >>> import skyborn.interp as si >>> import xarray as xr >>> import numpy as np >>> >>> # Create sample data >>> data = np.asarray([[1, 2, 3, 4, 5, 99], ... [2, 4, 6, 8, 10, 12]]) >>> lat_in = [0, 1] >>> lon_in = [0, 50, 100, 250, 300, 350] >>> data_in = xr.DataArray(data, ... dims=['lat', 'lon'], ... coords={'lat':lat_in, ... 'lon': lon_in}) >>> >>> # Interpolate to new coordinates with cyclic boundary >>> do = si.interp_multidim(data_in, ... [0, 1], ... [0, 50, 360], ... cyclic=True, ... missing_val=99) >>> print(do) <xarray.DataArray (lat: 2, lon: 3)> array([[ 1., 2., 99.], [ 2., 4., 99.]]) Coordinates: * lat (lat) int64 0 1 * lon (lon) int64 0 50 360 See Also -------- Related External Functions: `xarray.DataArray.interp <https://docs.xarray.dev/en/stable/generated/xarray.DataArray.interp.html>`__, `cartopy.util.add_cyclic_point <https://scitools.org.uk/cartopy/docs/latest/reference/generated/cartopy.util.add_cyclic_point.html>`__ Related NCL Function: `NCL linint2 <https://www.ncl.ucar.edu/Document/Functions/Built-in/linint2.shtml>`__ """ # check for xarray/numpy if not isinstance(data_in, xr.DataArray): if lat_in is None or lon_in is None: raise ValueError( "Argument lat_in and lon_in must be provided if data_in is not an xarray" ) data_in = xr.DataArray( data_in, dims=["lat", "lon"], coords={"lat": lat_in, "lon": lon_in} ) output_coords = { data_in.dims[-1]: lon_out, data_in.dims[-2]: lat_out, } data_in_modified = _pre_interp_multidim(data_in, cyclic, missing_val) data_out = data_in_modified.interp( output_coords, method=method, kwargs={"fill_value": fill_value} ) data_out_modified = _post_interp_multidim(data_out, missing_val=missing_val) return data_out_modified