import warnings
import numpy as np
import pandas as pd
from scipy.stats import norm
from .silhouette import SilhouetteData
from .image import ImageStack
from .cells import Cells
from ..utilities.string_handling import format_channel
from ..dynamics.averages import detrend_signal
from ..processing.triangulation import Triangulation
# treat warnings as exceptions
#import warnings
#warnings.filterwarnings('error')
[docs]class DiscProperties:
""" Properties for Disc class. """
@property
def path(self):
""" Path to silhouette file. """
return self.silhouette.path
@property
def is_flipped_about_yz(self):
""" If True, disc is inverted about YZ plane. """
return self.silhouette.is_flipped_about_yz
@property
def is_flipped_about_xy(self):
""" If True, disc is inverted about XY plane. """
return self.silhouette.is_flipped_about_xy
[docs]class Disc(Cells, DiscProperties):
"""
Object representing all cells in a single eye disc.
Attributes:
silhouette (flyeye.SilhouetteData) - data from silhouette file
furrow_velocity (float) - furrow inverse-velocity (hours per column)
offset (float) - time by which disc is shifted from first R8
bit_depth (int) - fluorescence intensity bit depth, log2 scaled
triangulation (processing.triangulation.Triangulation)
Inherited Attributes:
normalization (str or int) - channel used to normalize intensities
data (pd.DataFrame) - cell measurement data
Properties:
path (str) - unique silhouette filepath
is_flipped_about_yz (bool) - if True, disc is inverted about YZ plane
is_flipped_about_xy (bool) - if True, disc is inverted about XY plane
"""
def __init__(self,
silhouette,
normalization=None,
furrow_velocity=2.,
offset=None,
bit_depth=None):
"""
Instantiate object representing all cells in a single eye disc.
Args:
silhouette (flyeye.SilhouetteData) - data from silhouette file
normalization (str or int) - channel used to normalize intensities
furrow_velocity (float) - furrow inverse-velocity (hours per column)
offset (float) - time by which disc is shifted from first R8
bit_depth (int) - fluorescence intensity bit depth, log2 scaled
"""
Cells.__init__(self, silhouette.data, normalization=normalization)
# set sihouette attribute
self.silhouette = silhouette
# standardize precursor labels
self.standardize_labels()
# orient disc (flip horizontally)
if self.is_flipped_about_yz is True:
self.flip_about_yz()
self.sort()
# flip stack vertically
if self.is_flipped_about_xy is True:
self.flip_about_xy()
# normalize fluorescence intensities by bit depth
if bit_depth is not None:
self.bit_depth = bit_depth
self.normalize_expression(base=2**self.bit_depth)
# normalize fluorescence intensities by reference channel
if self.normalization is not None:
self.normalize_by_reference(self.normalization)
# construct triangulation of R8 positions
self.furrow_velocity = furrow_velocity
self.triangulation = None
r8_neurons = self.select_cell_type('r8')
if len(r8_neurons.data) > 0:
self.triangulation = Triangulation(self, threshold=1.75, furrow_velocity=furrow_velocity)
# apply triangulation to compute estimated developmental times
self.apply_time_scaling()
# shift all cells such that first R8 occurs at time zero
self.apply_lag(offset=offset)
# detrend
self.detrend(channels=self.normalized_channels)
[docs] @staticmethod
def from_silhouette(path,
normalization=None,
furrow_velocity=2.,
recompile=False,
**kwargs):
"""
Instantiate disc from silhouette file.
Args:
path (str) - silhouette filepath
normalization (str or int) - normalization channel
furrow_velocity (float) - furrow inverse-velocity (hours per column)
recompile (bool) - if True, recompile measurements from all layers
kwargs: keyword arguments for disc instantiation
Returns:
disc (data.discs.Disc)
"""
# load silhouette file data
silhouette = SilhouetteData(path, recompile=recompile)
# instantiate disc
disc = Disc(silhouette,
normalization=normalization,
furrow_velocity=furrow_velocity,
**kwargs)
return disc
[docs] def load_imagestack(self):
"""
Load image stack from silhouette file.
Returns:
stack (data.image.ImageStack)
"""
return ImageStack.from_silhouette(self.path)
[docs] def standardize_labels(self):
""" Convert all alternate precursor labels to 'pre' """
names = ['p', 'prepre', 'v']
self.data.loc[self.data.label.isin(names), 'label'] = 'pre'
[docs] def flip_about_xy(self, save=False):
"""
Flip disc bottom to top.
Args:
save (bool) - if True, save new orientation to silhouette file
"""
ymax, ymin = self.data.centroid_y.max(), self.data.centroid_y.min()
self.data['centroid_y'] = ymax - self.data.centroid_y + ymin
# update silhouette file
if save:
self.silhouette.flip_about_xy()
[docs] def flip_about_yz(self, save=False):
"""
Flip disc left to right.
Args:
save (bool) - if True, save new orientation to silhouette file
"""
xmax, xmin = self.data.centroid_x.max(), self.data.centroid_x.min()
self.data['centroid_x'] = xmax - self.data.centroid_x + xmin
if 't' in self.data.keys():
tmax, tmin = self.data.t.max(), self.data.t.min()
self.data['t'] = tmax - self.data.t + tmin
# update silhouette file
if save:
self.silhouette.flip_about_yz()
[docs] def apply_time_scaling(self):
""" Apply distance-to-time scaling to generate time vector. """
if self.triangulation is None:
hours_per_pixel = 1
else:
hours_per_pixel = self.triangulation.hours_per_pixel
self.hours_per_pixel = hours_per_pixel
self.data['t'] = self.data['centroid_x'] * self.hours_per_pixel
[docs] def apply_lag(self, offset=None):
"""
Shift disc in time.
Args:
offset (str or float) - position of new origin, if 'first_r8' shift such that first R8 occurs at time zero
"""
if offset is None:
offset = 'first_r8'
if offset == 'first_r8':
r8_neurons = self.select_cell_type('r8')
if len(r8_neurons.data) > 0:
offset = -r8_neurons.data.t.min()
else:
offset = 0
self.data['t'] += offset
self.data['centroid_x'] += offset / self.hours_per_pixel
[docs] def normalize_expression(self, max_value):
"""
Convert each channel's fluorescence to a 0-1 scale.
Args:
max_value (float) - maximum fluorescence intensity
"""
for channel in self.channels:
self.data[channel] /= max_value
self.data[channel+'_std'] /= max_value
[docs] def normalize_by_reference(self, reference):
"""
Normalize expression levels by the reference channel.
Args:
reference (str or int) - channel used for normalization
"""
for channel in self.channels:
if channel != reference:
channel_name = channel+'_normalized'
self.data[channel_name] = self.data[channel]/self.data[reference]
[docs] def set_ratio(self, num, den):
"""
Add fluorescence ratio to dataframe, defined by <num>/<den> channels.
"""
num, den = format_channel(num), format_channel(den)
if self.data[den].max() != 0:
self.data['logratio'] = np.log2(self.data[num]/self.data[den])
self.data['ratio'] = self.data[num]/self.data[den]
self.detrend(channels=('logratio',))
else:
raise ValueError('Denominator channel is empty.')
[docs] def detrend(self, channels, order=1):
"""
Add detrended fluctuations for each fluorescence channel within each cell type to disc dataframe.
Args:
channels (iterable) - channels to be detrended
order (int) - polyorder for local trendataitting
"""
fluctuations = []
for cell_type in self.data.label.unique():
# select cells of specified type
cells = self.select_cell_type(cell_type)
data, columns = [], []
for ch in channels:
# skip missing channels
if ch not in self.data.columns:
continue
# get samples for current cell type and channel
x = cells.data[ch].values
# determine window size for lowpass filter
if cell_type == 'pre':
window_size = np.floor(x.size/10)
else:
window_size = np.floor(x.size/5)
# evaluate residuals by substracting lowpass filtered data
if window_size > 3:
residuals, trend = detrend_signal(x, window_size, order)
else:
trend = x.mean() * np.ones(x.size)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
residuals = x - trend
# normalize residuals
if ch == 'logratio':
flux_raw = residuals
else:
flux_raw = residuals/trend
flux = np.abs(flux_raw)
# append channel to list
data.extend([trend, residuals, flux_raw, flux])
columns.extend([ch+'_trend',
ch+'_residuals',
ch+'_flux_raw',
ch+'_flux'])
# append cells fluctuations dataframe to list
data = np.array(data).T
data = data.reshape((len(cells.data.index), len(columns)))
data = pd.DataFrame(data=data, index=cells.data.index, columns=columns)
fluctuations.append(data)
# join measurements with residuals on measurement index
self.data = self.data.join(pd.concat(fluctuations))
[docs] def get_multipotent_layers(self, q=.9):
"""
Determine which layers span multipotent cells. Bounds correspond to lower and upper quantiles taken from a normal fit to all progenitor layers.
Args:
q (float) - width parameter (interquantile range), 0 to 1
Returns:
bounds (np.ndarray, length 2) - lower and upper bounds
"""
progenitors = self.select_cell_type('pre')
layers = progenitors.data.layer.values
bounds = norm(*norm.fit(layers)).ppf([(1-q)/2, (1+q)/2])
return np.round(bounds).astype(int)