Source code for skyborn.windspharm.reduced

"""Reduced-Gaussian vector wind analysis for packed fields."""

from __future__ import annotations

from typing import Dict, Optional, Tuple, Union

import numpy as np

from skyborn.spharm import ReducedGaussianSpharmt, gaussian_lats_wts

ArrayLike = Union[np.ndarray, np.ma.MaskedArray]


[docs] class ReducedVectorWind: """ Vector wind analysis on packed reduced Gaussian grids. Parameters ---------- u, v: Packed zonal and meridional wind components with shape ``(sum(pl), ...)``. pl: Reduced Gaussian row lengths, ordered north-to-south. rsphere: Sphere radius in meters. legfunc: Legendre function mode passed to grouped Gaussian ``Spharmt`` backends. precision: Public output precision mode. ``"auto"`` preserves the promoted input floating precision, ``"single"`` returns float32 outputs, and ``"double"`` returns float64 outputs. Attributes ---------- u, v: Packed zonal and meridional wind components. pl: Reduced Gaussian row lengths ordered north-to-south. gridtype: Fixed grid type label ``"reduced_gaussian"``. rsphere: Sphere radius in meters. legfunc: Legendre function handling mode used by the underlying transform. precision: Public output precision mode. s: Underlying :class:`skyborn.spharm.ReducedGaussianSpharmt` instance. """
[docs] def __init__( self, u: ArrayLike, v: ArrayLike, pl: ArrayLike, rsphere: float = 6.3712e6, legfunc: str = "stored", precision: str = "auto", ) -> None: """ Create a reduced-Gaussian vector wind analysis wrapper. Args: u: Packed zonal wind component with shape ``(sum(pl), ...)``. v: Packed meridional wind component with the same shape as ``u``. pl: Reduced Gaussian row lengths ordered north-to-south. rsphere: Sphere radius in meters. legfunc: Legendre function handling mode - ``"stored"`` or ``"computed"``. precision: Public output precision mode. ``"auto"`` preserves the promoted input floating precision, ``"single"`` returns float32 outputs, and ``"double"`` returns float64 outputs. Raises: ValueError: If ``u``, ``v``, or ``pl`` fails validation. """ self.pl = self._validate_pl(pl) self._output_dtype = self._infer_output_dtype(u, v) self._precision = ReducedGaussianSpharmt._validate_precision(precision) self.u = self._process_input_array(u, "u") self.v = self._process_input_array(v, "v") self._validate_input_data() self._extra_shape = tuple(self.u.shape[1:]) self._nt = ( int(np.prod(self._extra_shape, dtype=int)) if self._extra_shape else 1 ) self._u_backend = self._normalize_grid_backend(self.u) self._v_backend = self._normalize_grid_backend(self.v) self.s = ReducedGaussianSpharmt( self.pl, rsphere=rsphere, legfunc=legfunc, precision=precision, ) self.gridtype = self.s.gridtype self.rsphere = self.s.rsphere self.legfunc = self.s.legfunc self._latitude_cache: Optional[np.ndarray] = None self._planetary_vorticity_cache: Dict[ Tuple[float, str, Tuple[int, ...]], np.ndarray ] = {} self._planetary_vorticity_spec_cache: Dict[Tuple[float, int], np.ndarray] = {} self._spectral_cache: Dict[ Tuple[str, int], Union[np.ndarray, Tuple[np.ndarray, np.ndarray]] ] = {}
@staticmethod def _validate_pl(pl: ArrayLike) -> np.ndarray: arr = np.asarray(pl, dtype=np.int32) if arr.ndim != 1: raise ValueError(f"pl must be rank 1, got shape {arr.shape}") if arr.size < 3: raise ValueError(f"pl must contain at least 3 rows, got {arr.size}") if np.any(arr < 4): raise ValueError("each reduced Gaussian row must contain >= 4 points") return np.ascontiguousarray(arr) @staticmethod def _infer_output_dtype(*arrays: ArrayLike) -> np.dtype: output_dtype = None for arr in arrays: dtype = np.dtype(np.asarray(arr).dtype) if np.issubdtype(dtype, np.floating): public_dtype = dtype else: public_dtype = np.dtype(np.float64) output_dtype = ( public_dtype if output_dtype is None else np.result_type(output_dtype, public_dtype) ) return np.dtype(np.float64 if output_dtype is None else output_dtype) def _resolved_output_dtype( self, output_dtype: Optional[np.dtype] = None ) -> np.dtype: if self._precision == "double": return np.dtype(np.float64) if self._precision == "single": return np.dtype(np.float32) return self._output_dtype if output_dtype is None else np.dtype(output_dtype) def _restore_output_dtype( self, data: np.ndarray, output_dtype: Optional[np.dtype] = None ) -> np.ndarray: array = np.asarray(data) target_dtype = self._resolved_output_dtype(output_dtype) if array.dtype == target_dtype: return array return array.astype(target_dtype, copy=False) def _normalize_grid_backend(self, data: np.ndarray) -> np.ndarray: return np.asfortranarray( np.asarray(data).reshape(self.u.shape[0], self._nt), dtype=np.float32 ) def _restore_grid( self, datagrid: np.ndarray, output_dtype: Optional[np.dtype] = None ) -> np.ndarray: return self.s._restore_grid_shape( datagrid, self._extra_shape, self._resolved_output_dtype(output_dtype) ) @staticmethod def _process_input_array(arr: ArrayLike, name: str) -> np.ndarray: try: if hasattr(arr, "filled"): masked = np.ma.asarray(arr) dtype = masked.dtype if not np.issubdtype(dtype, np.floating): dtype = np.dtype(np.float64) return np.ma.asarray(masked, dtype=dtype).filled(np.nan).copy() raw = np.asarray(arr) dtype = np.dtype(raw.dtype) if not np.issubdtype(dtype, np.floating): dtype = np.dtype(np.float64) return np.asarray(raw, dtype=dtype).copy() except (TypeError, ValueError) as exc: raise ValueError(f"Cannot convert {name} to numpy array: {exc}") from exc def _validate_input_data(self) -> None: if self.u.shape != self.v.shape: raise ValueError( f"Wind components must have identical shapes. " f"Got u: {self.u.shape}, v: {self.v.shape}" ) if self.u.ndim < 1: raise ValueError( f"Packed wind components must be at least 1D, got {self.u.ndim}D" ) expected = int(self.pl.sum()) if self.u.shape[0] != expected: raise ValueError( f"Packed first dimension must be sum(pl)={expected}, " f"got {self.u.shape[0]}" ) if not np.isfinite(self.u).all() or not np.isfinite(self.v).all(): raise ValueError("Input wind components must be finite") def _validate_scalar_field(self, field: ArrayLike, name: str) -> np.ndarray: try: field = field.filled(np.nan) if hasattr(field, "filled") else field except AttributeError: pass array = np.asarray(field) if array.shape != self.u.shape: raise ValueError(f"{name} is not compatible") if not np.isfinite(array).all(): raise ValueError(f"{name} cannot contain missing or infinite values") return array @property def grid_info(self) -> dict: return { "gridtype": self.gridtype, "nlat": int(self.pl.size), "npoints": int(self.pl.sum()), "shape": self.u.shape, "rsphere": self.rsphere, "legfunc": self.legfunc, "pl": self.pl.copy(), } def _latitude_values(self) -> np.ndarray: if self._latitude_cache is None: lat, _ = gaussian_lats_wts(int(self.pl.size)) self._latitude_cache = np.repeat(lat, self.pl) return self._latitude_cache def _planetary_vorticity_values( self, omega: Optional[float] = None, dtype: Optional[np.dtype] = None, shape: Optional[Tuple[int, ...]] = None, ) -> np.ndarray: omega_value = 7.292e-05 if omega is None else float(omega) target_dtype = self._u_backend.dtype if dtype is None else np.dtype(dtype) target_shape = self.u.shape if shape is None else tuple(shape) key = (omega_value, target_dtype.str, target_shape) cached = self._planetary_vorticity_cache.get(key) if cached is not None: return cached values = np.asarray( 2.0 * omega_value * np.sin(np.deg2rad(self._latitude_values())), dtype=target_dtype, ) if len(target_shape) == 1: broadcast = np.broadcast_to(values, target_shape) result = np.array(broadcast, copy=True) else: row = values.reshape(target_shape[0], *([1] * (len(target_shape) - 1))) result = np.broadcast_to(row, target_shape) self._planetary_vorticity_cache[key] = result return result def _planetary_vorticity( self, omega: Optional[float] = None, materialize: bool = True ) -> np.ndarray: data = self._planetary_vorticity_values( omega=omega, dtype=self._output_dtype if materialize else self._u_backend.dtype, shape=self.u.shape, ) if materialize: return np.array(data, copy=True) return data def _planetary_vorticity_grid(self, omega: Optional[float] = None) -> np.ndarray: return self._planetary_vorticity_values( omega=omega, dtype=self._u_backend.dtype, shape=self._u_backend.shape, ) def _planetary_vorticity_spec( self, truncation: Optional[int] = None, omega: Optional[float] = None ) -> np.ndarray: ntrunc = self._ntrunc(truncation) omega_value = 7.292e-05 if omega is None else float(omega) key = (omega_value, ntrunc) cached = self._planetary_vorticity_spec_cache.get(key) if cached is not None: return cached dataspec = self.s._analyze_scalar(self._planetary_vorticity_grid(omega), ntrunc) self._planetary_vorticity_spec_cache[key] = dataspec return dataspec def _coriolis(self, omega: Optional[float] = None) -> np.ndarray: return self._planetary_vorticity_values(omega=omega, shape=(self.u.shape[0],)) def _ntrunc(self, truncation: Optional[int]) -> int: return self.s._validate_ntrunc(truncation) def _spectogrd_pair( self, spec_a: np.ndarray, spec_b: np.ndarray, truncation: Optional[int] = None, output_dtype: Optional[np.dtype] = None, ) -> Tuple[np.ndarray, np.ndarray]: ntrunc = self._ntrunc(truncation) grid_a, grid_b = self.s._synthesize_scalar_pair(spec_a, spec_b, ntrunc) return self._restore_grid(grid_a, output_dtype), self._restore_grid( grid_b, output_dtype ) def _spectogrd( self, spec: np.ndarray, truncation: Optional[int] = None, output_dtype: Optional[np.dtype] = None, ) -> np.ndarray: ntrunc = self._ntrunc(truncation) return self._restore_grid(self.s._synthesize_scalar(spec, ntrunc), output_dtype) def _vector_analysis_spectral( self, truncation: Optional[int] ) -> Tuple[np.ndarray, np.ndarray]: ntrunc = self._ntrunc(truncation) key = ("vrtdiv", ntrunc) cached = self._spectral_cache.get(key) if cached is not None: return cached # type: ignore[return-value] vrtspec, divspec = self.s._analyze_wind( self._u_backend, self._v_backend, ntrunc ) self._spectral_cache[key] = (vrtspec, divspec) self._spectral_cache[("vrt", ntrunc)] = vrtspec self._spectral_cache[("div", ntrunc)] = divspec return vrtspec, divspec def _vorticity_spectral(self, truncation: Optional[int]) -> np.ndarray: ntrunc = self._ntrunc(truncation) key = ("vrt", ntrunc) cached = self._spectral_cache.get(key) if cached is not None: return cached # type: ignore[return-value] full = self._spectral_cache.get(("vrtdiv", ntrunc)) if full is not None: vrtspec = full[0] # type: ignore[index] else: vrtspec = self.s._analyze_vorticity( self._u_backend, self._v_backend, ntrunc ) self._spectral_cache[key] = vrtspec return vrtspec def _divergence_spectral(self, truncation: Optional[int]) -> np.ndarray: ntrunc = self._ntrunc(truncation) key = ("div", ntrunc) cached = self._spectral_cache.get(key) if cached is not None: return cached # type: ignore[return-value] full = self._spectral_cache.get(("vrtdiv", ntrunc)) if full is not None: divspec = full[1] # type: ignore[index] else: divspec = self.s._analyze_divergence( self._u_backend, self._v_backend, ntrunc ) self._spectral_cache[key] = divspec return divspec
[docs] def magnitude(self) -> np.ndarray: """Return wind speed.""" return self._restore_output_dtype(np.hypot(self.u, self.v))
[docs] def vrtdiv(self, truncation: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]: """Return relative vorticity and horizontal divergence.""" vrtspec, divspec = self._vector_analysis_spectral(truncation) return self._spectogrd_pair(vrtspec, divspec, truncation)
[docs] def vorticity(self, truncation: Optional[int] = None) -> np.ndarray: """Return relative vorticity.""" vrtspec = self._vorticity_spectral(truncation) return self._spectogrd(vrtspec, truncation)
[docs] def divergence(self, truncation: Optional[int] = None) -> np.ndarray: """Return horizontal divergence.""" divspec = self._divergence_spectral(truncation) return self._spectogrd(divspec, truncation)
[docs] def planetaryvorticity(self, omega: Optional[float] = None) -> np.ndarray: """Return planetary vorticity on the packed reduced grid.""" return self._restore_output_dtype(self._planetary_vorticity(omega=omega))
[docs] def absolutevorticity( self, omega: Optional[float] = None, truncation: Optional[int] = None ) -> np.ndarray: """Return absolute vorticity.""" vrt = self.vorticity(truncation=truncation) return self._restore_output_dtype( vrt + self._planetary_vorticity(omega=omega, materialize=False) )
[docs] def sfvp(self, truncation: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]: """Return streamfunction and velocity potential.""" vrtspec, divspec = self._vector_analysis_spectral(truncation) return self._spectogrd_pair( self.s._invlap(vrtspec), self.s._invlap(divspec), truncation, )
[docs] def streamfunction(self, truncation: Optional[int] = None) -> np.ndarray: """Return streamfunction.""" vrtspec = self._vorticity_spectral(truncation) return self._spectogrd(self.s._invlap(vrtspec), truncation)
[docs] def velocitypotential(self, truncation: Optional[int] = None) -> np.ndarray: """Return velocity potential.""" divspec = self._divergence_spectral(truncation) return self._spectogrd(self.s._invlap(divspec), truncation)
[docs] def helmholtz( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Return irrotational and non-divergent wind components.""" vrtspec, divspec = self._vector_analysis_spectral(truncation) psispec = self.s._invlap(vrtspec) chispec = self.s._invlap(divspec) u_chi, v_chi, v_psi, u_psi = self.s._synthesize_gradient_pair( chispec, psispec, truncation ) return ( self._restore_grid(u_chi), self._restore_grid(v_chi), self._restore_grid(-u_psi), self._restore_grid(v_psi), )
[docs] def irrotationalcomponent( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """Return irrotational wind component.""" divspec = self._divergence_spectral(truncation) u_chi, v_chi = self.s._synthesize_gradient(self.s._invlap(divspec), truncation) return self._restore_grid(u_chi), self._restore_grid(v_chi)
[docs] def nondivergentcomponent( self, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """Return non-divergent wind component.""" vrtspec = self._vorticity_spectral(truncation) psispec = self.s._invlap(vrtspec) v_psi, u_psi = self.s._synthesize_gradient(psispec, truncation) return self._restore_grid(-u_psi), self._restore_grid(v_psi)
[docs] def gradient( self, chi: ArrayLike, truncation: Optional[int] = None ) -> Tuple[np.ndarray, np.ndarray]: """Return vector gradient of a packed scalar field.""" output_dtype = self._infer_output_dtype(chi) field = self._validate_scalar_field(chi, "chi") chi_spec = self.s._analyze_scalar( self._normalize_grid_backend(field), truncation ) u_chi, v_chi = self.s._synthesize_gradient(chi_spec, truncation) return ( self._restore_grid(u_chi, output_dtype), self._restore_grid(v_chi, output_dtype), )
[docs] def truncate( self, field: ArrayLike, truncation: Optional[int] = None ) -> np.ndarray: """Apply spectral truncation to a packed scalar field.""" output_dtype = self._infer_output_dtype(field) scalar = self._validate_scalar_field(field, "field") field_spec = self.s._analyze_scalar( self._normalize_grid_backend(scalar), truncation ) return self._spectogrd(field_spec, truncation, output_dtype)
[docs] def rossbywavesource( self, truncation: Optional[int] = None, omega: Optional[float] = None ) -> np.ndarray: """Return Rossby wave source on the packed reduced grid.""" vrtspec, divspec = self._vector_analysis_spectral(truncation) vrt, div = self.s._synthesize_scalar_pair(vrtspec, divspec, truncation) eta = vrt + self._planetary_vorticity_grid(omega=omega) uchi, vchi = self.s._synthesize_gradient(self.s._invlap(divspec), truncation) etaspec = vrtspec + self._planetary_vorticity_spec(truncation, omega=omega) etax, etay = self.s._synthesize_gradient(etaspec, truncation) rws = -eta * div - (uchi * etax + vchi * etay) return self._restore_grid(rws)