from math import floor
import numpy as np
from pandas import Series
import scipy.stats as st
from scipy.signal import savgol_filter
import warnings
[docs]def savgol(x, window_size=100, polyorder=1):
    """
    Perform Savitzky-Golay filtration of 1-D array.
    Args:
        x (np.ndarray) - ordered samples
        window_size (int) - filter size
        polyorder (int) - polynomial order
    Returns:
        trend (np.ndarray) - smoothed values
    """
    # window size must be odd
    if window_size % 2 == 0:
        window_size += 1
    window_size = int(window_size)
    with warnings.catch_warnings():
        # filter multidimensional indexing warning (fixed in scipy v1.2.0)
        warnings.filterwarnings('ignore', category=FutureWarning)
        # filter scipy gelsd warning
        warnings.filterwarnings('ignore', message='^internal gelsd')
        # apply savgol filter
        trend = savgol_filter(x, window_size, polyorder=polyorder)
    return trend 
[docs]def get_running_mean(x, window_size=100):
    """
    Returns running mean for a 1D vector. This is the fastest implementation, but is limited to one-dimensional arrays and doesn't permit interval specification.
    Args:
        x (np.ndarray) - ordered samples, length N
        window_size (int) - size of window, W
    Returns:
        means (np.ndarray) - moving average of x
    """
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[window_size:] - cumsum[:-window_size]) / window_size 
[docs]def get_rolling_window(x, window_size=100, resolution=1):
    """
    Return array slices within a rolling window.
    Args:
        x (np.ndarray) - ordered samples, length N
        window_size (int) - size of window, W
        resolution (int) - sampling interval
    Returns:
        windows (np.ndarray) - sampled values, N/resolution x W
    """
    # use values from pandas series
    if isinstance(x, Series):
        x = x.values
    shape = x.shape[:-1] + (floor((x.shape[-1] - window_size + 1)/resolution), window_size)
    strides = x.strides[:-1] + (x.strides[-1]*resolution,) + (x.strides[-1],)
    return np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides) 
[docs]def get_rolling_mean(x, **kw):
    """
    Compute rolling mean. This implementation permits flexible sampling intervals and multi-dimensional time series, but is slower than get_running_mean for 1D time series.
    Args:
        x (np.ndarray) - ordered samples, length N
        kw: arguments for window specification
    Returns:
        means (np.ndarray) - moving average of x, N/resolution x 1
    """
    return get_rolling_window(x, **kw).mean(axis=-1) 
[docs]def get_binned_mean(x, window_size=100):
    """
    Returns mean values within non-overlapping sequential windows.
    Args:
        x (np.ndarray) - ordered samples, length N
        window_size (int) - size of window, W
    Returns:
        means (np.ndarray) - bin means, N/W x 1
    """
    return get_rolling_mean(x, window_size=window_size, resolution=window_size) 
[docs]def apply_custom_roller(func, x, **kwargs):
    """
    Apply function to rolling window.
    Args:
        func (function) - function applied to each window, returns 1 x N_out
        x (np.ndarray) - ordered samples, length N
        kwargs: keyword arguments for window specification
    Returns:
        fx (np.ndarray) - function output for each window, N/resolution x N_out
    """
    windows = get_rolling_window(x, **kwargs)
    return np.apply_along_axis(func, axis=-1, arr=windows) 
[docs]def subsample(x, frac=1):
    """
    Subsample array with replacement.
    Args:
        x (np.ndarray) - ordered samples, length N
        frac (float) - sample size (fraction of array)
    Returns:
        sample (np.ndarray) - subsampled values
    """
    return x[np.random.randint(0, len(x), size=floor(frac*len(x)))] 
[docs]def bootstrap(x, func=np.mean, confidence=95, N=1000):
    """
    Returns point estimate obtained by parametric bootstrap.
    Args:
        x (np.ndarray) - ordered samples, length N
        func (function) - function applied to each bootstrap sample
        confidence (float) - confidence interval, between 0 and 100
        N (int) - number of bootstrap samples
    Returns:
        interval (np.ndarray) - confidence interval bounds
    """
    point_estimates = [func(subsample(x)) for _ in range(N)]
    q = (((100-confidence)/2), (100+confidence)/2)
    return np.percentile(point_estimates, q=q) 
[docs]def get_rolling_mean_interval(x,
                              window_size=100,
                              resolution=1,
                              confidence=95,
                              nbootstraps=1000):
    """
    Evaluate confidence interval for moving average of ordered values.
    Args:
        x (np.ndarray) - ordered samples, length N
        window_size (int) - size of window, W
        resolution (int) - sampling interval
        confidence (float) - confidence interval, between 0 and 100
        nbootstraps (int) - number of bootstrap samples
    Returns:
        interval (np.ndarray) - confidence interval bounds, N/resolution x 2
    """
    # define bootstrapping operation
    f = lambda s: bootstrap(s, np.mean, confidence=confidence, N=nbootstraps)
    # apply to moving windows
    kw = dict(window_size=window_size, resolution=resolution)
    interval = apply_custom_roller(f, x, **kw)
    return interval 
[docs]def get_rolling_gaussian(x, window_size=100, resolution=10):
    """
    Returns gaussian fit within sliding window.
    Args:
        x (np.ndarray) - ordered samples
        window_size (int) - size of window
        resolution (int) - sampling interval
    Returns:
        model (scipy.stats.norm)
    """
    # assemble sliding windows
    kw = dict(window_size=window_size, resolution=resolution)
    windows = get_rolling_window(x, **kw)
    # fit gaussians
    loc = windows.mean(axis=-1)
    scale = np.sqrt(windows.var(axis=-1))
    model = st.norm(loc=loc, scale=scale)
    return model 
[docs]def detrend_signal(x, window_size=99, order=1):
    """
    Detrend and scale fluctuations using first-order univariate spline.
    Args:
        x (np array) -ordered samples
        window_size (int) - size of interpolation window for lowpass filter
        order (int) - spline order
    Returns:
        residuals (np array) - detrended residuals
        trend (np array) - spline fit to signal
    """
    # use odd window size
    if window_size % 2 == 0:
        window_size += 1
    window_size = int(window_size)
    # evaluate trend
    trend = savgol(x, window_size=window_size, polyorder=order)
    # evaluate residuals (catch warnings for invalid values)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        residuals = x - trend
    return residuals, trend