Source code for flyeye.data.experiments

from os.path import join
from glob import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import gridspec

from .discs import Disc
from .cells import Cells

from ..utilities.iteration import Iterator
from ..utilities.string_handling import format_channel
from ..processing.triangulation import Triangulation
from ..processing.alignment import DiscAlignment, ExperimentAlignment


class Experiment:
    """
    Object representing multiple eye discs obtained under a single set of conditions.

    Attributes:
        discs (dict) - {disc ID: data.discs.Disc} pairs
        num_discs (int) - number of discs within experiment
    """

    def __init__(self, dirpath, normalization, auto_alignment=True, align_by='ch1_normalized', **kwargs):
        """
        Instantiate object representing all discs obtained under a single set of conditions.

        Args:
            dirpath (str) - path to directory containing silhouette files
            normalization (str or int) - normalization channel
            auto_alignment (bool) - if True, align discs
            align_by (str or int) - channel used to align discs
            kwargs: keyword arguments for disc instantiation
        """
        self.discs = self.load(dirpath, normalization=normalization, **kwargs)

        # align discs
        if auto_alignment:
            self.align_discs(align_by)
            self.align_to_first_r8()

    def __getitem__(self, idx):
        """ Returns disc indexed by <idx>. """
        return self.discs[idx]

    def __iter__(self):
        """ Iterate over discs. """
        return Iterator(list(self.discs.values()))

    @property
    def num_discs(self):
        """ Number of discs in experiment. """
        return len(self.discs)

    @property
    def num_progenitors(self):
        """ Number of progenitor measurements in experiment. """
        return len(self.get_cells('pre').data)

    @staticmethod
    def load(dirpath, normalization, **kwargs):
        """
        Load discs from silhouette files.

        Args:
            dirpath (str) - path to directory containing silhouette files
            normalization (str or int) - normalization channel
            kwargs: keyword arguments for disc instantiation

        Returns:
            discs (dict) - {disc_id: data.discs.Disc} pairs
        """

        # identify silhouette files
        silhouette_paths = sorted(glob(join(dirpath, '*.silhouette')))

        # load discs
        discs = {}
        for i, path in enumerate(silhouette_paths):
            discs[i] = Disc.from_silhouette(path, normalization=normalization, **kwargs)

        return discs
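
    # Usage sketch (illustrative, not part of the module): constructing an
    # experiment from a directory of silhouette files. The path
    # 'data/wildtype/' and the channel name 'ch1' are assumed placeholders.
    #
    #   experiment = Experiment('data/wildtype/', normalization='ch1')
    #   print(experiment.num_discs, 'discs loaded')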

    def set_ratio(self, num, den):
        """ Add fluorescence ratio to each disc's dataframe, defined by <num>/<den> channels. """
        for disc in self.discs.values():
            disc.set_ratio(num, den)
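
    # Usage sketch (illustrative): adding a num/den fluorescence ratio to
    # every disc. The channel names 'ch2' and 'ch1' are assumed placeholders;
    # the name of the resulting column is determined by Disc.set_ratio.
    #
    #   experiment.set_ratio('ch2', 'ch1')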

    def align_discs(self, channel):
        """
        Align all discs within experiment.

        Args:
            channel (str) - expression channel by which discs are aligned
        """
        channel = format_channel(channel)
        al = ExperimentAlignment(self, channel=channel)
        self.discs = al.get_aligned_experiment().discs

    def get_pairwise_alignment(self, window_size=10, **kw):
        """
        Compute pairwise quality of alignment between each disc.

        Args:
            window_size (int) - number of cells for smoothing
            kw: keyword arguments for DiscAlignment

        Returns:
            scores (np.ndarray) - mean quality of alignment for each disc
        """

        # compute pairwise alignment between discs
        N = self.num_discs
        scores = np.zeros((N, N))
        for i, d0 in self.discs.items():
            for j, d1 in self.discs.items():
                al = DiscAlignment(d0, d1, window_size=window_size, **kw)
                scores[i, j] = al.score

        # mask diagonal (exclude each disc's alignment with itself)
        mask = np.ones(scores.shape, dtype=bool)
        np.fill_diagonal(mask, 0)

        return scores[mask].reshape(N, N-1).mean(axis=1)
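
    # Usage sketch (illustrative): scores[i] is the mean alignment quality of
    # disc i against every other disc, so unusually low entries flag discs
    # that align poorly with the rest of the experiment. The cutoff below is
    # an arbitrary example, not a recommendation of the package.
    #
    #   scores = experiment.get_pairwise_alignment(window_size=10)
    #   suspect = [i for i, s in enumerate(scores) if s < scores.mean() - 2 * scores.std()]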

    def apply_lag(self, lag=0):
        """
        Apply time shift to all discs in experiment.

        Args:
            lag (float) - time shift applied to each disc
        """
        for disc in self.discs.values():
            disc.apply_lag(offset=lag)

    def align_to_first_r8(self, disc_id=0):
        """
        Shift all discs s.t. t=0 is the first R8 in the reference disc.

        Args:
            disc_id (int) - index of disc used as reference
        """

        # get reference time from the sorted R8 times in the reference disc
        # (the second-earliest value is used, skipping the earliest)
        reference = self.discs[disc_id]
        t = sorted(reference.select_cell_type('r8').data.t.values)[1]

        # shift all discs so that the reference time becomes t=0
        self.apply_lag(lag=-t)

    def get_cells(self, cell_type='pre', **selection_kw):
        """
        Return Cells object for all specified cells.

        Args:
            cell_type (str or list) - type of cells to select
            selection_kw: keyword arguments for cell position selection

        Returns:
            cells (data.cells.Cells)
        """

        # assign disc_id
        for disc_id, disc in self.discs.items():
            disc.data['disc_id'] = disc_id

        # get all cells
        cells = np.sum(list(self.discs.values()))

        # filter cell selection
        cells = cells.select_cell_type(cell_type)
        cells = cells.select_by_position(**selection_kw)

        # sort inplace
        cells.sort(by='t')

        return cells
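
    # Usage sketch (illustrative): selecting progenitor measurements within a
    # developmental time window. The tmin/tmax keywords are forwarded to the
    # cell position selection; the 20-40 h window and the 'ch1_normalized'
    # column name are assumed placeholders.
    #
    #   progenitors = experiment.get_cells('pre', tmin=20, tmax=40)
    #   mean_level = progenitors.data['ch1_normalized'].mean()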

    def select_by_concurrency(self, reference_types, N=10, lower_slip=0, upper_slip=0):
        """
        Select cells concurrent with first N identified cells of reference cell type.

        Args:
            reference_types (array like) - reference cell type(s)
            N (int) - number of reference cells defining time window
            lower_slip (float) - extension before first reference cell, hours
            upper_slip (int) - reference cells skipped (excludes outliers)

        Returns:
            data (DataFrame) - cells concurrent with reference cell type
        """

        # aggregate cells from just before/after their identification
        progenitors = Cells()
        references = Cells()
        for disc_id, disc in self.discs.items():

            # select reference cells
            ref = disc.select_cell_type(reference_types)
            ref.data['disc_id'] = disc_id
            n_current = len(ref.data)
            if n_current == 0:
                continue

            # get time of first reference cell
            tmin = ref.data.iloc[upper_slip]['t'] - lower_slip

            # get time of Nth (or last) reference cell
            if n_current >= N:
                tmax = ref.data.iloc[N-1]['t']
            else:
                tmax = ref.data.iloc[-1]['t']

            # select concurrent progenitors and reference cells
            pre = disc.select_cell_type('pre')
            pre.data['disc_id'] = disc_id
            pre = pre.select_by_position(tmin=tmin, tmax=tmax)
            ref = ref.select_by_position(tmin=tmin, tmax=tmax)

            # append cell selections
            progenitors += pre
            references += ref

        # label precursors as multipotent
        progenitors.data['Population'] = 'Multipotent'
        progenitors.data['original_idx'] = progenitors.data.index

        # label neurons as differentiated
        references.data['Population'] = 'Differentiated'
        references.data['original_idx'] = references.data.index

        # label with corresponding reference cell type and append to data
        data = pd.concat((progenitors.data, references.data))
        data['ReferenceType'] = '/'.join([n.upper() for n in reference_types])

        return data
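
    # Usage sketch (illustrative): cells concurrent with the first ten R8
    # neurons. The returned DataFrame labels progenitors as 'Multipotent' and
    # reference cells as 'Differentiated' in its 'Population' column.
    #
    #   concurrent = experiment.select_by_concurrency(['r8'], N=10)
    #   counts = concurrent.groupby('Population').size()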

    def get_early_neuron_data(self, N=10, lower_slip=0, upper_slip=1):
        """
        Compile DataFrame of early R cells and concurrent progenitors.

        Args:
            N (int) - number of reference cells defining time window
            lower_slip (float) - extension before first reference cell, hours
            upper_slip (int) - reference cells skipped (excludes outliers)

        Returns:
            data (DataFrame) - measurement data for early R cells and concurrent progenitors
        """
        cell_types = [['r8'], ['r2', 'r5'], ['r3', 'r4'], ['r1', 'r6'], ['r7']]

        data = pd.DataFrame()
        for types in cell_types:
            x = self.select_by_concurrency(types, N, lower_slip, upper_slip)
            data = pd.concat([data, x])

        return data
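

# A minimal end-to-end sketch of how Experiment might be used. It is not part
# of the module; the silhouette directory 'data/wildtype/' and the channel
# names 'ch1' and 'ch2' are assumed placeholders for a real experiment.
if __name__ == '__main__':

    # load all discs; auto-alignment uses the default 'ch1_normalized' channel
    experiment = Experiment('data/wildtype/', normalization='ch1')

    # add a ch2/ch1 fluorescence ratio to each disc
    experiment.set_ratio('ch2', 'ch1')

    # compile early R cells together with their concurrent progenitors
    early_neurons = experiment.get_early_neuron_data(N=10)
    print(early_neurons.groupby(['ReferenceType', 'Population']).size())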