Source code for flyeye.processing.alignment

import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
from functools import reduce
from operator import add
from math import ceil, floor
from scipy.interpolate import interp1d

from ..dynamics.averages import get_running_mean, get_rolling_mean


[docs]class Alignment: """ Object for alignment of a 1-D timeseries with another. Attributes: t, x (np.ndarray) - timeseries to be aligned t_ref, x_ref (np.ndarray) - reference to which timeseries is aligned window_size (int) - window size used for local smoothing lag (float) - computed time shift for optimal alignment score (float) - computed metric for quality of alignment """ def __init__(self, t, x, t_ref=None, x_ref=None, metric='crosscorrelation', window_size=10): """ Instantiate object for alignment of one timeseries with another. Args: t (np.ndarray) - timepoints of timeseries to be aligned x (np.ndarray) - values of timeseries to be aligned t_ref (np.ndarray) - timepoints of reference x_ref (np.ndarray) - values of reference metric (str) - name of alignment criterion window_size (int) - window size used for local smoothing """ self.t, self.x = self._initialize_position(t, x) self.window_size = window_size if t_ref is None and x_ref is None: t_ref, x_ref = t, x self.t_ref, self.x_ref = self._initialize_position(t_ref, x_ref) lag, score = self.align(metric=metric, window_size=window_size) self.lag = lag self.score = score def _get_alignnment_object(self): """ Return new Alignment instance. """ return Alignment(self.t, self.x, self.t_ref, self.x_ref) @staticmethod def _sort(t, x): """ Sort timeseries by time. Args: t (np.ndarray) - timepoints x (np.ndarray) - values Returns: ts, xs (np.ndarray) - time-sorted timepoints and values """ sort_ind = np.argsort(t) return t[sort_ind], x[sort_ind] @staticmethod def _align_zero(x): """ Shift values so minimum is zero. """ x -= x.min() return x @classmethod def _initialize_position(cls, t_, x_): """ Sort vector and shift to zero start position. """ t, x = cls._sort(t_, x_) return cls._align_zero(t), x @staticmethod def _get_regular_timepoints(t, dt=0.1): """ Construct array of regularly sampled timepoints. """ return np.arange(t.min(), t.max(), step=dt) @staticmethod def _interpolate(t, x, t_regular): """ Interpolate values onto regularly sampled timepoints. """ interp = interp1d(t, x) return interp(t_regular) @classmethod def _smooth(cls, t, x, window_size=10, dt=0.1): """ Return moving average interpolated onto regular timepoints. """ # evaluate moving average ts = get_rolling_mean(t, window_size=window_size) xs = get_rolling_mean(x, window_size=window_size) # interpolate onto regulat timepoints tsr = cls._get_regular_timepoints(ts, dt=dt) xsr = cls._interpolate(ts, xs, tsr) return tsr, xsr @staticmethod def _zscore(x): """ Z-score values. """ return (x - x.mean())/x.std() @staticmethod def _unit_scale(x): """ Re-scale by maximum value. """ return x/x.max() @classmethod def _get_crosscorrelation(cls, x, y): """ Evaluate normalized crosscorrelation between z-scored vectors. """ xz, yz = cls._zscore(x), cls._zscore(y) return np.correlate(xz, yz, mode='full') / min(len(x), len(y)) @classmethod def _get_crossvariogram(cls, x, y): """ Evaluate nonnormalized correlation between unit-scaled vectors. """ xu, yu = cls._unit_scale(x), cls._unit_scale(y) return np.correlate(xu, yu, mode='full') / min(len(x), len(y)) @staticmethod def _slide_with_extrapolation(f, x, y): """ Apply function over slices in which y rolls across x. Outer-bounds of y are filled with its min/max values. """ n, m = len(x), len(y) x_ref = np.hstack((np.mean(y[0:5])*np.ones(n), y, np.mean(y[-5:])*np.ones(n))) return np.array([f(x, x_ref[-(i+1+n):-(i+1)]) for i in range(n+m-1)]) @staticmethod def _slide_full(func, x, y): """ Apply function over slices in which y rolls over x. Behavior is quivalent to 'full' mode in np.correlate. """ n, m = len(x), len(y) evals = [] for i in range(n+m-1): if i < min(n, m): func_eval = func(y[-(i+1):], x[:(i+1)]) evals.append(func_eval) elif m >= n: func_eval = func(y[(-i-1):(n-i)-1], x[(i - m-n+1):]) evals.append(func_eval) else: func_eval = func(x_ref[:-(i - n - m+1)], x[-(m-i)+1:-(-i-1)]) evals.append(func_eval) return np.array(evals) @classmethod def _get_inverse_distance(cls, x, y): """ Evaluate inverse distance between unit-scaled vectors. """ get_inverse_distance = lambda x1, x2: 1/((abs(x1-x2)).sum()) scores = cls._slide_with_extrapolation(get_inverse_distance, cls._unit_scale(x), cls._unit_scale(y)) return scores @classmethod def _get_inverse_squared_distance(cls, x, y): """ Evaluate squared inverse distance between unit-scaled vectors. """ get_inverse_squared_distance = lambda x1, x2: 1/(((x1-x2)**2).sum()) scores = cls._slide_with_extrapolation(get_inverse_squared_distance, cls._unit_scale(x), cls._unit_scale(y)) return scores @staticmethod def _get_lag_vector(t, t_ref, **kwargs): """ Returns ordered vector of lag times. Lags are shifted by minimums in order to account for first timepoint being shifted above zero by smoothing. """ return np.hstack((t_ref[::-1][:-1]-t.min(), t_ref.min()-t.min(), -t[1:]+ t_ref.min())) @staticmethod def _maximize(lags, scores): """ Returns lag that maximizes scoring metric, along with corresponding score. """ index = scores.argmax() return lags[index], scores[index]
[docs] def get_smoothed_scores(self, metric='crosscorrelation', window_size=10, dt=0.1): """ Returns smoothed alignment scores. """ # interpolate rolling averages onto regular grid t_ref, x_ref = self._smooth(self.t_ref, self.x_ref, window_size=window_size, dt=dt) t, x = self._smooth(self.t, self.x, window_size=window_size, dt=dt) # evaluate scores if metric == 'crosscorrelation': scores = self._get_crosscorrelation(x, x_ref) elif metric == 'crossvariogram' or metric == 'variogram': scores = self._get_crossvariogram(x, x_ref) elif metric == 'inverse_distance': scores = self._get_inverse_distance(x, x_ref) elif metric == 'inverse_squared_distance': scores = self._get_inverse_squared_distance(x, x_ref) else: raise ValueError('Alignment metric not recognized.') return t, t_ref, scores
[docs] def align(self, metric='crosscorrelation', window_size=50, dt=0.1): """ Run alignment to determine optimal lag. Args: metric (str) - name of alignment criterion window_size (int) - window size for local smoothing dt (float) - resolution Returns: lag (float) - time shift that maximizes quality of alignment score (float) - quality of alignment """ # get smoothed metric vectors t, t_ref, scores = self.get_smoothed_scores(metric=metric, window_size=window_size, dt=dt) # find lag that maximizes metric lags = self._get_lag_vector(t, t_ref, window_size=window_size) lag, score = self._maximize(lags, scores) return lag, score
[docs] def plot_scores(self, metric='crosscorrelation', ax=None, window_size=None, dt=0.1): """ Plot quality of alignment versus lag time. Args: metric (str) - name of alignment criterion ax (matplotlib.axes.AxesSubplot) window_size (int) - window size for local smoothing dt (float) - resolution """ # create axes if none provided if ax is None: fig, ax = plt.subplots(figsize=(4, 2)) if window_size is None: window_size = self.window_size # get correlations t, t_ref, scores = self.get_smoothed_scores(metric=metric, window_size=window_size, dt=dt) lags = self._get_lag_vector(t, t_ref, window_size=window_size) # plot correlation as a function of lag ax.plot(lags, scores, '-k', linewidth=3, markersize=0) # add maximum ax.scatter(*self._maximize(lags, scores), s=25, color='red', zorder=99) # format plot ax.set_xlabel('Lag', fontsize=12) ax.set_ylabel(metric.title(), fontsize=12) if metric=='crosscorrelation' or metric=='crossvariogram': ax.set_ylim(-0.7, 1.2) ax.set_yticks([-0.5, 0, 0.5, 1]) ax.axhline(1, linestyle='--', color='k') ax.axhline(0, linestyle='--', color='k')
[docs] def plot_alignment(self, scatter=True, trend=True, ax=None, window_size=100): """ Plot aligned time series. Args: scatter (bool) - if True, show individual samples trend (bool) - if True, show moving average ax (matplotlib.axes.AxesSubplot) window_size (int) - window size for local smoothing """ # add datapoints to plot if ax is None: fig, ax = plt.subplots(figsize=(4, 2)) if scatter: ax.plot(self.t + self.lag, self.x, '.r', alpha=0.1) ax.plot(self.t_ref, self.x_ref, '.k', alpha=0.1) # add moving averages to plot if trend: smooth = lambda x: get_running_mean(x, window_size=window_size) red_line, = ax.plot(smooth(self.t + self.lag), smooth(self.x), '-r', alpha=1) black_line, = ax.plot(smooth(self.t_ref), smooth(self.x_ref), '-k', alpha=1) # format plot ax.legend([black_line, red_line], ['reference', 'aligned'], loc=0, frameon=False)
[docs]class CellsAlignment(Alignment): """ Object for alignment of one group of cells with another. Attributes: data (pd.DataFrame) - aligned cells data Inherited attributes: t, x (np.ndarray) - timeseries to be aligned t_ref, x_ref (np.ndarray) - reference to which timeseries is aligned window_size (int) - window size used for local smoothing lag (float) - computed time shift for optimal alignment score (float) - computed metric for quality of alignment """ def __init__(self, data, reference_data=None, channel='ch1_normalized', basis='t', metric='crosscorrelation', window_size=None): """ Instantiate object for alignment of one group of cells with another. Args: data (pd.DataFrame) - aligned cells data reference_data (pd.DataFrame) - reference cells data channel (str or int) - cells attribute used as alignment values basis (str) - cells attribute used as alignment timepoints metric (str) - name of alignment criterion window_size (int) - window size used for local smoothing (# of cells) """ # extract timeseries self.data = data t, x = data[basis].values, data[channel].values t_ref, x_ref = reference_data[basis].values, reference_data[channel].values # set window size if window_size is None: window_size = 10 # perform alignment Alignment.__init__(self, t, x, t_ref, x_ref, metric=metric, window_size=window_size)
[docs]class DiscAlignment(CellsAlignment): """ Object for alignment of one Disc instance with another. Attributes: disc (data.discs.Disc) - copy of aligned disc instance hours_per_pixel (float) - distance to time scaling factor space_lag (float) - computed time shift for optimal alignment (x-units) time_lag (float) - computed time shift for optimal alignment (t-units) Inherited attributes: t, x (np.ndarray) - timeseries to be aligned t_ref, x_ref (np.ndarray) - reference to which timeseries is aligned window_size (int) - window size used for local smoothing lag (float) - computed time shift for optimal alignment score (float) - computed metric for quality of alignment data (pd.DataFrame) - aligned cells data """ def __init__(self, disc, reference_disc, channel='ch1_normalized', metric='crosscorrelation', window_size=None): """ Instantiate object for alignment of one Disc instance with another. Args: disc (data.discs.Disc) - aligned disc reference_disc (data.discs.Disc) - reference disc channel (str) - cells attribute used as alignment values metric (str) - name of alignment criterion window_size (int) - window size used for local smoothing (# cells) """ self.hours_per_pixel = disc.hours_per_pixel self.disc = deepcopy(disc) # select precursor dataframes precursor_data = disc.select_cell_type('pre').data reference_precursor_data = reference_disc.select_cell_type('pre').data # if no precursors were found, check for 'wpre' if len(precursor_data) == 0: precursor_data = disc.select_cell_type('wpre').data reference_precursor_data = reference_disc.select_cell_type('wpre').data # initialize alignment object (discs must be aligned on time) CellsAlignment.__init__(self, precursor_data, reference_data=reference_precursor_data, channel=channel, basis='t', metric=metric, window_size=window_size) # update time lags self.space_lag = self.lag / disc.hours_per_pixel self.time_lag = self.lag def __call__(self): """ Return aligned disc """ return self.get_aligned_disc()
[docs] @staticmethod def apply_to_disc(disc, space_lag, time_lag): """ Apply lag to copy of disc. Args: disc (data.discs.Disc) space_lag (float) - lag to be applied (x-units) time_lag (float) - lag to be applied (t-units) Returns: disc (data.discs.Disc) - copy of disc with lag applied """ disc = deepcopy(disc) disc.data.centroid_x += (space_lag - disc.data.centroid_x.min()) disc.data.t += (time_lag - disc.data.t.min()) return disc
[docs] def get_aligned_disc(self): """ Return aligned copy of disc. """ disc = deepcopy(self.disc) disc = self.apply_to_disc(disc, space_lag=self.space_lag, time_lag=self.time_lag) return disc
[docs]class ExperimentAlignment: """ Object for the time-alignment of all discs within an experiment. Attributes: experiment (data.experiments.Experiment) - copy of experiment scores (dict) - keys are disc IDs, values are quality of alignment """ def __init__(self, experiment, **kwargs): """ Instantiate object for the time-alignment of all discs within an experiment. The first disc serves as the reference. Args: experiment (data.experiments.Experiment) kwargs: keyword arguments for DiscAlignment instantiation """ self.experiment = deepcopy(experiment) self.align_discs(**kwargs) self.scores = {disc: alignment.score for disc, alignment in self.alignments.items()} @classmethod def _align_discs(cls, experiment, **kwargs): """ Align all discs with first disc. Args: experiment (data.experiments.Experiment) kwargs: keyword arguments for DiscAlignment instantiation Returns: experiment (data.experiments.Experiment) - copy of experiment alignments (dict) - {disc ID: Alignment} pairs """ reference_disc = experiment.discs[0] alignments = {} for disc_id in range(len(experiment.discs)): alignment = DiscAlignment(experiment.discs[disc_id], reference_disc, **kwargs) alignments[disc_id] = alignment experiment.discs[disc_id] = alignment.get_aligned_disc() return experiment, alignments
[docs] def align_discs(self, **kwargs): """ Align all discs with first disc. kwargs: keyword arguments for DiscAlignment instantiation """ experiment, alignments = self._align_discs(self.experiment, **kwargs) self.experiment = experiment self.alignments = alignments
[docs] def get_aligned_experiment(self): """ Return aligned experiment. """ return self.experiment
[docs]class MultiExperimentAlignment: """ Object for the time-alignment of multiple experiments. The first experiment serves as the reference. Attributes: experiments (list) - copies of Experiment instances """ def __init__(self, *experiments, **kwargs): """ Instantiate object for the time-alignment of multiple experiments. The first experiment serves as the reference. Args: experiments (iterable) - data.experiments.Experiment instances kwargs: keyword arguments for the alignment """ self.experiments = [deepcopy(experiment) for experiment in experiments] self.align_experiments(**kwargs) @classmethod def _align_experiments(cls, experiments, channel='ch1_normalized', **kwargs): """ Align all experiments with the first experiment. Args: experiments (iterable) - data.experiments.Experiment instances channel (str) - fluorescence channel by which to align experiments kwargs: keyword arguments for the alignment Returns: experiments (list) - list of copied Experiment instances """ reference = cls.aggregate_discs(experiments[0]).select_cell_type('pre') ref_initial_offset = reference.data.t.min() for experiment in experiments[1:]: aggregate_discs = cls.aggregate_discs(experiment).select_cell_type('pre') # shift to zero initial_offset = aggregate_discs.data.t.min() alignment = CellsAlignment(aggregate_discs.data, reference.data, channel=channel, basis='t', **kwargs) experiment.apply_lag(alignment.lag+ref_initial_offset-initial_offset) return experiments
[docs] def align_experiments(self, **kwargs): """ Align all experiments with first experiment. kwargs: keyword arguments for alignment """ experiments = self._align_experiments(self.experiments, **kwargs) self.experiments = experiments
[docs] @staticmethod def aggregate_discs(experiment): """ Aggregate all discs within an Experiment into single Cells object. Args: experiment (data.experiments.Experiment) Returns: cells (data.cells.Cells) """ return reduce(add, experiment.discs.values())
[docs] def get_aligned_experiments(self): """ Return aligned experiments. Returns: experiments (list) - list of copied Experiment instances """ return self.experiments