from math import floor
import numpy as np
import scipy.stats as st
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
import warnings
[docs]def savgol(x, window_size=100, polyorder=1):
"""
Perform Savitzky-Golay filtration of 1-D array.
Args:
x (np.ndarray) - ordered samples
window_size (int) - filter size
polyorder (int) - polynomial order
Returns:
trend (np.ndarray) - smoothed values
"""
# window size must be odd
if window_size % 2 == 0:
window_size += 1
window_size = int(window_size)
with warnings.catch_warnings():
# filter multidimensional indexing warning (fixed in scipy v1.2.0)
warnings.filterwarnings('ignore', category=FutureWarning)
# filter scipy gelsd warning
warnings.filterwarnings('ignore', message='^internal gelsd')
# apply savgol filter
trend = savgol_filter(x, window_size, polyorder=polyorder)
return trend
[docs]def get_running_mean(x, window_size=100):
"""
Returns running mean for a 1D vector. This is the fastest implementation, but is limited to one-dimensional arrays and doesn't permit interval specification.
Args:
x (np.ndarray) - ordered samples, length N
window_size (int) - size of window, W
Returns:
means (np.ndarray) - moving average of x
"""
cumsum = np.cumsum(np.insert(x, 0, 0))
return (cumsum[window_size:] - cumsum[:-window_size]) / window_size
[docs]def get_rolling_window(x, window_size=100, resolution=1):
"""
Return array slices within a rolling window.
Args:
x (np.ndarray) - ordered samples, length N
window_size (int) - size of window, W
resolution (int) - sampling interval
Returns:
windows (np.ndarray) - sampled values, N/resolution x W
"""
if resolution is None:
resolution = window_size
shape = x.shape[:-1] + (floor((x.shape[-1] - window_size + 1)/resolution), window_size)
strides = x.strides[:-1] + (x.strides[-1]*resolution,) + (x.strides[-1],)
return np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
[docs]def get_rolling_mean(x, **kw):
"""
Compute rolling mean. This implementation permits flexible sampling intervals and multi-dimensional time series, but is slower than get_running_mean for 1D time series.
Args:
x (np.ndarray) - ordered samples, length N
kw: arguments for window specification
Returns:
means (np.ndarray) - moving average of x, N/resolution x 1
"""
return get_rolling_window(x, **kw).mean(axis=-1)
[docs]def get_binned_mean(x, window_size=100):
"""
Returns mean values within non-overlapping sequential windows.
Args:
x (np.ndarray) - ordered samples, length N
window_size (int) - size of window, W
Returns:
means (np.ndarray) - bin means, N/W x 1
"""
return get_rolling_mean(x, window_size=window_size, resolution=window_size)
[docs]def apply_custom_roller(func, x, **kwargs):
"""
Apply function to rolling window.
Args:
func (function) - function applied to each window, returns 1 x N_out
x (np.ndarray) - ordered samples, length N
kwargs: keyword arguments for window specification
Returns:
fx (np.ndarray) - function output for each window, N/resolution x N_out
"""
windows = get_rolling_window(x, **kwargs)
return np.apply_along_axis(func, axis=-1, arr=windows)
[docs]def subsample(x, frac=1):
"""
Subsample array with replacement.
Args:
x (np.ndarray) - ordered samples, length N
frac (float) - sample size (fraction of array)
Returns:
sample (np.ndarray) - subsampled values
"""
return x[np.random.randint(0, len(x), size=floor(frac*len(x)))]
[docs]def bootstrap(x, func=np.mean, confidence=95, N=1000):
"""
Returns point estimate obtained by parametric bootstrap.
Args:
x (np.ndarray) - ordered samples, length N
func (function) - function applied to each bootstrap sample
confidence (float) - confidence interval, between 0 and 100
N (int) - number of bootstrap samples
Returns:
interval (np.ndarray) - confidence interval bounds
"""
point_estimates = [func(subsample(x)) for _ in range(N)]
q = (((100-confidence)/2), (100+confidence)/2)
return np.percentile(point_estimates, q=q)
[docs]def get_rolling_mean_interval(x,
window_size=100,
resolution=1,
confidence=95,
nbootstraps=1000):
"""
Evaluate confidence interval for moving average of ordered values.
Args:
x (np.ndarray) - ordered samples, length N
window_size (int) - size of window, W
resolution (int) - sampling interval
confidence (float) - confidence interval, between 0 and 100
nbootstraps (int) - number of bootstrap samples
Returns:
interval (np.ndarray) - confidence interval bounds, N/resolution x 2
"""
# define bootstrapping operation
f = lambda s: bootstrap(s, np.mean, confidence=confidence, N=nbootstraps)
# apply to moving windows
kw = dict(window_size=window_size, resolution=resolution)
interval = apply_custom_roller(f, x, **kw)
return interval
[docs]def get_rolling_gaussian(x, window_size=100, resolution=10):
"""
Returns gaussian fit within sliding window.
Args:
x (np.ndarray) - ordered samples
window_size (int) - size of window
resolution (int) - sampling interval
Returns:
model (scipy.stats.norm)
"""
# assemble sliding windows
kw = dict(window_size=window_size, resolution=resolution)
windows = get_rolling_window(x, **kw)
# fit gaussians
loc = windows.mean(axis=-1)
scale = np.sqrt(windows.var(axis=-1))
model = st.norm(loc=loc, scale=scale)
return model
[docs]def detrend_signal(x, window_size=99, order=1):
"""
Detrend and scale fluctuations using first-order univariate spline.
Args:
x (np array) -ordered samples
window_size (int) - size of interpolation window for lowpass filter
order (int) - spline order
Returns:
residuals (np array) - detrended residuals
trend (np array) - spline fit to signal
"""
# use odd window size
if window_size % 2 == 0:
window_size += 1
window_size = int(window_size)
# evaluate trend
trend = savgol(x, window_size=window_size, polyorder=order)
# evaluate residuals (catch warnings for invalid values)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
residuals = x - trend
return residuals, trend
[docs]def smooth(x, window):
""" Returns smoothed moving average of <x> within <window>. """
kw = dict(window_size=window, resolution=int(window/5))
return savgol(get_rolling_mean(x, **kw), window)
[docs]def plot_mean(ax, x, y,
label=None,
ma_type='sliding',
window_size=100,
resolution=1,
line_color='k',
line_width=1,
line_alpha=1,
linestyle=None,
markersize=2,
smooth=False,
**kw):
"""
Plot moving average.
Args:
x, y (array like) - timeseries data
ax (matplotlib.axes.AxesSubplot) - axis which to which line is added
label (str) - data label
ma_type (str) - type of average used, either sliding, binned, or savgol
window_size (int) - size of window
resolution (int) - sampling interval
line_color, line_width, line_alpha, linestyle - formatting parameters
smooth (bool) - if True, apply secondary savgol filter
Returns:
line (matplotlib.lines.Line2D)
"""
# get moving average (skip first point to avoid outliers)
if ma_type == 'savgol':
x_av = x[1:]
y_av = savgol(y, window_size=window_size, polyorder=1)[1:]
else:
if ma_type == 'binned':
resolution = window_size
x_av = get_rolling_mean(x, window_size=window_size, resolution=resolution)
y_av = get_rolling_mean(y, window_size=window_size, resolution=resolution)
# get line and dashstyles
if linestyle == None:
linestyle = 'solid'
dashstyles = {'solid': (None, None), 'dashed': (2.0, 2.0)}
dashstyle = dashstyles[linestyle]
# apply secondary smoothing (for visualization)
if smooth:
sw = int(window_size/3)
if sw > 1:
y_av = savgol(y_av, window_size=sw, polyorder=1)
# plot line
line = ax.plot(x_av, y_av,
linestyle=linestyle, dashes=dashstyle,
lw=line_width, color=line_color, alpha=line_alpha,
label=label, markersize=markersize)
return line
[docs]def plot_mean_interval(ax, x, y,
ma_type='sliding',
window_size=100,
resolution=10,
nbootstraps=1000,
confidence=95,
color='grey',
alpha=0.25,
error_bars=False,
lw=0.):
"""
Adds confidence interval for line average (sliding window or binned) to existing axes.
Args:
x, y (array like) - data
ax (axes) - axis which to which line is added
ma_type (str) - type of average used, either 'sliding' or 'binned'
window_size (int) - size of sliding window or bin (num of cells)
interval_resolution (int) - sampling resolution for confidence interval
nbootstraps (int) - number of bootstraps
confidence (float) - confidence interval, between 0 and 100
color, alpha - formatting parameters
"""
if ma_type == 'binned':
resolution = window_size
x_av = get_rolling_mean(x, window_size=window_size, resolution=resolution)
y_av = get_rolling_mean(y, window_size=window_size, resolution=resolution)
interval = get_rolling_mean_interval(y, window_size=window_size, resolution=resolution, nbootstraps=nbootstraps, confidence=confidence)
y_lower, y_upper = interval.T
_ = ax.fill_between(x_av, y_lower, y_upper,
color=color, alpha=alpha, lw=lw)
if error_bars == True:
ax.errorbar(x_av, y_av, yerr=[y_av-y_lower, y_upper-y_av], fmt='-o', color=color)