Source code for skyborn.windspharm.standard

"""
Spherical harmonic vector wind computations.

This module provides the main VectorWind class for analyzing atmospheric wind fields
using spherical harmonic transforms. It enables calculation of vorticity, divergence,
streamfunction, velocity potential, and other dynamical quantities.
"""

from __future__ import annotations

from typing import Literal, Optional, Tuple, Union

import numpy as np

from skyborn.spharm import Spharmt, gaussian_lats_wts
from skyborn.spharm import spherical_harmonics as spharm_mod

__all__ = ["VectorWind"]

# Type aliases for better readability
ArrayLike = Union[np.ndarray, np.ma.MaskedArray]
GridType = Literal["regular", "gaussian"]
LegFunc = Literal["stored", "computed"]


[docs] class VectorWind: """ Vector wind analysis using spherical harmonic transforms. This class provides methods for analyzing atmospheric wind fields through spherical harmonic decomposition. It can compute vorticity, divergence, streamfunction, velocity potential, and perform Helmholtz decomposition. Parameters ---------- u : array_like Zonal wind component. Shape should be (nlat, nlon) or (nlat, nlon, *extra_dims), where nlat is latitude points and nlon is longitude points. Latitude dimension should be north-to-south. This follows the underlying SPHEREPACK convention that the first latitude row is the northernmost point and latitude indices increase southward. v : array_like Meridional wind component. Must have same shape as u. gridtype : {'regular', 'gaussian'}, default 'regular' Type of input grid: - 'regular': evenly-spaced lat/lon grid - 'gaussian': Gaussian latitude grid rsphere : float, default 6.3712e6 Earth radius in meters for spherical harmonic computations. legfunc : {'stored', 'computed'}, default 'stored' Legendre function computation method: - 'stored': precompute and store (faster, more memory) - 'computed': compute on-the-fly (slower, less memory) precision : {'auto', 'single', 'double'}, default 'auto' Public output precision mode. 'auto' preserves the promoted input floating precision, 'single' returns float32/complex64 outputs, and 'double' returns float64/complex128 outputs. Attributes ---------- u : ndarray Zonal wind component v : ndarray Meridional wind component gridtype : str Grid type used rsphere : float Sphere radius in meters used by the underlying transform legfunc : str Legendre function handling mode used by the underlying transform precision : str Public output precision mode s : Spharmt Spherical harmonic transform object Examples -------- >>> import numpy as np >>> from skyborn.windspharm import VectorWind >>> >>> # Create sample wind field >>> nlat, nlon = 73, 144 >>> u = np.random.randn(nlat, nlon) >>> v = np.random.randn(nlat, nlon) >>> >>> # Initialize VectorWind >>> vw = VectorWind(u, v, gridtype='gaussian') >>> >>> # Calculate dynamical quantities >>> vorticity = vw.vorticity() >>> divergence = vw.divergence() >>> streamfunction = vw.streamfunction() >>> velocity_potential = vw.velocitypotential() Notes ----- The NumPy ``standard.VectorWind`` interface does not automatically reorder latitude. If your data is south-to-north, reverse it first or use :func:`skyborn.windspharm.tools.order_latdim`. The xarray interface handles this reordering automatically. """
[docs] def __init__( self, u: ArrayLike, v: ArrayLike, gridtype: GridType = "regular", rsphere: float = 6.3712e6, legfunc: LegFunc = "stored", precision: Literal["auto", "single", "double"] = "auto", ) -> None: """ Initialize VectorWind instance with comprehensive data validation. This method performs thorough validation of input wind components including checks for missing values, infinite values, shape compatibility, and dimensional requirements. Args: u: Zonal wind component with latitude on axis 0 and longitude on axis 1. v: Meridional wind component with the same shape as ``u``. gridtype: Grid type - ``"regular"`` or ``"gaussian"``. rsphere: Sphere radius in meters. legfunc: Legendre function handling mode - ``"stored"`` or ``"computed"``. precision: Public output precision mode. ``"auto"`` preserves the promoted input floating precision, ``"single"`` returns float32 outputs, and ``"double"`` returns float64 outputs. Raises: ValueError: If the input arrays fail shape, dimensional, or finite value validation. """ # Step 1: Handle masked arrays and create copies self._output_dtype = self._infer_output_dtype(u, v) self._precision = Spharmt._validate_precision(precision) self.u = self._process_input_array(u, "u") self.v = self._process_input_array(v, "v") # Step 2: Comprehensive data validation self._validate_input_data() # Step 3: Validate dimensions and shapes self._validate_dimensions() # Step 4: Initialize spherical harmonic transform nlat, nlon = self.u.shape[0], self.u.shape[1] self._initialize_spharmt(nlon, nlat, gridtype, rsphere, legfunc, precision) self._setup_cached_arrays() # Step 5: Create method aliases for backward compatibility self._setup_method_aliases()
@classmethod def _from_owned_input( cls, u: ArrayLike, v: ArrayLike, gridtype: GridType = "regular", rsphere: float = 6.3712e6, legfunc: LegFunc = "stored", precision: Literal["auto", "single", "double"] = "auto", ) -> "VectorWind": """Create an instance from validated wrapper-owned arrays.""" self = cls.__new__(cls) self._output_dtype = self._infer_output_dtype(u, v) self._precision = Spharmt._validate_precision(precision) self.u = self._process_input_array(u, "u", copy=False) self.v = self._process_input_array(v, "v", copy=False) self._validate_input_data() self._validate_dimensions() nlat, nlon = self.u.shape[0], self.u.shape[1] self._initialize_spharmt(nlon, nlat, gridtype, rsphere, legfunc, precision) self._setup_cached_arrays() self._setup_method_aliases() return self # ------------------------------------------------------------------ # Input processing, dtype handling, and validation # ------------------------------------------------------------------ @staticmethod def _infer_output_dtype(*arrays: ArrayLike) -> np.dtype: """Return the public output dtype for floating inputs.""" output_dtype = None for arr in arrays: dtype = np.dtype(np.asarray(arr).dtype) if np.issubdtype(dtype, np.floating): public_dtype = dtype else: public_dtype = np.dtype(np.float64) output_dtype = ( public_dtype if output_dtype is None else np.result_type(output_dtype, public_dtype) ) return np.dtype(np.float64 if output_dtype is None else output_dtype) def _restore_output_dtype( self, data: np.ndarray, output_dtype: Optional[np.dtype] = None ) -> np.ndarray: """Cast grid-space public outputs back to the input precision.""" array = np.asarray(data) precision = getattr(self, "_precision", "auto") if precision == "double": target_dtype = np.float64 elif precision == "single": target_dtype = np.float32 else: target_dtype = ( self._output_dtype if output_dtype is None else np.dtype(output_dtype) ) if array.dtype == target_dtype: return array return array.astype(target_dtype, copy=False) def _process_input_array( self, arr: ArrayLike, name: str, copy: bool = True ) -> np.ndarray: """ Process input array handling masked arrays and creating copies. Parameters ---------- arr : array_like Input array (u or v wind component) name : str Name of the array ('u' or 'v') for error messages Returns ------- ndarray Processed numpy array """ try: # Handle masked arrays if hasattr(arr, "filled"): masked = np.ma.asarray(arr) dtype = masked.dtype if not np.issubdtype(dtype, np.floating): dtype = np.dtype(np.float64) processed = np.ma.asarray(masked, dtype=dtype).filled(fill_value=np.nan) if copy: processed = processed.copy() else: raw = np.asarray(arr) dtype = np.dtype(raw.dtype) if not np.issubdtype(dtype, np.floating): dtype = np.dtype(np.float64) processed = np.asarray(raw, dtype=dtype) if copy: processed = processed.copy() except (ValueError, TypeError) as e: raise ValueError(f"Cannot convert {name} to numpy array: {e}") return processed def _validate_input_data(self) -> None: """ Comprehensive validation of input wind data. Checks for: - Missing values (NaN) - Infinite values (±inf) - Shape compatibility - Data type compatibility """ if not np.isfinite(self.u).all() or not np.isfinite(self.v).all(): nan_locations, inf_locations = self._invalid_value_locations() if nan_locations: raise ValueError( f"Input wind components contain missing values (NaN). " f"Found in {', '.join(nan_locations)}. " f"Please ensure all wind data is finite and valid." ) raise ValueError( f"Input wind components contain infinite values. " f"Found in {', '.join(inf_locations)}. " f"Please ensure all wind data is finite." ) # Check for extremely large values that might cause numerical issues # u_max = np.abs(self.u).max() # v_max = np.abs(self.v).max() # max_reasonable = 1e6 # 1000 km/s should be more than enough for any wind # if u_max > max_reasonable or v_max > max_reasonable: # raise ValueError( # f"Input wind components contain extremely large values " # f"(max |u|={u_max:.2e}, max |v|={v_max:.2e}). " # f"Please check units and data validity." # ) def _invalid_value_locations(self) -> Tuple[list[str], list[str]]: """Count NaN and infinite values after the fast finite check fails.""" nan_locations: list[str] = [] inf_locations: list[str] = [] for name, values in (("u", self.u), ("v", self.v)): invalid = ~np.isfinite(values) invalid_count = int(invalid.sum()) if not invalid_count: continue invalid_values = values[invalid] nan_count = int(np.isnan(invalid_values).sum()) inf_count = invalid_count - nan_count if nan_count: nan_locations.append(f"{name}: {nan_count} NaN values") if inf_count: inf_locations.append(f"{name}: {inf_count} infinite values") return nan_locations, inf_locations def _validate_dimensions(self) -> None: """ Validate array dimensions and shapes. """ # Check shape compatibility if self.u.shape != self.v.shape: raise ValueError( f"Wind components must have identical shapes. " f"Got u: {self.u.shape}, v: {self.v.shape}" ) # Check dimensionality if self.u.ndim < 2: raise ValueError( f"Wind components must be at least 2D arrays. " f"Got {self.u.ndim}D arrays with shape {self.u.shape}. " f"Expected shape: (nlat, nlon) or (nlat, nlon, *extra_dims)" ) # Check minimum grid size nlat, nlon = self.u.shape[:2] if nlat < 3 or nlon < 4: raise ValueError( f"Grid too small for spherical harmonic analysis. " f"Got (nlat={nlat}, nlon={nlon}). " f"Minimum requirements: nlat ≥ 3, nlon ≥ 4" ) # Check for reasonable grid sizes # if nlat > 10000 or nlon > 10000: # import warnings # warnings.warn( # f"Very large grid detected (nlat={nlat}, nlon={nlon}). " # f"This may require significant memory and computation time.", # UserWarning, # ) # ------------------------------------------------------------------ # Transform setup and small per-instance caches # ------------------------------------------------------------------ def _initialize_spharmt( self, nlon: int, nlat: int, gridtype: str, rsphere: float, legfunc: str, precision: Literal["auto", "single", "double"] = "auto", ) -> None: """ Initialize spherical harmonic transform object with validation. Parameters ---------- nlon, nlat : int Grid dimensions gridtype : str Grid type rsphere : float Sphere radius legfunc : str Legendre function computation method """ # Validate gridtype gridtype_lower = gridtype.lower() if gridtype_lower not in ("regular", "gaussian"): raise ValueError( f"Invalid grid type: '{gridtype}'. " f"Must be 'regular' or 'gaussian'" ) # Validate rsphere if not isinstance(rsphere, (int, float)) or rsphere <= 0: raise ValueError( f"Invalid sphere radius: {rsphere}. " f"Must be a positive number (in meters)" ) # Validate legfunc if legfunc not in ("stored", "computed"): raise ValueError( f"Invalid legfunc option: '{legfunc}'. " f"Must be 'stored' or 'computed'" ) # Store configuration self.gridtype = gridtype_lower self.rsphere = rsphere self.legfunc = legfunc try: # Create spherical harmonic transform object self.s = Spharmt( nlon=nlon, nlat=nlat, gridtype=self.gridtype, rsphere=rsphere, legfunc=legfunc, precision=precision, ) except Exception as e: # Provide more informative error messages if "invalid input dimensions" in str(e).lower(): raise ValueError( f"Invalid grid dimensions for {gridtype_lower} grid: " f"(nlat={nlat}, nlon={nlon}). " f"Please check grid size requirements for spherical harmonics." ) from e elif "spharm" in str(e).lower(): raise ValueError( f"Spherical harmonic initialization failed: {e}. " f"Please ensure spharm module is properly compiled." ) from e else: raise ValueError( f"Failed to initialize spherical harmonic transform: {e}" ) from e def _setup_method_aliases(self) -> None: """Set up method aliases for backward compatibility.""" self.rotationalcomponent = self.nondivergentcomponent self.divergentcomponent = self.irrotationalcomponent def _setup_cached_arrays(self) -> None: """Initialize small per-instance caches for repeated wrapper work.""" self._latitude_cache: dict[tuple[str, int], np.ndarray] = {} self._coriolis_cache: dict[tuple[str, int, float, str], np.ndarray] = {} self._planetary_vorticity_spec_cache: dict[ tuple[str, int, int, float, int], np.ndarray ] = {} self._vector_analysis_layout_cache: Optional[ Tuple[np.ndarray, np.ndarray, Tuple[int, ...]] ] = None def _latitude_values(self) -> np.ndarray: """Return latitude coordinates for this wind grid.""" key = (self.gridtype, self.s.nlat) cache = getattr(self, "_latitude_cache", None) if cache is None: cache = {} self._latitude_cache = cache lat = cache.get(key) if lat is not None: return lat nlat = self.s.nlat if self.gridtype == "gaussian": lat, _ = gaussian_lats_wts(nlat) elif nlat % 2: lat = np.linspace(90, -90, nlat) else: dlat = 180.0 / nlat lat = np.arange(90 - dlat / 2.0, -90, -dlat) cache[key] = np.asarray(lat) return cache[key] def _coriolis_values( self, omega: Optional[float], dtype: Optional[np.dtype] = None ) -> np.ndarray: """Return cached latitude-only Coriolis values.""" if omega is None: omega = 7.292e-05 try: omega_value = float(omega) except (TypeError, ValueError): raise ValueError(f"invalid value for omega: {omega!r}") output_dtype = self._output_dtype if dtype is None else np.dtype(dtype) key = (self.gridtype, self.s.nlat, omega_value, output_dtype.str) cache = getattr(self, "_coriolis_cache", None) if cache is None: cache = {} self._coriolis_cache = cache coriolis = cache.get(key) if coriolis is not None: return coriolis coriolis = 2.0 * omega_value * np.sin(np.deg2rad(self._latitude_values())) coriolis = coriolis.astype(output_dtype, copy=False) cache[key] = coriolis return coriolis # ------------------------------------------------------------------ # Internal numerical helpers. These avoid public dtype restoration # between intermediate steps in chained diagnostics. # ------------------------------------------------------------------ def _planetary_vorticity( self, omega: Optional[float] = None, materialize: bool = True, dtype: Optional[np.dtype] = None, ) -> np.ndarray: """Return planetary vorticity before public dtype restore.""" coriolis = self._coriolis_values(omega, dtype=dtype) return self._broadcast_planetaryvorticity(coriolis, materialize=materialize) def _broadcast_planetaryvorticity( self, coriolis: np.ndarray, materialize: bool ) -> np.ndarray: """Broadcast latitude-only Coriolis values to the wind field shape.""" indices = [slice(None)] + [np.newaxis] * (len(self.u.shape) - 1) broadcast = np.broadcast_to(coriolis[tuple(indices)], self.u.shape) if materialize: return np.array(broadcast, copy=True) return broadcast def _planetary_vorticity_spec( self, truncation: Optional[int] = None, omega: Optional[float] = None, spectral_ndim: int = 0, ) -> np.ndarray: """Return cached planetary-vorticity spectrum, broadcast for reuse.""" try: omega_value = 7.292e-05 if omega is None else float(omega) except (TypeError, ValueError): raise ValueError(f"invalid value for omega: {omega!r}") ntrunc = self.s._validate_ntrunc(truncation, self.s.nlat - 1) key = (self.gridtype, self.s.nlat, self.s.nlon, omega_value, ntrunc) cache = getattr(self, "_planetary_vorticity_spec_cache", None) if cache is None: cache = {} self._planetary_vorticity_spec_cache = cache fspec = cache.get(key) if fspec is None: coriolis = self._coriolis_values(omega_value, dtype=self._output_dtype) grid = np.broadcast_to(coriolis[:, np.newaxis], (self.s.nlat, self.s.nlon)) fspec = self.s.grdtospec(grid, ntrunc=ntrunc) cache[key] = fspec if spectral_ndim <= 0: return fspec return fspec[(slice(None),) + (np.newaxis,) * spectral_ndim] def _gradient_components( self, chi: ArrayLike, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """Return vector gradient before public dtype restore.""" try: chi = chi.filled(fill_value=np.nan) if hasattr(chi, "filled") else chi except AttributeError: pass if np.any(np.isnan(chi)): raise ValueError("chi cannot contain missing values") try: chi_spec = self.s.grdtospec(chi, ntrunc=truncation) except ValueError: raise ValueError("input field is not compatible") return self.s.getgrad(chi_spec) def _vector_analysis_layout_inputs( self, ) -> Tuple[np.ndarray, np.ndarray, Tuple[int, ...]]: """Return one cached Fortran-ready flattened wind-field pair.""" cache = getattr(self, "_vector_analysis_layout_cache", None) if cache is not None: return cache if self.u.ndim == 2: extra_shape: Tuple[int, ...] = () layout_u = np.asfortranarray(np.expand_dims(self.u, 2), dtype=np.float32) layout_v = np.asfortranarray(np.expand_dims(self.v, 2), dtype=np.float32) else: extra_shape = tuple(self.u.shape[2:]) nt = int(np.prod(extra_shape, dtype=int)) # Preserve the existing C-order flatten semantics, then convert once # to a Fortran-contiguous 3D layout for repeated vector analysis. layout_u = np.asfortranarray( self.u.reshape(self.u.shape[0], self.u.shape[1], nt), dtype=np.float32, ) layout_v = np.asfortranarray( self.v.reshape(self.v.shape[0], self.v.shape[1], nt), dtype=np.float32, ) cache = (layout_u, layout_v, extra_shape) self._vector_analysis_layout_cache = cache return cache def _vector_analysis_spectral( self, truncation: Optional[int] = None, component: Optional[str] = None ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: """Return cached-layout vector-analysis spectra for wind-derived diagnostics.""" if component is None and not hasattr( self.s, "_analyze_vector_harmonics_flattened" ): return self.s.getvrtdivspec(self.u, self.v, ntrunc=truncation) if component == "vrt" and not hasattr( self.s, "_analyze_vector_harmonic_component_flattened" ): return self.s.getvrtspec(self.u, self.v, ntrunc=truncation) if component == "div" and not hasattr( self.s, "_analyze_vector_harmonic_component_flattened" ): return self.s.getdivspec(self.u, self.v, ntrunc=truncation) layout_u, layout_v, extra_shape = self._vector_analysis_layout_inputs() if component is None: br, bi, cr, ci, _, ntrunc = self.s._analyze_vector_harmonics_flattened( layout_u, layout_v, extra_shape, truncation, "vector wind analysis", ) vrtspec, divspec = spharm_mod._spherepack.twodtooned_vrtdiv( br, bi, cr, ci, ntrunc, self.s.rsphere ) return ( self.s._restore_spectral_shape(vrtspec, extra_shape), self.s._restore_spectral_shape(divspec, extra_shape), ) coeff_r, coeff_i, _, ntrunc = ( self.s._analyze_vector_harmonic_component_flattened( layout_u, layout_v, extra_shape, truncation, "vector wind analysis", component, ) ) if component == "vrt": dataspec = spharm_mod._spherepack.twodtooned_vrt( coeff_r, coeff_i, ntrunc, self.s.rsphere ) elif component == "div": dataspec = spharm_mod._spherepack.twodtooned_div( coeff_r, coeff_i, ntrunc, self.s.rsphere ) else: raise ValueError(f"unsupported component: {component!r}") return self.s._restore_spectral_shape(dataspec, extra_shape) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @property def grid_info(self) -> dict: """ Get information about the grid configuration. Returns ------- dict Dictionary containing grid information """ nlat, nlon = self.u.shape[:2] return { "gridtype": self.gridtype, "nlat": nlat, "nlon": nlon, "shape": self.u.shape, "rsphere": self.rsphere, "legfunc": self.legfunc, "total_points": nlat * nlon, }
[docs] def magnitude(self) -> np.ndarray: """ Calculate wind speed (magnitude of vector wind). Returns ------- ndarray Wind speed field computed as sqrt(u² + v²) Examples -------- >>> wind_speed = vw.magnitude() """ return self._restore_output_dtype(np.hypot(self.u, self.v))
[docs] def vrtdiv(self, truncation: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]: """ Calculate relative vorticity and horizontal divergence. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. If None, uses the default truncation based on grid resolution. Returns ------- vorticity : ndarray Relative vorticity field divergence : ndarray Horizontal divergence field See Also -------- vorticity : Calculate only vorticity divergence : Calculate only divergence Examples -------- >>> vrt, div = vw.vrtdiv() >>> vrt_t13, div_t13 = vw.vrtdiv(truncation=13) """ vrtspec, divspec = self._vector_analysis_spectral(truncation=truncation) vrt, div = self.s._spectogrd_pair(vrtspec, divspec) return ( self._restore_output_dtype(vrt), self._restore_output_dtype(div), )
[docs] def vorticity(self, truncation: Optional[int] = None) -> np.ndarray: """ Calculate relative vorticity. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- ndarray Relative vorticity field (∇ × v) See Also -------- vrtdiv : Calculate both vorticity and divergence absolutevorticity : Calculate absolute vorticity Examples -------- >>> vrt = vw.vorticity() >>> vrt_t13 = vw.vorticity(truncation=13) """ return self._restore_output_dtype( self.s.spectogrd( self._vector_analysis_spectral(truncation=truncation, component="vrt") ) )
[docs] def divergence(self, truncation: Optional[int] = None) -> np.ndarray: """ Calculate horizontal divergence. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- ndarray Horizontal divergence field (∇ · v) See Also -------- vrtdiv : Calculate both vorticity and divergence Examples -------- >>> div = vw.divergence() >>> div_t13 = vw.divergence(truncation=13) """ return self._restore_output_dtype( self.s.spectogrd( self._vector_analysis_spectral(truncation=truncation, component="div") ) )
[docs] def planetaryvorticity(self, omega: Optional[float] = None) -> np.ndarray: """ Calculate planetary vorticity (Coriolis parameter). Parameters ---------- omega : float, optional Earth's angular velocity in rad/s. Default is 7.292e-5 s⁻¹. Returns ------- ndarray Planetary vorticity field (2Ω sin φ) See Also -------- absolutevorticity : Calculate absolute vorticity Examples -------- >>> f = vw.planetaryvorticity() >>> f_custom = vw.planetaryvorticity(omega=7.2921150e-5) """ return self._planetary_vorticity( omega=omega, materialize=True, dtype=self._output_dtype )
[docs] def absolutevorticity( self, omega: Optional[float] = None, truncation: Optional[int] = None ) -> np.ndarray: """ Calculate absolute vorticity (relative + planetary vorticity). Parameters ---------- omega : float, optional Earth's angular velocity in rad/s. Default is 7.292e-5 s⁻¹. truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- ndarray Absolute vorticity field (ζ + f) See Also -------- vorticity : Calculate relative vorticity planetaryvorticity : Calculate planetary vorticity Examples -------- >>> abs_vrt = vw.absolutevorticity() >>> abs_vrt_t13 = vw.absolutevorticity(omega=7.2921150e-5, truncation=13) """ vrt = self.s.spectogrd( self._vector_analysis_spectral(truncation=truncation, component="vrt") ) return self._restore_output_dtype( vrt + self._planetary_vorticity(omega=omega, materialize=False) )
[docs] def sfvp(self, truncation: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]: """ Calculate streamfunction and velocity potential. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- streamfunction : ndarray Streamfunction field (ψ) velocity_potential : ndarray Velocity potential field (χ) See Also -------- streamfunction : Calculate only streamfunction velocitypotential : Calculate only velocity potential Examples -------- >>> psi, chi = vw.sfvp() >>> psi_t13, chi_t13 = vw.sfvp(truncation=13) """ vrtspec, divspec = self._vector_analysis_spectral(truncation=truncation) psi, chi = self.s._spectogrd_pair( self.s._invlapspec(vrtspec, "sfvp"), self.s._invlapspec(divspec, "sfvp"), ) return self._restore_output_dtype(psi), self._restore_output_dtype(chi)
[docs] def streamfunction(self, truncation: Optional[int] = None) -> np.ndarray: """ Calculate streamfunction. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- ndarray Streamfunction field (ψ) See Also -------- sfvp : Calculate both streamfunction and velocity potential Examples -------- >>> psi = vw.streamfunction() >>> psi_t13 = vw.streamfunction(truncation=13) """ psi = self.s.spectogrd( self.s._invlapspec( self._vector_analysis_spectral(truncation=truncation, component="vrt"), "streamfunction", ) ) return self._restore_output_dtype(psi)
[docs] def velocitypotential(self, truncation: Optional[int] = None) -> np.ndarray: """ Calculate velocity potential. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- ndarray Velocity potential field (χ) See Also -------- sfvp : Calculate both streamfunction and velocity potential Examples -------- >>> chi = vw.velocitypotential() >>> chi_t13 = vw.velocitypotential(truncation=13) """ chi = self.s.spectogrd( self.s._invlapspec( self._vector_analysis_spectral(truncation=truncation, component="div"), "velocitypotential", ) ) return self._restore_output_dtype(chi)
[docs] def helmholtz( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Perform Helmholtz decomposition of vector wind. Decomposes the wind field into irrotational (divergent) and non-divergent (rotational) components. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- u_chi : ndarray Zonal component of irrotational wind v_chi : ndarray Meridional component of irrotational wind u_psi : ndarray Zonal component of non-divergent wind v_psi : ndarray Meridional component of non-divergent wind See Also -------- irrotationalcomponent : Get only irrotational component nondivergentcomponent : Get only non-divergent component Examples -------- >>> u_chi, v_chi, u_psi, v_psi = vw.helmholtz() >>> u_chi_t13, v_chi_t13, u_psi_t13, v_psi_t13 = vw.helmholtz(truncation=13) """ vrtspec, divspec = self._vector_analysis_spectral(truncation=truncation) psispec = self.s._invlapspec(vrtspec, "helmholtz") chispec = self.s._invlapspec(divspec, "helmholtz") v_psi, u_psi = self.s.getgrad(psispec) u_chi, v_chi = self.s.getgrad(chispec) return ( self._restore_output_dtype(u_chi), self._restore_output_dtype(v_chi), self._restore_output_dtype(-u_psi), self._restore_output_dtype(v_psi), )
[docs] def irrotationalcomponent( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """ Calculate irrotational (divergent) component of vector wind. Note ---- If both irrotational and non-divergent components are needed, use `helmholtz()` method for efficiency. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- u_chi : ndarray Zonal component of irrotational wind v_chi : ndarray Meridional component of irrotational wind See Also -------- helmholtz : Complete Helmholtz decomposition nondivergentcomponent : Non-divergent component Examples -------- >>> u_chi, v_chi = vw.irrotationalcomponent() >>> u_chi_t13, v_chi_t13 = vw.irrotationalcomponent(truncation=13) """ u_chi, v_chi = self.s.getgrad( self.s._invlapspec( self._vector_analysis_spectral(truncation=truncation, component="div"), "irrotationalcomponent", ) ) return self._restore_output_dtype(u_chi), self._restore_output_dtype(v_chi)
[docs] def nondivergentcomponent( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """ Calculate non-divergent (rotational) component of vector wind. Note ---- If both non-divergent and irrotational components are needed, use `helmholtz()` method for efficiency. Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- u_psi : ndarray Zonal component of non-divergent wind v_psi : ndarray Meridional component of non-divergent wind See Also -------- helmholtz : Complete Helmholtz decomposition irrotationalcomponent : Irrotational component Examples -------- >>> u_psi, v_psi = vw.nondivergentcomponent() >>> u_psi_t13, v_psi_t13 = vw.nondivergentcomponent(truncation=13) """ psispec = self.s._invlapspec( self._vector_analysis_spectral(truncation=truncation, component="vrt"), "nondivergentcomponent", ) v_psi, u_psi = self.s.getgrad(psispec) return self._restore_output_dtype(-u_psi), self._restore_output_dtype(v_psi)
[docs] def gradient( self, chi: ArrayLike, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """ Calculate vector gradient of a scalar field on the sphere. Parameters ---------- chi : array_like Scalar field with shape (nlat, nlon) or (nlat, nlon, nfields) matching the dimensions of the VectorWind instance. truncation : int, optional Triangular truncation limit for spherical harmonic computation. Returns ------- u_chi : ndarray Zonal component of vector gradient (∂χ/∂λ) v_chi : ndarray Meridional component of vector gradient (∂χ/∂φ) Examples -------- >>> abs_vrt = vw.absolutevorticity() >>> avrt_u, avrt_v = vw.gradient(abs_vrt) >>> avrt_u_t13, avrt_v_t13 = vw.gradient(abs_vrt, truncation=13) """ output_dtype = self._infer_output_dtype(chi) u_chi, v_chi = self._gradient_components(chi, truncation=truncation) return ( self._restore_output_dtype(u_chi, output_dtype), self._restore_output_dtype(v_chi, output_dtype), )
[docs] def truncate( self, field: ArrayLike, truncation: Optional[int] = None ) -> np.ndarray: """ Apply spectral truncation to a scalar field. This is useful to represent fields consistently with the output of other VectorWind methods. Parameters ---------- field : array_like Scalar field with shape (nlat, nlon) or (nlat, nlon, nfields) matching the dimensions of the VectorWind instance. truncation : int, optional Triangular truncation limit. If None, defaults to nlat-1. Returns ------- ndarray Field with spectral truncation applied Examples -------- >>> field_trunc = vw.truncate(scalar_field) >>> field_t21 = vw.truncate(scalar_field, truncation=21) """ try: field = ( field.filled(fill_value=np.nan) if hasattr(field, "filled") else field ) except AttributeError: pass if np.any(np.isnan(field)): raise ValueError("field cannot contain missing values") output_dtype = self._infer_output_dtype(field) try: field_spec = self.s.grdtospec(field, ntrunc=truncation) except ValueError: raise ValueError("field is not compatible") return self._restore_output_dtype(self.s.spectogrd(field_spec), output_dtype)
[docs] def rossbywavesource( self, truncation: Optional[int] = None, omega: Optional[float] = None ) -> np.ndarray: """ Calculate Rossby wave source. The Rossby wave source is defined as: S = -ζₐ∇·v - v_χ·∇ζₐ where: - ζₐ is absolute vorticity (relative + planetary) - ∇·v is horizontal divergence - v_χ is the irrotational (divergent) wind component - ∇ζₐ is the gradient of absolute vorticity Parameters ---------- truncation : int, optional Triangular truncation limit for spherical harmonic computation. If None, uses the default truncation based on grid resolution. omega : float, optional Earth's angular velocity in rad/s. Default is 7.292e-5 s⁻¹. Returns ------- ndarray Rossby wave source field (s⁻²) See Also -------- absolutevorticity : Calculate absolute vorticity divergence : Calculate horizontal divergence irrotationalcomponent : Calculate irrotational wind component gradient : Calculate vector gradient Notes ----- The Rossby wave source quantifies the generation of Rossby wave activity in the atmosphere. Positive values indicate wave generation, while negative values indicate wave absorption or dissipation. The calculation involves several steps: 1. Compute absolute vorticity (relative + planetary) 2. Compute horizontal divergence 3. Compute irrotational wind components 4. Compute gradients of absolute vorticity 5. Combine terms according to the RWS formula Examples -------- >>> rws = vw.rossbywavesource() >>> rws_t21 = vw.rossbywavesource(truncation=21) >>> rws_custom_omega = vw.rossbywavesource(omega=7.2921150e-5) References ---------- Sardeshmukh, P. D., & Hoskins, B. J. (1988). The generation of global rotational flow by steady idealized tropical heating. Journal of the Atmospheric Sciences, 45(7), 1228-1251. """ # Reuse the first vector analysis for both grid-space and spectral terms. vrtspec, divspec = self._vector_analysis_spectral(truncation=truncation) vrt, div = self.s._spectogrd_pair(vrtspec, divspec) eta = vrt + self._planetary_vorticity(omega=omega, materialize=False) # Reconstruct the divergent wind from the already-available divergence # spectrum instead of triggering a second vector analysis. uchi, vchi = self.s.getgrad(self.s._invlapspec(divspec, "rossbywavesource")) # Reuse relative-vorticity spectra plus a cached planetary-vorticity # spectrum so absolute-vorticity gradients do not trigger a full-field # scalar analysis of eta on every call. etaspec = vrtspec + self._planetary_vorticity_spec( truncation=truncation, omega=omega, spectral_ndim=max(vrtspec.ndim - 1, 0), ) etax, etay = self.s.getgrad(etaspec) # Combine components to form Rossby wave source # S = -eta * div - (uchi * etax + vchi * etay) rws = -eta * div - (uchi * etax + vchi * etay) return self._restore_output_dtype(rws)