#!/usr/bin/env python3
"""
Analyses of variance and trends. Many of these are adapted
from examples and course notes provided by Professors `Elizabeth Barnes \
<http://barnes.atmos.colostate.edu/COURSES/AT655_S15/lecture_slides.html>`__
and `Dennis Hartmann \
<https://atmos.washington.edu/~dennis/552_Notes_ftp.html>`__.
"""
import numpy as np
import scipy.linalg as linalg
import scipy.optimize as optimize
import scipy.stats as stats
from .internals import ic # noqa: F401
from .internals import docstring, quack
__all__ = [
'autocorr',
'autocovar',
'corr',
'covar',
'eof',
'eot',
'reof',
'gaussian',
'hist',
'linefit',
'rednoise',
'rednoisefit',
'wilks',
]
# Docstring snippets
_var_data = """
z : array-like
The input data.
"""
_covar_data = """
z1 : array-like
The input data.
z2 : array-like, optional
The second input data. Must be same shape as `z1`.
"""
_corr_math = r"""
.. math::
\dfrac{%
\sum_{i=0}^{n-k}
\left(x_t - \overline{x}\right)
\left(y_{t+k} - \overline{y}\right)
}{%
(n - k) s_x s_y
}
where :math:`\overline{x}` and :math:`\overline{y}` are the sample means and
:math:`s_x` and :math:`s_y` are the sample standard deviations.
""" # noqa: E501
_covar_math = r"""
.. math::
\dfrac{%
\sum_{i=0}^{n-k}
\left(x_t - \overline{x}\right)
\left(y_{t+k} - \overline{y}\right)
}{%
n - k
}
where :math:`\overline{x}` and :math:`\overline{y}` are the sample means.
""" # noqa: E501
docstring.snippets['template_var'] = """
Return the %(name)s spectrum at a single lag or successive lags. Default
behavior returns the lag-0 %(name)s.
Parameters
----------
dt : float or array-like, optional
The timestep or time series (from which the timestep is inferred).
%(data)s
axis : int, optional
Axis along which %(name)s is taken.
lag : float, optional
Return %(name)s for the single lag `lag` (must be divisible by `dt`).
ilag : int, optional
As with `lag` but specifies the index instead of the physical time.
maxlag : float, optional
Return lagged %(name)s up to the lag `maxlag` (must be divisible by `dt`).
imaxlag : int, optional
As with `maxlag` but specifies the index instead of the physical time.
Returns
-------
lags : array-like
The lags.
result : array-like
The %(name)s as a function of lag.
Notes
-----
This function uses the following formula to estimate %(name)s at lag :math:`k`:
%(math)s
"""
[docs]def gaussian(z, mean=0, stdev=None, sigma=1):
"""
Returns sample points on Gaussian curve.
Parameters
----------
z : array-like, optional
The z-statistics to be sampled.
"""
sigma = stdev if stdev is not None else sigma
norm = stats.norm(loc=mean, scale=sigma)
pdf = norm.pdf(z, loc=mean, scale=sigma)
return z, pdf
[docs]def rednoise(a, ntime=100, nsamples=1, mean=0, stdev=1, state=None):
r"""
Return one or more artificial red noise time series with prescribed mean
and standard deviation. The time series are generated with the following
equation:
.. math::
x(t) = a \cdot x(t - \Delta t) + b \cdot \epsilon(t)
where *a* is the lag-1 autocorrelation and *b* is a scaling term.
Parameters
---------
a : float
The autocorrelation.
ntime : int, optional
Number of timesteps.
nsamples : int or list of int, optional
Axis size or list of axis sizes for the "sample" dimension(s). Shape of the
output array will be ``(ntime,)`` if `nsamples` is not provided,
``(ntime, nsamples)`` if `nsamples` is scalar, or ``(ntime, *nsamples)``
if `nsamples` is a list of axis sizes.
mean, stdev : float, optional
The mean and standard deviation for the red noise time series.
state : `numpy.RandomState`, optional
The random state to use for generating the data.
Returns
-------
data : array-like
The red noise data.
See Also
--------
rednoisefit
"""
# Initial stuff
ntime -= 1 # exclude the initial timestep
nsamples = np.atleast_1d(nsamples)
data = np.empty((ntime + 1, *nsamples)) # user can make N-D array
b = (1 - a ** 2) ** 0.5 # from OA class
# Nested loop
if state is None:
state = np.random
with quack._ArrayContext(data, push_left=0) as context:
data = context.data
data[0, :] = 0.0 # initialize
for i in range(data.shape[-1]):
eps = state.normal(loc=0, scale=1, size=ntime)
for t in range(1, ntime + 1):
data[t, i] = a * data[t - 1, i] + b * eps[t - 1]
# Return
data = context.data
if len(nsamples) == 1 and nsamples[0] == 1:
data = data.squeeze()
return mean + stdev * data # rescale to have specified stdeviation/mean
def _covar_driver(
dt, z1, z2, /, *, lag=None, ilag=None, maxlag=None, imaxlag=None,
axis=0, standardize=False, verbose=False,
):
"""
Driver function for getting covariance.
"""
# Preparation, and stdev/means
dt = quack._as_step(dt)
auto = z1 is z2
if z1.shape != z2.shape:
raise ValueError(f'Incompatible shapes {z1.shape=} and {z2.shape=}.')
naxis = z1.shape[axis] # length
# Parse input args
npassed = sum(_ is not None for _ in (lag, ilag, maxlag, imaxlag))
if npassed == 0:
ilag = 0
elif npassed != 1:
raise ValueError(f'Conflicting kwargs {lag=}, {ilag=}, {maxlag=}, {imaxlag=}.')
if any(_ is not None and not 0 <= _ < naxis - 3 for _ in (ilag, imaxlag)):
raise ValueError(f'Lag index must satisfy 0 <= lag < {naxis - 3}.')
if any(_ is not None and not 0 <= _ < dt * (naxis - 3) for _ in (lag, maxlag)):
raise ValueError(f'Lag time must satisfy 0 <= lag < {dt * (naxis - 3)}.')
if any(_ is not None and not np.isclose(_ % dt, 0) for _ in (lag, maxlag)):
raise ValueError(f'Lag time must be divisible by timestep {dt}.')
if lag is not None:
ilag = np.round(lag / dt).astype(int)
if maxlag is not None:
imaxlag = np.round(maxlag / dt).astype(int)
if verbose:
prefix = 'auto' if auto else ''
suffix = 'correlation' if standardize else 'covariance'
if maxlag is None:
print(f'Calculating lag-{lag} {prefix}{suffix}.')
else:
print(f'Calculating {prefix}{suffix} to lag {maxlag} for axis size {naxis}.') # noqa: E501
# Means and permute
z1 = np.moveaxis(z1, axis, -1)
mean1 = z1.mean(axis=-1, keepdims=True) # keep dims for broadcasting
if auto:
z2, mean2 = z1, mean1
else:
z2 = np.moveaxis(z2, axis, -1)
mean2 = z2.mean(axis=-1, keepdims=True)
# Standardize maybe
std1 = std2 = np.array([1]) # use for covariance
if standardize:
std1 = z1.std(axis=-1, keepdims=True)
if auto:
std2 = std1
else:
std2 = z2.std(axis=-1, keepdims=True)
std1[std1 == 0] = std2[std2 == 0] = 1 # avoid nan error for constant series
# Covariance at zero-lag (included for consistency)
if ilag == 0 or imaxlag == 0:
lags = [0]
covar = np.sum(
(z1 - mean1) * (z2 - mean2),
axis=-1, keepdims=True,
) / (naxis * std1 * std2)
# Covariance on specific lag
elif ilag is not None:
lags = np.array([dt * ilag])
covar = np.sum(
(z1[..., :-ilag] - mean1) * (z2[..., ilag:] - mean2),
axis=-1, keepdims=True,
) / ((naxis - ilag) * std1 * std2)
# Covariance up to n timestep-lags after 0-correlation. Make this
# symmetric if this is not an 'auto' function (i.e. extend to negative lags).
else:
if not auto:
ilags = np.arange(-imaxlag, imaxlag + 1)
else:
ilags = np.arange(0, imaxlag + 1)
lags = dt * ilags
covar = np.empty((*z1.shape[:-1], ilags.size))
for i, ilag in enumerate(ilags):
if ilag == 0:
prod = (z1 - mean1) * (z2 - mean2)
elif ilag < 0: # input 1 *trails* input 2
prod = (z1[..., -ilag:] - mean1) * (z2[..., :ilag] - mean2)
else:
prod = (z1[..., :-ilag] - mean1) * (z2[..., ilag:] - mean2)
covar[..., i] = (
prod.sum(axis=-1, keepdims=True)
/ ((naxis - ilag) * std1 * std2)
)[..., 0]
# Return lags and covariance
return lags, np.moveaxis(covar, -1, axis)
[docs]@quack._xarray_covar_wrapper
@quack._pint_wrapper(('=t', '=z'), ('=t', ''))
@docstring.inject_snippets(name='autocorrelation', data=_var_data, math=_corr_math)
def autocorr(dt, z, axis=0, **kwargs):
"""
%(template_var)s
"""
# NOTE: covar checks if z1 and z2 are the same to to reduce computational cost
kwargs.setdefault('standardize', True)
return _covar_driver(dt, z, z, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper
@quack._pint_wrapper(('=t', '=z'), ('=t', '=z ** 2'))
@docstring.inject_snippets(name='autocovariance', data=_var_data, math=_covar_math)
def autocovar(dt, z, axis=0, **kwargs):
"""
%(template_var)s
"""
# NOTE: covar checks if z1 and z2 are the same to reduce computational cost
return _covar_driver(dt, z, z, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper
@quack._pint_wrapper(('=t', '=z1', '=z2'), ('=t', ''))
@docstring.inject_snippets(name='correlation', data=_covar_data, math=_corr_math)
def corr(dt, z1, z2, axis=0, **kwargs):
"""
%(template_var)s
"""
kwargs.setdefault('standardize', True)
return _covar_driver(dt, z1, z2, axis=axis, **kwargs)
[docs]@quack._xarray_covar_wrapper
@quack._pint_wrapper(('=t', '=z1', '=z2'), ('=t', '=z1 * z2'))
@docstring.inject_snippets(name='covariance', data=_covar_data, math=_covar_math)
def covar(dt, z1, z2, axis=0, **kwargs):
"""
%(template_var)s
"""
return _covar_driver(dt, z1, z2, axis=axis, **kwargs)
[docs]@quack._xarray_eof_wrapper
@quack._pint_wrapper('=z', ('', '=z', '=z ** 2', 'count'))
def eof(
data, /, neof=5, axis_time=-2, axis_space=-1,
weights=None, percent=False, normalize=False,
):
r"""
Calculate the first `N` EOFs using the scipy algorithm for Hermetian matrices
on the covariance matrix.
Parameters
----------
data : array-like
Data of arbitrary shape.
neof : int, optional
Number of eigenvalues we want.
axis_time : int, optional
Axis used as the 'record' or 'time' dimension.
axis_space : int or list of int, optional
Axis or axes used as 'space' dimension.
weights : array-like, optional
Area or mass weights; must be broadcastable on multiplication with
`data` weights. Will be normalized prior to application.
percent : bool, optional
Whether to return raw eigenvalue(s) or the *percentage* of total
variance explained by eigenvalue(s). Default is ``False``.
normalize : bool, optional
Whether to normalize the data by its standard deviation
at every point prior to obtaining the EOFs.
Returns
-------
pcs : array-like
The standardized principal components. The `axis_space` dimensions
are reduced to length 1.
projs : array-like
Projections of the standardized principal components onto the
original data. The `axis_time` dimension is reduced to length 1.
evals
If `percent` is ``Flase``, these are the eigenvalues. Otherwise, this is
the percentage of total variance explained by the corresponding eigenvector.
The `axis_time` and `axis_space` dimensions are reduced to length 1.
nstars
The approximate degrees of freedom as determined by the :cite:`2011:wilks`
autocorrelation critereon. This can be used to compute the approximate 95%
error bounds for the eigenvalues using the :cite:`1982:north` critereon of
:math:`\lambda \sqrt{2 / N^*}`. The `axis_time` and `axis_space` dimensions
are reduced to length 1.
Examples
--------
>>> import numpy as np
>>> import xarray as xr
>>> import climopy as climo
>>> state = np.random.RandomState(51423)
>>> data = xr.DataArray(
... state.rand(10, 6, 100, 40, 20),
... dims=('member', 'run', 'time', 'lev', 'lat'),
... coords={
... 'member': np.arange(1, 11),
... 'run': np.arange(1, 7),
... 'time': np.arange(100.0),
... 'lev': np.linspace(0.0, 1000.0, 40),
... 'lat': np.linspace(-90.0, 90.0, 20),
... }
... )
>>> pcs, projs, evals, nstars = climo.eof(data, axis_time=2, axis_space=(3, 4))
>>> pcs.sizes
Frozen({'eof': 5, 'member': 10, 'run': 6, 'time': 100, 'lev': 1, 'lat': 1})
>>> projs.sizes
Frozen({'eof': 5, 'member': 10, 'run': 6, 'time': 1, 'lev': 40, 'lat': 20})
>>> pcs.head(time=1, run=1, member=1).T
<xarray.DataArray (lat: 1, lev: 1, time: 1, run: 1, member: 1, eof: 5)>
array([[[[[[-0.13679781, 1.08751657, 2.52901891, 0.00737416,
0.55085823]]]]]])
Coordinates:
* member (member) int64 1
* run (run) int64 1
* time (time) float64 0.0
* eof (eof) int64 1 2 3 4 5
Dimensions without coordinates: lat, lev
>>> projs.head(lat=1, lev=1, run=1, member=1).T
<xarray.DataArray (lat: 1, lev: 1, time: 1, run: 1, member: 1, eof: 5)>
array([[[[[[-0.02304145, -0.01572039, 0.02761249, -0.06884522,
0.04163672]]]]]])
Coordinates:
* member (member) int64 1
* run (run) int64 1
* lev (lev) float64 0.0
* lat (lat) float64 -90.0
* eof (eof) int64 1 2 3 4 5
Dimensions without coordinates: time
References
----------
.. bibliography:: ../bibs/eofs.bib
"""
# Parse input
# WARNING: Have to explicitly 'push_left' extra dimensions (even though just
# specifying nflat_left is enough to flattened the desired dimensions) or the new
# 'eof' dimension is not pushed to the left of the extra dimensions. Think this
# is not a bug but consistent with design.
axis_space = np.atleast_1d(axis_space)
np.atleast_1d(axis_time).item() # ensure time axis is 1D
axis_space[axis_space < 0] += data.ndim
if axis_time < 0:
axis_time += data.ndim
if np.any(axis_space == axis_time):
raise ValueError(f'Cannot have {axis_time=} same as {axis_space=}.')
axis_extra = tuple(set(range(data.ndim)) - {axis_time, *axis_space})
# Remove the mean and optionally standardize the data
data = data - data.mean(axis=axis_time, keepdims=True) # remove mean
if normalize:
data = data / data.std(axis=axis_time, keepdims=True)
# Next apply weights
ntime = data.shape[axis_time] # number timesteps
nspace = np.prod(np.asarray(data.shape)[axis_space]) # number space locations
if weights is None:
weights = 1
weights = np.atleast_1d(weights) # want numpy array
weights = weights / weights.mean() # so does not affect amplitude
if ntime > nspace:
dataw = data = data * np.sqrt(weights)
else:
dataw = data * weights # raises error if dimensions incompatible
# Turn matrix in to extra (K) x time (M) x space (N)
# Requires flatening space axes into one, and flattening extra axes into one
shape_orig = data.shape
with quack._ArrayContext(
data, dataw,
push_left=axis_extra,
push_right=(axis_time, *axis_space),
nflat_right=len(axis_space), # flatten space axes
nflat_left=data.ndim - len(axis_space) - 1, # flatten
) as context:
# Retrieve reshaped data
data, dataw = context.data
k = data.shape[0] # ensure 3D
if data.ndim != 3 or ntime != data.shape[1] or nspace != data.shape[2]:
raise RuntimeError(
'Array resizing algorithm failed. '
f'Expected (N x {ntime} x {nspace}). '
f'Turned shape from {shape_orig} to {data.shape}.'
)
# Prepare output arrays
pcs = np.empty((k, neof, ntime, 1))
projs = np.empty((k, neof, 1, nspace))
evals = np.empty((k, neof, 1, 1))
nstars = np.empty((k, 1, 1, 1))
# Get EOFs and PCs and stuff
for i in range(k):
# Get matrices
x = data[i, :, :] # array will be sampling by space
xw = dataw[i, :, :]
# Get reduced degrees of freedom for spatial eigenvalues
# TODO: Fix the weight projection below
rho = np.corrcoef(x.T[:, 1:], x.T[:, :-1])[0, 1] # space x time
rho_ave = (rho * weights).sum() / weights.sum()
nstars[i, 0, 0, 0] = ntime * ((1 - rho_ave) / (1 + rho_ave)) # estimate
# Get EOFs using covariance matrix on *shortest* dimension
# NOTE: Eigenvalues are in *ascending* order, so get the last ones
if x.shape[0] > x.shape[1]:
# Get *temporal* covariance matrix since time dimension larger
# Returns eigenvectors in columns
eigrange = [nspace - neof, nspace - 1] # eigenvalues to get
covar = (xw.T @ xw) / ntime
l, v = linalg.eigh(covar, eigvals=eigrange, eigvals_only=False)
pc = xw @ v # (time x space) x (space x neof) = (time x neof)
else:
# Get *spatial* dispersion matrix since space dimension longer
# This time 'eigenvectors' are actually the pcs
eigrange = [ntime - neof, ntime - 1] # eigenvalues to get
covar = (xw @ x.T) / nspace
l, pc = linalg.eigh(covar, eigvals=eigrange, eigvals_only=False)
# Store in big arrays
# NOTE: We store projection of PC onto data to get 1 standard
# deviation associated with the PC rather than actual eigenvector,
# because eigenvector values may be damped by area weighting.
pc = (pc - pc.mean(axis=0)) / pc.std(axis=0) # standardize pcs
proj = (x.T @ pc) / ntime # (space by time) x (time by neof)
pcs[i, :, :, 0] = pc.T[::-1, :] # neof by time
projs[i, :, 0, :] = proj.T[::-1, :] # neof by space
if percent: # *percent explained* rather than total
evals[i, :, 0, 0] = (100.0 * l[::-1] / np.trace(covar))
else:
evals[i, :, 0, 0] = l[::-1] # actual eigenvalues
# Replace context data with new dimension inserted on left side
context.replace_data(pcs, projs, evals, nstars, insert_left=1)
# Return data restored to original dimensionality
return context.data
[docs]def eot(data, neof=5): # noqa
"""
EOTs, whatever they are.
Warning
-------
Not yet implemented.
"""
raise NotImplementedError
[docs]def reof(data, neof=5): # noqa
"""
Rotated EOFs, e.g. according to the "varimax" method. The EOFs will
be rotated according only to the first `neof` EOFs.
Warning
-------
Not yet implemented.
"""
raise NotImplementedError
[docs]@quack._xarray_hist_wrapper
@quack._pint_wrapper(('=y', '=y'), 'count')
@docstring.inject_snippets(name='count')
def hist(bins, y, /, axis=0):
"""
Get the histogram along axis `axis`.
Parameters
----------
bins : array-like
The bin location.
y : array-like
The data.
%(params_axisdim)s
Examples
--------
>>> import numpy as np
>>> import xarray as xr
>>> import climopy as climo
>>> from climopy import ureg
>>> state = np.random.RandomState(51423)
>>> data = xr.DataArray(
... state.rand(20, 1000) * ureg.m,
... name='distance',
... dims=('x', 'y'),
... coords={'x': np.arange(20), 'y': np.arange(1000) * 0.1}
... )
>>> bins = np.linspace(0, 1, 11) * ureg.m
>>> hist = climo.hist(bins, data, axis=1)
>>> bins
<Quantity([0. 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ], 'meter')>
>>> hist
<xarray.DataArray 'count' (x: 20, distance: 10)>
<Quantity([[100. 102. 101. 112. 99. 98. 93. 97. 84. 114.]
[ 96. 94. 117. 81. 93. 111. 99. 92. 116. 101.]
[116. 92. 101. 93. 100. 100. 101. 106. 95. 96.]
[101. 113. 100. 96. 103. 112. 99. 85. 96. 95.]
[102. 97. 85. 111. 94. 116. 101. 98. 94. 102.]
[ 95. 112. 93. 105. 104. 87. 101. 103. 95. 105.]
[103. 86. 98. 89. 110. 100. 101. 81. 132. 100.]
[ 90. 98. 99. 130. 97. 106. 86. 97. 101. 96.]
[ 95. 110. 96. 92. 88. 87. 118. 101. 112. 101.]
[ 97. 85. 77. 102. 97. 119. 90. 106. 108. 119.]
[ 87. 96. 95. 105. 91. 118. 109. 97. 99. 103.]
[113. 99. 102. 97. 91. 97. 89. 110. 104. 98.]
[100. 107. 110. 97. 85. 114. 104. 95. 97. 91.]
[110. 102. 87. 98. 84. 99. 119. 92. 109. 100.]
[ 95. 96. 101. 118. 103. 93. 89. 102. 90. 113.]
[ 94. 87. 119. 102. 106. 100. 110. 108. 83. 91.]
[ 98. 85. 96. 101. 101. 122. 85. 95. 111. 106.]
[ 93. 111. 87. 95. 93. 103. 107. 111. 92. 108.]
[ 86. 95. 89. 109. 90. 98. 119. 90. 116. 108.]
[103. 100. 106. 87. 102. 88. 103. 121. 93. 97.]], 'count')>
Coordinates:
* x (x) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
* distance (distance) float64 0.05 0.15 0.25 0.35 ... 0.65 0.75 0.85 0.95
"""
if bins.ndim != 1:
raise ValueError('Bins must be 1-dimensional.')
with quack._ArrayContext(y, push_right=axis) as context:
# Get flattened data
y = context.data
yhist = np.empty((y.shape[0], bins.size - 1))
# Take histogram
for k in range(y.shape[0]):
yhist[k, :] = np.histogram(y[k, :], bins=bins)[0]
# Replace data
context.replace_data(yhist)
# Return unflattened data
return context.data
[docs]@quack._xarray_fit_wrapper
@quack._pint_wrapper(('=x', '=y'), ('=y / x', '=y / x', '=y'))
def linefit(x, y, /, axis=0):
"""
Get linear regression along axis, ignoring NaNs. Uses `~numpy.polyfit`.
Parameters
----------
x : array-like
The *x* coordinates.
y : array-like
The *y* coordinates.
axis : int, optional
Regression axis
Returns
-------
slope : array-like
The slope estimates. The shape is the same as `y` but with
dimension `axis` reduced to length 1.
stderr : array-like
The standard errors of the slope estimates. The shape is the
same as `slope`.
fit : array-like
The reconstructed best-fit line. The shape is the same as `y`.
Examples
--------
>>> import numpy as np
>>> import climopy as climo
>>> state = np.random.RandomState(51423)
>>> x = np.arange(500)
>>> y = state.rand(10, 500) + 0.1 * x
>>> slope, stderr, fit = climo.linefit(x, y, axis=1)
>>> slope
array([[0.10000399],
[0.09997467],
[0.09980544],
[0.10004589],
[0.10002195],
[0.09996018],
[0.10009204],
[0.09992162],
[0.10014288],
[0.10011434]])
"""
if x.ndim != 1 or x.size != y.shape[axis]:
raise ValueError(
f'Invalid x-shape {x.shape} for regression along axis {axis} '
f'of y-shape {y.shape}.'
)
with quack._ArrayContext(y, push_right=axis) as context:
# Get regression coefficients. Flattened data is shape (K, N)
# where N is regression dimension. Permute to (N, K) then back again.
# N gets replaced with length-2 dimension (slope, offset).
y = context.data
params, covar = np.polyfit(x, y.T, deg=1, cov=True)
params = np.fliplr(params.T) # flip to (offset column 0, slope column 1)
# Get best-fit line and slope
fit = params[:, :1] + x * params[:, 1:]
slope = params[:, 1:]
# Get standard error
# See Dave's paper (TODO: add citation)
n = y.shape[1]
resid = y - fit # residual
mean = resid.mean(axis=1, keepdims=True)
var = resid.var(axis=1, keepdims=True)
rho = np.sum(
(resid[:, 1:] - mean) * (resid[:, :-1] - mean), axis=1, keepdims=True,
) / ((n - 1) * var) # correlation factor
scale = (n - 2) / (n * ((1 - rho) / (1 + rho)) - 2) # scale factor
stderr = np.sqrt(scale * covar[0, 0, :, None])
# Replace context data
context.replace_data(slope, stderr, fit)
return context.data
[docs]@quack._xarray_fit_wrapper
@quack._pint_wrapper(('=t', ''), ('=t', '=t', ''))
def rednoisefit(
dt, a, /, maxlag=None, imaxlag=None, maxlag_fit=None, imaxlag_fit=None, axis=0
):
r"""
Return the :math:`e`-folding autocorrelation timescale for the input
autocorrelation spectra along an arbitrary axis. Depending on the length
of `axis`, the timescale is obtained with either of the following two
approaches:
1. Find the :math:`e`-folding timescale(s) for the pure red noise
autocorrelation spectra :math:`\exp(-x\Delta t / \tau)` with the
least-squares best fit to the provided spectra.
2. Assume the process *is* pure red noise and invert the
red noise autocorrelation spectrum at lag 1 to solve for
:math:`\tau = \Delta t / \log a_1`.
Approach 2 is used if `axis` is singleton or the data is scalar. In these
cases, the data is assumed to represent just the lag-1 autocorrelation.
Parameters
----------
dt : float or array-like, optional
The timestep or time series (from which the timestep is inferred).
a : array-like
The autocorrelation spectra.
maxlag : float, optional
The maximum time lag to include in the red noise fit.
imaxlag : int, optional
As with `maxlag` but specifies the index instead of the physical time.
maxlag_fit : float, optional
The maximum time lag to include in the output pure red noise spectrum.
imaxlag_fit : int, optional
As with `maxlag` but specifies the index instead of the physical time.
axis : int, optional
The "lag" dimension. Each slice along this axis should represent an
autocorrelation spectrum generated with `corr`.
* If `axis` is singleton, the data are assumed to be lag-1 autocorrelations
and the timescale is computed from the lag-1 pure red noise formula.
* If `axis` is non-singleton, the timescale is estimated from a least-squares
curve fit to a pure red noise spectrum up to `maxlag`.
Returns
-------
tau : array-like
The autocorrelation timescales. The shape is the same as `data`
but with `axis` reduced to length ``1``.
sigma : array-like
The standard errors. If the timescale was inferred using the lag-1
equation, this is an array of zeros. The shape is the same as `tau`.
fit : array-like
The best fit autocorrelation spectrum. The shape is the same as `data`
but with `axis` of length `nlag`.
Examples
--------
>>> import numpy as np
>>> import climopy as climo
>>> state = np.random.RandomState(51423)
>>> data = climo.rednoise(0.8, 500, 10, state=state)
>>> lag, auto = climo.autocorr(1, data, axis=0, maxlag=50)
>>> taus, sigmas, fit = climo.rednoisefit(lag, auto, axis=0)
>>> taus
array([[5.97691453, 4.29275329, 4.91997185, 4.87781027, 3.46404331,
4.23444888, 4.91852921, 4.39283164, 4.79466674, 3.81250855]])
See Also
--------
corr, rednoise
"""
# Initial stuff
dt = quack._as_step(dt)
curve_func = lambda t, tau: np.exp(-t * dt / tau) # noqa: E731
# Parse arguments
if maxlag is not None and imaxlag is not None:
raise ValueError(f'Conflicting kwargs {maxlag=} and {imaxlag=}.')
if maxlag_fit is not None and imaxlag_fit is not None:
raise ValueError(f'Conflicting kwargs {maxlag_fit=} and {imaxlag_fit=}.')
if any(_ is not None and not np.isclose(_ % dt, 0) for _ in (maxlag, maxlag_fit)):
raise ValueError(f'Lag time must be divisible by timestep {dt}.')
if maxlag is not None:
imaxlag = np.round(maxlag / dt).astype(int)
if maxlag_fit is not None:
imaxlag_fit = np.round(maxlag_fit / dt).astype(int)
with quack._ArrayContext(a, push_right=axis) as context:
# Set defaults
a = context.data
nlag = a.shape[1] - 1 # not including 0-lag entry
nextra = a.shape[0]
imaxlag = max(1, nlag) if imaxlag is None else min(imaxlag, nlag)
imaxlag_fit = imaxlag if imaxlag_fit is None else imaxlag_fit # can be anything
lags = np.arange(0, imaxlag + 1) # lags for the curve fit
lags_fit = np.arange(0, imaxlag_fit + 1)
# Iterate over dimensions
taus = np.empty((nextra, 1))
sigmas = np.empty((nextra, 1))
fit = np.empty((nextra, imaxlag_fit + 1))
for i in range(nextra):
if imaxlag <= 1:
# Scalar function
tau = -dt / np.log(a[i, -1])
sigma = 0
else:
# Curve fit
tau, sigma = optimize.curve_fit(curve_func, lags, a[i, :])
sigma = np.sqrt(np.diag(sigma))
tau, sigma = tau[0], sigma[0]
# Timescales, uncertainty and the curve itself
taus[i, 0] = tau # just store the timescale
sigmas[i, 0] = sigma
fit[i, :] = np.exp(-dt * lags_fit / tau) # best-fit spectrum
# Replace context data
context.replace_data(taus, sigmas, fit)
# Return permuted data
return context.data
[docs]def wilks(percentiles, alpha=0.10):
"""
Return the precentile threshold from an array of percentiles that satisfies
the given false discovery rate. See :cite:`2006:wilks` for details.
Parameters
----------
percentiles : array-like
The percentiles.
alpha : float, optional
The false discovery rate.
References
----------
.. bibliography:: ../bibs/fdr.bib
"""
percentiles = np.asarray(percentiles)
pvals = list(percentiles.flat) # want in decimal
pvals = sorted(2 * min(pval, 1 - pval) for pval in pvals)
ptest = [alpha * i / len(pvals) for i in range(len(pvals))]
ppick = max(pv for pv, pt in zip(pvals, ptest) if pv <= pt) / 2
mask = (percentiles <= ppick) | (percentiles >= (1 - ppick))
return percentiles[mask]