Source code for climopy.var

#!/usr/bin/env python3
"""
Analyses of variance and trends. Many of these are adapted
from examples and course notes provided by Professors `Elizabeth Barnes \
<http://barnes.atmos.colostate.edu/COURSES/AT655_S15/lecture_slides.html>`__
and `Dennis Hartmann \
<https://atmos.washington.edu/~dennis/552_Notes_ftp.html>`__.
"""
import numpy as np
import scipy.stats as stats
import scipy.linalg as linalg
import scipy.optimize as optimize
from .internals import quack
from .internals import docstring
from .internals.array import _ArrayContext

__all__ = [
    'autocorr',
    'autocovar',
    'corr',
    'covar',
    'eof',
    'eot',
    'reof',
    'gaussian',
    'hist',
    'linefit',
    'rednoise',
    'rednoisefit',
    'wilks',
]

# Docstring snippets
_corr_math = r"""
.. math::

    \dfrac{\sum_{i=0}^{n-k}\left(x_t - \overline{x}\right)\left(y_{t+k} - \overline{y}\right)}{(n - k) s_x s_y}

where :math:`\overline{x}` and :math:`\overline{y}` are the sample means and
:math:`s_x` and :math:`s_y` are the sample standard deviations.
"""  # noqa: E501

_covar_math = r"""
.. math::

    \dfrac{\sum_{i=0}^{n-k}\left(x_t - \overline{x}\right)\left(y_{t+k} - \overline{y}\right)}{n - k}

where :math:`\overline{x}` and :math:`\overline{y}` are the sample means.
"""  # noqa: E501

_var_data = """
z : array-like
    The input data.
"""

_covar_data = """
z1 : array-like
    The input data.
z2 : array-like, optional
    The second input data. Must be same shape as `z1`.
"""

_var_template = """
Return the {name} spectrum at successive lags.

Parameters
----------
dt : float or array-like
    The timestep or regularly-spaced time coordinates.
{data}
axis : int, optional
    Axis along which {name} is taken.
lag : int, optional
    Return {name} at the single lag `lag`.
nlag : int, optional
    Return lagged {name} from ``0`` timesteps up to `nlag` timesteps.

Returns
-------
lags : array-like
    The lags.
result : array-like
    The {name} as a function of lag.

Note
----
This function uses the following formula to estimate {name} at lag :math:`k`:
{math}
"""

docstring.snippets['autocorr'] = _var_template.format(
    data=_var_data.strip(), name='autocorrelation', math=_corr_math,
)
docstring.snippets['autocovar'] = _var_template.format(
    data=_var_data.strip(), name='autocovariance', math=_covar_math,
)
docstring.snippets['corr'] = _var_template.format(
    data=_covar_data.strip(), name='correlation', math=_corr_math,
)
docstring.snippets['covar'] = _var_template.format(
    data=_covar_data.strip(), name='covariance', math=_covar_math,
)


[docs]def gaussian(z, mean=0, stdev=None, sigma=1): """ Returns sample points on Gaussian curve. Parameters ---------- z : array-like, optional The z-statistics to be sampled. """ sigma = stdev if stdev is not None else sigma norm = stats.norm(loc=mean, scale=sigma) pdf = norm.pdf(z, loc=mean, scale=sigma) return z, pdf
[docs]def rednoise(a, ntime=100, nsamples=1, mean=0, stdev=1): r""" Return one or more artificial red noise time series with prescribed mean and standard deviation. The time series are generated with the following equation: .. math:: x(t) = a \cdot x(t - \Delta t) + b \cdot \epsilon(t) where *a* is the lag-1 autocorrelation and *b* is a scaling term. Parameters --------- a : float The autocorrelation. ntime : int, optional Number of timesteps. nsamples : int or list of int, optional Axis size or list of axis sizes for the "sample" dimension(s). Shape of the `data` array will be ``(ntime,)`` if `nsamples` is not provided, ``(ntime, nsamples)`` if `nsamples` is scalar, or ``(ntime, *nsamples)`` if `nsamples` is a list of axis sizes. mean, stdev : float, optional The mean and standard deviation for the red noise time series. Returns ------- data : array-like The red noise data. See Also -------- rednoisefit """ # Initial stuff ntime -= 1 # exclude the initial timestep nsamples = np.atleast_1d(nsamples) data = np.empty((ntime + 1, *nsamples)) # user can make N-D array b = (1 - a ** 2) ** 0.5 # from OA class # Nested loop with _ArrayContext(data, push_left=0) as context: data = context.data data[0, :] = 0.0 # initialize for i in range(data.shape[-1]): eps = np.random.normal(loc=0, scale=1, size=ntime) for t in range(1, ntime + 1): data[t, i] = a * data[t - 1, i] + b * eps[t - 1] # Return data = context.data if len(nsamples) == 1 and nsamples[0] == 1: data = data.squeeze() return mean + stdev * data # rescale to have specified stdeviation/mean
def _covar_driver( dt, z1, z2, /, *, lag=None, nlag=None, axis=0, standardize=False, verbose=False, ): """ Driver function for getting covariance. """ # Preparation, and stdev/means auto = z1 is z2 if z1.shape != z2.shape: raise ValueError(f'Incompatible shapes {z1.shape=} and {z2.shape=}.') naxis = z1.shape[axis] # length # Checks if lag is None and nlag is None: lag = 0 if lag is not None and nlag is not None: raise ValueError(f'Conflicting arguments {lag=} and {nlag=}.') if nlag is not None and nlag >= naxis / 2: raise ValueError(f'Lag {nlag} must be greater than axis length {naxis}.') if verbose: prefix = 'auto' if auto else '' suffix = 'correlation' if standardize else 'covariance' if nlag is None: print(f'Calculating lag-{lag} {prefix}{suffix}.') else: print(f'Calculating {prefix}{suffix} to lag {nlag} for axis size {naxis}.') # Means and permute z1 = np.moveaxis(z1, axis, -1) mean1 = z1.mean(axis=-1, keepdims=True) # keep dims for broadcasting if auto: z2, mean2 = z1, mean1 else: z2 = np.moveaxis(z2, axis, -1) mean2 = z2.mean(axis=-1, keepdims=True) # Standardize maybe std1 = std2 = 1 # use for covariance if standardize: std1 = z1.std(axis=-1, keepdims=True) if auto: std2 = std1 else: std2 = z2.std(axis=-1, keepdims=True) # This is just the variance, or *one* if autocorrelation mode is enabled # corrs = np.ones((*z1.shape[:-1], 1)) if nlag is None and lag == 0: lags = [0] covar = np.sum( (z1 - mean1) * (z2 - mean2), axis=-1, keepdims=True ) / (naxis * std1 * std2) # Correlation on specific lag elif nlag is None: lag = np.round(lag * dt).astype(int) lags = np.atleast_1d(lag) covar = np.sum( (z1[..., :-lag] - mean1) * (z2[..., lag:] - mean2), axis=-1, keepdims=True, ) / ((naxis - lag) * std1 * std2), # Correlation up to n timestep-lags after 0-correlation else: # First figure out lags # Negative lag means z2 leads z1 (e.g. z corr m, left-hand side is m leads z). # e.g. 20 day lag, at synoptic timesteps nlag = np.round(nlag / dt).astype(int) if not auto: n = nlag * 2 + 1 # the center point, and both sides lags = np.arange(-nlag, nlag + 1) else: n = nlag + 1 lags = np.arange(0, nlag + 1) # Get correlation # will include the zero-lag autocorrelation covar = np.empty((*z1.shape[:-1], n)) for i, lag in enumerate(lags): if lag == 0: prod = (z1 - mean1) * (z2 - mean2) elif lag < 0: # input 1 *trails* input 2 prod = (z1[..., -lag:] - mean1) * (z2[..., :lag] - mean2) else: prod = (z1[..., :-lag] - mean1) * (z2[..., lag:] - mean2) covar[..., i] = prod.sum(axis=-1) / ((naxis - lag) * std1 * std2) lags *= dt # Return lags and covariance return lags, np.moveaxis(covar, -1, axis)
[docs]@quack._xarray_covar_wrapper @quack._pint_wrapper(('=t', '=z'), ('=t', '')) @docstring.add_snippets def autocorr(dt, z, axis=0, **kwargs): """ %(autocorr)s """ # NOTE: covar checks if z1 and z2 are the same to to reduce computational cost kwargs.setdefault('standardize', True) return _covar_driver(dt, z, z, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper @quack._pint_wrapper(('=t', '=z'), ('=t', '=z ** 2')) @docstring.add_snippets def autocovar(dt, z, axis=0, **kwargs): """ %(autocovar)s """ # NOTE: covar checks if z1 and z2 are the same to reduce computational cost return _covar_driver(dt, z, z, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper @quack._pint_wrapper(('=t', '=z1', '=z2'), ('=t', '')) @docstring.add_snippets def corr(dt, z1, z2, axis=0, **kwargs): """ %(corr)s """ kwargs.setdefault('standardize', True) return _covar_driver(dt, z1, z2, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper @quack._pint_wrapper(('=t', '=z1', '=z2'), ('=t', '=z1 * z2')) @docstring.add_snippets def covar(dt, z1, z2, axis=0, **kwargs): """ %(covar)s """ return _covar_driver(dt, z1, z2, axis=axis, **kwargs)
[docs]@quack._xarray_eof_wrapper @quack._pint_wrapper('=z', ('', '=z', '=z ** 2', 'count')) def eof( data, /, neof=5, axis_time=-2, axis_space=-1, weights=None, percent=False, normalize=False, ): r""" Calculate the first `N` EOFs using the scipy algorithm for Hermetian matrices on the covariance matrix. Parameters ---------- data : array-like Data of arbitrary shape. neof : int, optional Number of eigenvalues we want. axis_time : int, optional Axis used as the 'record' or 'time' dimension. axis_space : int or list of int, optional Axis or axes used as 'space' dimension. weights : array-like, optional Area or mass weights; must be broadcastable on multiplication with `data` weights. Will be normalized prior to application. percent : bool, optional Whether to return raw eigenvalue(s) or the *percentage* of total variance explained by eigenvalue(s). Default is ``False``. normalize : bool, optional Whether to normalize the data by its standard deviation at every point prior to obtaining the EOFs. Returns ------- pcs : array-like The standardized principal components. The `axis_space` dimensions are reduced to length 1. projs : array-like Projections of the standardized principal components onto the original data. The `axis_time` dimension is reduced to length 1. evals If `percent` is ``Flase``, these are the eigenvalues. Otherwise, this is the percentage of total variance explained by the corresponding eigenvector. The `axis_time` and `axis_space` dimensions are reduced to length 1. nstars The approximate degrees of freedom as determined by the :cite:`2011:wilks` autocorrelation critereon. This can be used to compute the approximate 95% error bounds for the eigenvalues using the :cite:`1982:north` critereon of :math:`\lambda \sqrt{2 / N^*}`. The `axis_time` and `axis_space` dimensions are reduced to length 1. Example ------- >>> from climopy.internals.array import logger, logging ... import xarray as xr ... import climopy as climo ... data = xr.DataArray( ... np.random.rand(10, 6, 100, 40, 20), ... dims=('member', 'run', 'time', 'plev', 'lat'), ... coords={ ... 'member': np.arange(1, 11), ... 'run': np.arange(1, 7), ... 'time': np.arange(100.0), ... 'plev': np.linspace(0.0, 1000.0, 40), ... 'lat': np.linspace(-90.0, 90.0, 20), ... } ... ) ... pcs, projs, evals, nstars = climo.eof(data, axis_time=2, axis_space=(3, 4)) References ---------- .. bibliography:: ../bibs/eofs.bib """ # Parse input # WARNING: Have to explicitly 'push_left' extra dimensions (even though just # specifying nflat_left is enough to flattened the desired dimensions) or the new # 'eof' dimension is not pushed to the left of the extra dimensions. Think this # is not a bug but consistent with design. axis_space = np.atleast_1d(axis_space) np.atleast_1d(axis_time).item() # ensure time axis is 1D axis_space[axis_space < 0] += data.ndim if axis_time < 0: axis_time += data.ndim axis_extra = tuple(set(range(data.ndim)) - {axis_time, *axis_space}) # Remove the mean and optionally standardize the data data = data - data.mean(axis=axis_time, keepdims=True) # remove mean if normalize: data = data / data.std(axis=axis_time, keepdims=True) # Next apply weights ntime = data.shape[axis_time] # number timesteps nspace = np.prod(np.asarray(data.shape)[axis_space]) # number space locations if weights is None: weights = 1 weights = np.atleast_1d(weights) # want numpy array weights = weights / weights.mean() # so does not affect amplitude if ntime > nspace: dataw = data = data * np.sqrt(weights) else: dataw = data * weights # raises error if dimensions incompatible # Turn matrix in to extra (K) x time (M) x space (N) # Requires flatening space axes into one, and flattening extra axes into one shape_orig = data.shape with _ArrayContext( data, dataw, push_left=axis_extra, push_right=(axis_time, *axis_space), nflat_right=len(axis_space), # flatten space axes nflat_left=data.ndim - len(axis_space) - 1, # flatten ) as context: # Retrieve reshaped data data, dataw = context.data k = data.shape[0] # ensure 3D if data.ndim != 3 or ntime != data.shape[1] or nspace != data.shape[2]: raise RuntimeError( 'Array resizing algorithm failed. ' f'Expected (N x {ntime} x {nspace}). ' f'Turned shape from {shape_orig} to {data.shape}.' ) # Prepare output arrays pcs = np.empty((k, neof, ntime, 1)) projs = np.empty((k, neof, 1, nspace)) evals = np.empty((k, neof, 1, 1)) nstars = np.empty((k, 1, 1, 1)) # Get EOFs and PCs and stuff for i in range(k): # Get matrices x = data[i, :, :] # array will be sampling by space xw = dataw[i, :, :] # Get reduced degrees of freedom for spatial eigenvalues # TODO: Fix the weight projection below rho = np.corrcoef(x.T[:, 1:], x.T[:, :-1])[0, 1] # space x time rho_ave = (rho * weights).sum() / weights.sum() nstars[i, 0, 0, 0] = ntime * ((1 - rho_ave) / (1 + rho_ave)) # estimate # Get EOFs using covariance matrix on *shortest* dimension # NOTE: Eigenvalues are in *ascending* order, so get the last ones if x.shape[0] > x.shape[1]: # Get *temporal* covariance matrix since time dimension larger # Returns eigenvectors in columns eigrange = [nspace - neof, nspace - 1] # eigenvalues to get covar = (xw.T @ xw) / ntime l, v = linalg.eigh(covar, eigvals=eigrange, eigvals_only=False) pc = xw @ v # (time x space) x (space x neof) = (time x neof) else: # Get *spatial* dispersion matrix since space dimension longer # This time 'eigenvectors' are actually the pcs eigrange = [ntime - neof, ntime - 1] # eigenvalues to get covar = (xw @ x.T) / nspace l, pc = linalg.eigh(covar, eigvals=eigrange, eigvals_only=False) # Store in big arrays # NOTE: We store projection of PC onto data to get 1 standard # deviation associated with the PC rather than actual eigenvector, # because eigenvector values may be damped by area weighting. pc = (pc - pc.mean(axis=0)) / pc.std(axis=0) # standardize pcs proj = (x.T @ pc) / ntime # (space by time) x (time by neof) pcs[i, :, :, 0] = pc.T[::-1, :] # neof by time projs[i, :, 0, :] = proj.T[::-1, :] # neof by space if percent: # *percent explained* rather than total evals[i, :, 0, 0] = (100.0 * l[::-1] / np.trace(covar)) else: evals[i, :, 0, 0] = l[::-1] # actual eigenvalues # Replace context data with new dimension inserted on left side context.replace_data(pcs, projs, evals, nstars, insert_left=1) # Return data restored to original dimensionality return context.data
[docs]def eot(data, neof=5): # noqa """ EOTs, whatever they are. Warning ------- Not yet implemented. """ raise NotImplementedError
[docs]def reof(data, neof=5): # noqa """ Rotated EOFs, e.g. according to the "varimax" method. The EOFs will be rotated according only to the first `neof` EOFs. Warning ------- Not yet implemented. """ raise NotImplementedError
[docs]@quack._xarray_hist_wrapper @quack._pint_wrapper(('=y', '=y'), 'count') @docstring.add_snippets def hist(bins, y, /, axis=0): """ Get the histogram along axis `axis`. Parameters ---------- bins : array-like The bin location. y : array-like The data. %(hist.axis)s Example ------- >>> import climopy as climo ... import numpy as np ... import xarray as xr ... ureg = climo.ureg ... data = xr.DataArray( ... np.random.rand(20, 1000) * ureg.m, ... name='distance', ... dims=('x', 'y'), ... coords={'x': np.arange(20), 'y': np.arange(1000) * 0.1} ... ) ... bins = np.linspace(0, 1, 11) * ureg.m ... hist = climo.hist(bins, data, axis=1) """ if bins.ndim != 1: raise ValueError('Bins must be 1-dimensional.') with _ArrayContext(y, push_right=axis) as context: # Get flattened data y = context.data yhist = np.empty((y.shape[0], bins.size - 1)) # Take histogram for k in range(y.shape[0]): yhist[k, :] = np.histogram(y[k, :], bins=bins)[0] # Replace data context.replace_data(yhist) # Return unflattened data return context.data
[docs]@quack._xarray_fit_wrapper @quack._pint_wrapper(('=x', '=y'), ('=y / x', '=y / x', '=y')) def linefit(x, y, /, axis=0): """ Get linear regression along axis, ignoring NaNs. Uses `~numpy.polyfit`. Parameters ---------- x : array-like The *x* coordinates. y : array-like The *y* coordinates. axis : int, optional Regression axis Returns ------- slope : array-like The slope estimates. The shape is the same as `y` but with dimension `axis` reduced to length 1. stderr : array-like The standard errors of the slope estimates. The shape is the same as `slope`. bestfit : array-like The reconstructed best-fit line. The shape is the same as `y`. """ if x.ndim != 1 or x.size != y.shape[axis]: raise ValueError( f'Invalid x-shape {x.shape} for regression along axis {axis} ' f'of y-shape {y.shape}.' ) with _ArrayContext(y, push_right=axis) as context: # Get regression coefficients. Flattened data is shape (K, N) # where N is regression dimension. Permute to (N, K) then back again. # N gets replaced with length-2 dimension (slope, offset). y = context.data y_params, y_var = np.polyfit(x, y.T, deg=1, cov=True) y_params = np.fliplr(y_params.T) # flip to (offset, slope) # Get best-fit line and slope y_fit = y_params[:, :1] + x * y_params[:, 1:] y_slope = y_params[:, 1:] # Get standard error # See Dave's paper (TODO: add citation) n = y.shape[1] resid = y - y_fit # residual mean = resid.mean(axis=1) var = resid.var(axis=1) rho = np.sum( (resid[:, 1:] - mean[:, None]) * (resid[:, :-1] - mean[:, None]), axis=1, ) / ((n - 1) * var) # correlation factor scale = (n - 2) / (n * ((1 - rho) / (1 + rho)) - 2) # scale factor y_stderr = np.sqrt(y_var[0, 0, :] * scale) # replace offset with stderr # Replace context data context.replace_data(y_slope, y_stderr, y_fit) return context.data
[docs]@quack._xarray_fit_wrapper @quack._pint_wrapper(('=t', ''), ('=t', '=t', '')) def rednoisefit(dt, a, /, nlag=None, nlag_fit=None, axis=0): r""" Return the :math:`e`-folding autocorrelation timescale for the input autocorrelation spectra along an arbitrary axis. Depending on the length of `axis`, the timescale is obtained with either of the following two approaches: 1. Find the :math:`e`-folding timescale(s) for the pure red noise autocorrelation spectra :math:`\exp(-x\Delta t / \tau)` with the least-squares best fit to the provided spectra. 2. Assume the process *is* pure red noise and invert the red noise autocorrelation spectrum at lag 1 to solve for :math:`\tau = \Delta t / \log a_1`. Approach 2 is used if the length of the data along axis `axis` is ``1``, or the data is scalar. In these cases, the data is assumed to represent just the lag-1 autocorrelation(s). Parameters ---------- dt : float, optional The timestep. This is used to scale timescales into physical units. a : array-like The autocorrelation spectra. nlag : int, optional The number of lag timesteps to include in the curve fitting. Default is all the available lags. nlag_fit : int, optional The number of lag timesteps to include in the reconstructed pure red noise autocorrelation spectrum. Default is 50 timesteps. axis : int, optional The "lag" dimension. Each slice along this axis should represent an autocorrelation spectrum generated with `corr`. If the length is ``1``, the data are assumed to be lag-1 autocorrelations and the timescale is computed from the red noise equation. Otherwise, the timescale is estimated from a least-squares curve fit to a red noise spectrum. Returns ------- tau : array-like The autocorrelation timescales. The shape is the same as `data` but with `axis` reduced to length ``1``. sigma : array-like The standard errors. If the timescale was inferred using the lag-1 equation, this is an array of zeros. The shape is the same as `tau`. bestfit : array-like The best fit autocorrelation curves. The shape is the same as `data` but with `axis` of length `nlag`. Example ------- >>> import climopy as climo auto = climo.autocorr(data, axis=0) taus, sigmas = climo.rednoise_fit(auto, axis=0) See Also -------- corr, rednoise, rednoise_spectrum """ # Best-fit function curve = lambda t, tau: np.exp(-t * dt / tau) # noqa: E731 nlag_fit = nlag_fit or 50 lags_fit = np.arange(0, nlag_fit) with _ArrayContext(a, push_right=axis) as context: # Iterate over dimensions a = context.data nextra, nlag_in = a.shape nlag = nlag or nlag_in lags = np.arange(0, nlag) # lags for the curve fit taus = np.empty((nextra, 1)) sigmas = np.empty((nextra, 1)) afit = np.empty((nextra, nlag_fit)) for i in range(nextra): if nlag <= 1: tau = -dt / np.log(a[i, -1]) sigma = 0 # no sigma, because no estimate else: tau, sigma = optimize.curve_fit(curve, lags, a[i, :]) sigma = np.sqrt(np.diag(sigma)) tau, sigma = tau[0], sigma[0] # take only first param taus[i, 0] = tau # just store the timescale sigmas[i, 0] = sigma afit[i, :] = np.exp(-dt * lags_fit / tau) # best-fit spectrum # Replace context data context.replace_data(taus, sigmas, afit) # Return permuted data return context.data
[docs]def wilks(percentiles, alpha=0.10): """ Return the precentile threshold from an array of percentiles that satisfies the given false discovery rate. See :cite:`2006:wilks` for details. Parameters ---------- percentiles : array-like The percentiles. alpha : float, optional The false discovery rate. References ---------- .. bibliography:: ../bibs/fdr.bib """ percentiles = np.asarray(percentiles) pvals = list(percentiles.flat) # want in decimal pvals = sorted(2 * min(pval, 1 - pval) for pval in pvals) ptest = [alpha * i / len(pvals) for i in range(len(pvals))] ppick = max(pv for pv, pt in zip(pvals, ptest) if pv <= pt) / 2 mask = (percentiles <= ppick) | (percentiles >= (1 - ppick)) return percentiles[mask]