Source code for flyqma.data.layers

from os.path import join, isdir, exists
from os import listdir, mkdir
from shutil import rmtree
import gc
from copy import deepcopy
import pandas as pd
import numpy as np
from scipy.ndimage import binary_erosion
import matplotlib.pyplot as plt
from matplotlib.path import Path
from collections import Counter

from ..visualization import *
from ..utilities import IO

# import measurement objects
from ..measurement import Segmentation
from ..measurement import Measurements

# import annotation objects
from ..annotation import WeightedGraph
from ..annotation import Annotation
from ..annotation import ConcurrencyLabeler
from ..annotation import CloneBoundaries
from ..annotation import CelltypeLabeler

# import bleedthrough correction objects
from ..bleedthrough import LayerCorrection

# import image base class
from .images import ImageMultichromatic
from .silhouette_write import WriteSilhouetteLayer

# import default parameters
from .defaults import Defaults
defaults = Defaults()


[docs]class LayerVisualization: """ Methods for visualizing a layer. """ @default_figure def plot_graph(self, channel, figsize=(15, 15), image_kw={}, graph_kw={}, ax=None): """ Plot graph on top of relevant image channel. Args: channel (str) - fluorescence channel to visualize figsize (tuple) - figure size image_kw (dict) - keyword arguments for scalar image visualization graph_kw (dict) - keyword arguments for scalar image visualization """ # add image if channel is not None: image = self.get_channel(channel) image.show(ax=ax, segments=False, **image_kw) # add graph self.graph.show(ax=ax, **graph_kw)
[docs] def plot_boundary(self, ax, label, label_by='genotype', color='r', alpha=70, **kwargs): """ Plot boundary of <label_by> groups with <label> on <ax>. """ # add labels to ephemeral copy of graph data graph = self.graph.copy() graph.data[label_by] = self.data[label_by] # plot clone boundaries bounds = CloneBoundaries(graph, label_by=label_by, alpha=alpha) bounds.plot_boundary(label, color=color, ax=ax, **kwargs)
[docs] def plot_boundaries(self, ax, label_by='genotype', cmap=plt.cm.bwr, alpha=70, **kwargs): """ Plot boundaries of all <label_by> groups on <ax>. """ # add labels to ephemeral copy of graph data graph = self.graph.copy() graph.data[label_by] = self.data[label_by] # plot clone boundaries bounds = CloneBoundaries(graph, label_by=label_by, alpha=alpha) bounds.plot_boundaries(cmap=cmap, ax=ax, **kwargs)
def _build_mask(self, values, interior_only=False, selection_only=False, null_value=-1): """ Use <values> to construct an image mask. Args: values (array like) - value/label for each segment interior_only (bool) - if True, excludes clone borders selection_only (bool) - if True, only include selected region null_value (int) - value used to fill unused pixels Returns: mask (np.ma.Maskedarray) - masked image in which foreground segments are replaced with the specified values """ # build dictionary mapping segments to values segment_to_value = dict(zip(self.data.segment_id, values)) segment_to_value[0] = null_value # exclude borders if interior_only: msg = 'Boundary attribute not found. Annotate and try again.' assert 'boundary' in self.data.keys(), msg boundary = self.data[self.data.boundary] boundary_to_black = {x: null_value for x in boundary.segment_id} segment_to_value.update(boundary_to_black) # exclude cells not included in selection if selection_only: excluded = self.data[~self.data.selected] excluded_to_black = {x: null_value for x in excluded.id} segment_to_value.update(excluded_to_black) # construct mask segment_to_value = np.vectorize(segment_to_value.get) mask = segment_to_value(self.labels) mask = np.ma.MaskedArray(mask, mask==null_value) return mask
[docs] def build_attribute_mask(self, attribute, interior_only=False, selection_only=False, **kwargs): """ Use <attribute> value for each segment to construct an image mask. Args: attribute (str) - attribute used to label each segment interior_only (bool) - if True, excludes clone borders selection_only (bool) - if True, only include selected region Returns: mask (np.ma.Maskedarray) - masked image in which foreground segments are replaced with the attribute values """ return self._build_mask(self.data[attribute].values, interior_only=interior_only, selection_only=selection_only, **kwargs)
[docs] def build_classifier_mask(self, classifier, interior_only=False, selection_only=False, **kwargs): """ Use segment <classifier> to construct an image mask. Args: classifier (annotation.Classifier object) interior_only (bool) - if True, excludes clone borders selection_only (bool) - if True, only include selected region Returns: mask (np.ma.Maskedarray) - masked image in which foreground segments are replaced with the assigned labels """ return self._build_mask(classifier(self.data), interior_only=interior_only, selection_only=selection_only, **kwargs)
[docs]class LayerIO(WriteSilhouetteLayer): """ Methods for saving and loading Layer objects and their subcomponents. """
[docs] def make_subdir(self, dirname): """ Make subdirectory. """ dirpath = join(self.path, dirname) if not exists(dirpath): mkdir(dirpath) self.add_subdir(dirname, dirpath)
[docs] def add_subdir(self, dirname, dirpath): """ Add subdirectory. """ self.subdirs[dirname] = dirpath
[docs] def find_subdirs(self): """ Find all subdirectories. """ self.subdirs = {} for dirname in listdir(self.path): dirpath = join(self.path, dirname) if isdir(dirpath): self.add_subdir(dirname, dirpath)
[docs] def save_metadata(self): """ Save metadata. """ io = IO() io.write_json(join(self.path, 'metadata.json'), self.metadata)
[docs] def save_segmentation(self, image, **kwargs): """ Save segment labels, and optionally save a segmentation image. Args: image (bool) - if True, save segmentation image kwargs: keyword arguments for image rendering """ dirpath = self.subdirs['segmentation'] # save segment labels np.save(join(dirpath, 'labels.npy'), self.labels) # save segmentation image if image: bg = self.get_channel(self.metadata['bg'], copy=False) fig = bg.show(segments=True) fig.axes[0].axis('off') fig.savefig(join(dirpath, 'segmentation.png'), **kwargs) fig.clf() plt.close(fig) gc.collect()
[docs] def save_measurements(self): """ Save raw measurements. """ # get segmentation directory path = join(self.subdirs['measurements'], 'measurements.hdf') # save raw measurements self.measurements.to_hdf(path, 'measurements', mode='w')
[docs] def save_processed_data(self): """ Save processed measurement data. """ path = join(self.subdirs['measurements'], 'processed.hdf') self.data.to_hdf(path, 'data', mode='w')
[docs] def save_annotator(self, image=False, **kwargs): """ Save annotator instance. Args: image (bool) - if True, save annotation images kwargs: keyword arguments for image rendering """ path = self.subdirs['annotation'] self.annotator.save(path, image=image, **kwargs)
[docs] def save(self, segmentation=True, measurements=True, processed_data=True, annotator=False, segmentation_image=False, annotation_image=False): """ Save segmentation parameters and results. Args: segmentation (bool) - if True, save segmentation measurements (bool) - if True, save measurement data processed_data (bool) - if True, save processed measurement data annotator (bool) - if True, save annotator segmentation_image (bool) - if True, save segmentation image annotation_image (bool) - if True, save annotation image """ # set image keyword arguments image_kw = dict(format='png', dpi=100, bbox_inches='tight', pad_inches=0, transparent=True, rasterized=True) # save segmentation if segmentation: self.make_subdir('segmentation') self.save_segmentation(image=segmentation_image, **image_kw) # save measurements if measurements: self.make_subdir('measurements') self.save_measurements() # save processed data if processed_data and self.data is not None: self.data = self.process_measurements(self.measurements) self.save_processed_data() # save annotation if annotator and self.annotator is not None: self.make_subdir('annotation') self.save_annotator(image=annotation_image, **image_kw) # save metadata self.save_metadata()
[docs] def load_metadata(self): """ Load metadata. """ path = join(self.path, 'metadata.json') if exists(path): io = IO() self.metadata = io.read_json(path)
[docs] def load_labels(self): """ Load segment labels if they are available. """ labels = None if 'segmentation' in self.subdirs.keys(): segmentation_path = self.subdirs['segmentation'] labels_path = join(segmentation_path, 'labels.npy') if exists(labels_path): labels = np.load(labels_path) self.labels = labels
[docs] def load_measurements(self): """ Load raw measurements. """ path = join(self.subdirs['measurements'], 'measurements.hdf') self.measurements = pd.read_hdf(path, 'measurements')
[docs] def load_processed_data(self): """ Load processed data from file. """ path = join(self.subdirs['measurements'], 'processed.hdf') self.data = pd.read_hdf(path, 'data')
[docs] def load_annotator(self): """ Load annotator instance. """ self.annotator = Annotation.load(self.subdirs['annotation'])
[docs] def load_inclusion(self): """ Load inclusion flag. """ io = IO() selection_md = io.read_json(join(self.subdirs['selection'], 'md.json')) if selection_md is not None: self.include = bool(selection_md['include'])
[docs] def load_correction(self): """ Load linear background correction. Returns: correction (LayerCorrection) """ assert self.has_image, 'Image unavailable. Load image and try again.' return LayerCorrection.load(self)
[docs] def load(self, use_cache=True, graph=True): """ Load layer. Args: use_cache (bool) - if True, use cached measurement data, otherwise re-process the measurement data graph (bool) - if True, load weighted graph """ # load metadata and extract background channel self.load_metadata() # load inclusion data if 'selection' in self.subdirs.keys(): self.load_inclusion() # if layer is not included, skip it if not self.include: return None # check whether annotation exists if 'annotation' in self.subdirs.keys() and not use_cache: if self.annotator is not None: raise UserWarning('Layer was instantiated with a stack-level annotation instance, but a second annotation instance was found within the layer directory. Resolve this conflict before continuing.') # load annotator self.load_annotator() # check whether segmentation exists and load raw measurement data if 'measurements' in self.subdirs.keys(): self.load_measurements() # if processing measurements, ensure that graph is built if not use_cache: graph = True # build graph if graph and 'graph_weighted_by' in self.metadata['params'].keys(): graph_weighted_by = self.metadata['params']['graph_weighted_by'] graph_kw = self.metadata['params']['graph_kw'] self.build_graph(graph_weighted_by, **graph_kw) else: self.graph = None # check whether cached measurements are available if 'measurements' in self.subdirs.keys(): path = join(self.subdirs['measurements'], 'processed.hdf') # load processed data if use_cache and exists(path): self.load_processed_data() # otherwise, process raw measurement data else: self.data = self.process_measurements(self.measurements)
[docs]class LayerProperties: """ Properties for Layer class: color_depth (int) - number of fluorescence channels num_cells (int) - number of cells detected by segmentation bg_key (str) - key for channel used to generate segmentation has_image (bool) - if True, image is loaded into memory is_segmented (bool) - if True, layer has been segmented has_trained_annotator (bool) - if True, layer has a trained annotator """ @property def color_depth(self): """ Number of color channels. """ return self.im.shape[-1] @property def num_cells(self): """ Number of cells detected by segmentation. """ return len(self.data) if self.data is not None else None @property def bg_key(self): """ DataFrame key for background channel. """ return self._to_key(self.metadata['bg']) @property def has_image(self): """ True if image is available. """ return self.im is not None @property def is_segmented(self): """ True if measurement data are available. """ return self.measurements is not None @property def has_trained_annotator(self): """ Returns True if trained annotator is available. """ return self.annotator is not None
[docs]class LayerMeasurement: """ Measurement related methods for Layer class. """
[docs] def segment(self, channel, preprocessing_kws={}, seed_kws={}, seg_kws={}, min_area=250): """ Identify nuclear contours by running watershed segmentation on specified background channel. Args: channel (int) - channel index on which to segment image preprocessing_kws (dict) - keyword arguments for image preprocessing seed_kws (dict) - keyword arguments for seed detection seg_kws (dict) - keyword arguments for segmentation min_area (int) - threshold for minimum segment size, px Returns: background (ImageScalar) - background image (after processing) """ # append default parameter values preprocessing_kws = defaults('preprocessing', preprocessing_kws) seed_kws = defaults('seeds', seed_kws) seg_kws = defaults('segmentation', seg_kws) # store parameters in metadata self.metadata['bg'] = channel segmentation_kw = dict(preprocessing_kws=preprocessing_kws, seed_kws=seed_kws, seg_kws=seg_kws, min_area=min_area, imported=False) self.metadata['params']['segmentation_kw'] = segmentation_kw # extract and preprocess background background = self.get_channel(channel) background.preprocess(**preprocessing_kws) # run segmentation seg = Segmentation(background, seed_kws=seed_kws, seg_kws=seg_kws) # exclude small segments seg.exclude_small_segments(min_area=min_area) # update segment labels self.labels = seg.labels background.labels = seg.labels # update cell measurements self.measure() return background
[docs] def measure(self): """ Measure properties of cell segments. Raw measurements are stored under in the 'measurements' attribute, while processed measurements are stored in the 'data' attribute. """ # measure segment properties measurements = Measurements(self.im, self.labels) measurements = measurements.build_dataframe() # assign layer id, apply normalization, and save measurements measurements['layer'] = self._id self.apply_normalization(measurements) self.measurements = measurements # process raw measurement data self.data = self.process_measurements(measurements)
[docs] def apply_normalization(self, data): """ Normalize fluorescence intensity measurements by measured background channel intensity. Args: data (pd.DataFrame) - processed cell measurement data """ # get background channel from metadata bg = self.metadata['bg'] # apply normalization to each foreground channel for fg in range(self.color_depth): if fg == bg: continue fg_key = self._to_key(fg) data['{:s}_normalized'.format(fg_key)] = data[fg_key]/data[self.bg_key]
[docs] def import_segmentation_mask(self, path, channel, save=True, save_image=True): """ Import external segmentation mask and use it to generate measurements. Provided mask must contain a 2-D array of positive integers in which a values of zero denotes the image background. Args: path (str) - path to segmentation mask channel (int) - fluorescence channel used for segmentation save (bool) - if True, copy segmentation to stack directory save_image (bool) - if True, save segmentation image """ assert exists(path), 'File does not exist.' io = IO() mask = io.read_npy(path) int_types = (int, np.int32, np.int64) assert mask.dtype in int_types, 'Mask does not contain integers.' assert mask.shape == self.shape, 'Mask dimensions are incorrect.' assert mask.min() >= 0, 'Mask contains values less than zero.' # set segmentation mask and generate measurements self.labels = mask self.metadata['bg'] = channel self.measure() # optionally copy mask to stack directory if save: self.metadata['params']['segmentation_kw']=dict(imported=True) self.save_metadata() self.make_subdir('segmentation') self.save_segmentation(save_image) self.make_subdir('measurements') self.save_measurements()
[docs]class LayerROI: """ ROI related methods for Layer class. """ @staticmethod def _apply_roi_vertices(data, xykey, roi_vertices): """ Label cells within a specified region of interest. Args: data (pd.DataFrame) - cell measurement data roi_vertices (np.ndarray[int], N x 2) - vertices bounding ROI """ # add selected attribute to cell measurement data data['selected'] = False # construct matplotlib path object path = Path(roi_vertices, closed=False) # mark cells as within or outside the selection boundary xy_positions = data[xykey].values data['selected'] = path.contains_points(xy_positions)
[docs] @staticmethod def sort_clockwise(xycoords): """ Returns clockwise-sorted xy coordinates. """ return xycoords[:, np.argsort(np.arctan2(*(xycoords.T - xycoords.mean(axis=1)).T))]
[docs] @classmethod def mask_to_vertices(cls, mask): """ Convert boolean mask to a list of vertices defining the border around the largest contiguous region. Args: mask (np.ndarray[bool]) - ROI mask, where True denotes the region. Note that the mask may only contain one contiguous component. Returns: vertices (np.ndarray[int]) - N x 2 array of vertices """ borders = (mask != binary_erosion(mask, structure=np.ones((3, 3)))) vertices = cls.sort_clockwise(np.asarray(borders.nonzero())) return vertices.T
[docs] def import_roi_mask(self, path, save=True): """ Import external ROI mask and use it to label measurement data. Provided mask must contain a 2-D boolean array with the same dimensions as the raw image. True values denote the ROI. The mask may only contain a single contiguous ROI. Args: path (str) - path to ROI mask save (bool) - if True, copy ROI mask to stack directory """ assert exists(path), 'File does not exist.' # read mask and make sure it's valid io = IO() mask = io.read_npy(path) assert mask.min()>=0 and mask.max()<=1, 'Mask is not boolean.' assert mask.shape == self.shape, 'Mask dimensions are incorrect.' mask = mask.astype(bool) # convert mask to vertices and apply to measurement data vertices = self.mask_to_vertices(mask) self._apply_roi_vertices(self.data, self.xykey, vertices) # save ROI mask to stack directory if save: self.make_subdir('selection') selection_path = self.subdirs['selection'] io = IO() io.write_npy(join(selection_path, 'selection.npy'), vertices) md = dict(include=True) io.write_json(join(selection_path, 'md.json'), md) # update measurements self.save_processed_data()
[docs] def define_roi(self, data): """ Adds a "selected" attribute to measurements dataframe. The attribute is True for cells that fall within the ROI. Args: data (pd.DataFrame) - processed measurement data """ if self.include: # load ROI vertices io = IO() path = join(self.subdirs['selection'],'selection.npy') roi_vertices = io.read_npy(path) # apply mask self._apply_roi_vertices(data, self.xykey, roi_vertices) else: data['selected'] = False
[docs]class LayerCorrection: """ Bleedthrough correction related methods for Layer class. """
[docs] def apply_correction(self, data): """ Adds bleedthrough-corrected fluorescence levels to the measurements dataframe. Args: data (pd.DataFrame) - processed cell measurement data """ # load correction coefficients and X/Y variables io = IO() cdata = io.read_json(join(self.subdirs['correction'], 'data.json')) # get independent/dependent variables xvar = cdata['params']['xvar'] yvar = cdata['params']['yvar'] bgvar = self.metadata['bg'] if type(xvar) == int: xvar = 'ch{:d}'.format(xvar) if type(yvar) == int: yvar = 'ch{:d}'.format(yvar) if type(bgvar) == int: bgvar = 'ch{:d}'.format(bgvar) # get linear model coefficients b, m = cdata['coefficients'] # apply correction trend = b + m * data[xvar].values data[yvar+'_predicted'] = trend data[yvar+'c'] = data[yvar] - trend data[yvar+'c_normalized'] = data[yvar+'c'] / data[bgvar]
[docs]class LayerAnnotation: """ Annotation related methods for Layer class. """
[docs] def annotate(self): """ Annotate measurement data in place, also labeling boundaries between labeled regions and marking regions in which each label occurs. """ # make sure graph is available msg = 'Graph not found. Call the .build_graph() method then try again.' assert self.graph is not None, msg # make sure annotator is available msg = 'Trained annotator not found. Call the .train_annotator() method then try again.' assert self.has_trained_annotator, msg # apply trained annotator to label distinct celltypes self._apply_annotation(self.data) # mark boundaries between labeled regions self._mark_boundaries(self.data, basis='genotype', max_edges=1) # mark regions in which each label is found self._apply_concurrency(self.data, basis='genotype')
[docs] def train_annotator(self, attribute, save=False, logratio=True, num_labels=3, **kwargs): """ Train an Annotation model on the measurements in this layer. Args: attribute (str) - measured attribute used to determine labels save (bool) - if True, save model selection routine logratio (bool) - if True, weight edges by relative attribute value num_labels (int) - number of allowable unique labels kwargs: keyword arguments for Annotation, including: sampler_type (str) - either 'radial', 'neighbors', 'community' sampler_kwargs (dict) - keyword arguments for sampler min_num_components (int) - minimum number of mixture components max_num_components (int) - maximum number of mixture components addtl_kwargs: keyword arguments for Classifier Returns: selector (ModelSelection object) """ # instantiate annotator self.annotator = Annotation(attribute, num_labels=num_labels, **kwargs) # build graph and use it to train annotator self.build_graph(attribute, logratio=logratio) selector = self.annotator.train(self.graph) # save trained annotator if save: self.save_metadata() self.make_subdir('annotation') selector.save(self.subdirs['annotation']) return selector
def _apply_annotation(self, data, label='genotype', **kwargs): """ Assign labels to cell measurements. Args: data (pd.DataFrame) - processed cell measurement data label (str) - attribute name for predicted genotype kwargs: keyword arguments for Annotator.annotate() """ data[label] = self.annotator(self.graph, **kwargs)
[docs] def apply_annotation(self, label='genotype', **kwargs): """ Assign labels to cell measurements in place. Args: label (str) - attribute name for predicted genotype kwargs: keyword arguments for Annotator.annotate() """ self._apply_annotation(self.data, label=label, **kwargs)
@staticmethod def _apply_concurrency(data, basis='genotype', min_pop=5, max_distance=10, **kwargs): """ Add boolean 'concurrent_<basis>' field to measurement data for each unique value of <basis> attribute. Args: data (pd.DataFrame) - processed cell measurement data basis (str) - attribute on which concurrency is established min_pop (int) - minimum population size for inclusion of cell type max_distance (float) - maximum distance threshold for inclusion kwargs: keyword arguments for ConcurrencyLabeler """ assert basis in data.columns, 'Attribute {:s} not found.'.format(basis) labeler = ConcurrencyLabeler(attribute=basis, min_pop=min_pop, max_distance=max_distance, **kwargs) labeler(data)
[docs] def apply_concurrency(self, basis='genotype', min_pop=5, max_distance=10, **kwargs): """ Add boolean 'concurrent_<basis>' field to measurement data for each unique value of <basis> attribute. Args: basis (str) - attribute on which concurrency is established min_pop (int) - minimum population size for inclusion of cell type max_distance (float) - maximum distance threshold for inclusion kwargs: keyword arguments for ConcurrencyLabeler """ self._apply_concurrency(self.data, basis=basis, min_pop=min_pop, max_distance=max_distance, **kwargs)
def _mark_boundaries(self, data, basis='genotype', max_edges=0): """ Mark boundaries between cells with disparate labels by assigning a boundary label to all cells that share an edge with another cell with a different label. Args: data (pd.DataFrame) - processed cell measurement data basis (str) - attribute used to define label max_edges (int) - maximum number of edges for interior cells """ # make sure graph is available msg = 'Graph not found, call .build_graph() method then try again.' assert self.graph is not None, msg # make sure basis attribute is available msg = 'Attribute {:s} not found in measurement data.'.format(basis) assert basis in data.columns, msg # assign genotype to edges assign_genotype = np.vectorize(dict(data[basis]).get) edge_genotypes = assign_genotype(self.graph.edges) # find edges traversing clones boundaries = (edge_genotypes[:, 0] != edge_genotypes[:, 1]) # get number of clone-traversing edges per node boundary_edges = self.graph.edges[boundaries] edge_counts = Counter(boundary_edges.ravel()) # assign boundary label to nodes with too many clone-traversing edges boundary_nodes = [n for n, c in edge_counts.items() if c>max_edges] data['boundary'] = False data.loc[boundary_nodes, 'boundary'] = True
[docs] def mark_boundaries(self, basis='genotype', max_edges=0): """ Mark boundaries between cells with disparate labels by assigning a boundary label to all cells that share an edge with another cell with a different label. Args: basis (str) - attribute used to define label max_edges (int) - maximum number of edges for interior cells """ self._mark_boundaries(self.data, basis=basis, max_edges=max_edges)
[docs] def show_annotation(self, channel, label, interior_only=False, selection_only=False, cmap=None, figsize=(8, 4), **kwargs): """ Visualize annotation by overlaying <label> attribute on the image of the specified fluoreascence <channel>. Args: channel (str) - fluorescence channel to visualize label (str) - attribute containing cell type labels interior_only (bool) - if True, exclude border regions selection_only (bool) - if True, only add contours within ROI cmap (matplotlib.ListedColorMap) - color scheme for celltype labels figsize (tuple) - figure dimensions kwargs: keyword arguments for plt.scatter Returns: fig (matplotlib.Figure) """ assert label in self.data.keys(), 'No {:s} attribute found. Please check to make sure that annotation is complete.'.format(label) # create figure and plot images fig, (ax0, ax1) = plt.subplots(ncols=2, figsize=figsize) _ = self.get_channel(channel).show(segments=False, ax=ax0) _ = self.get_channel(channel).show(segments=False, ax=ax1) # build and overlay attribute mask mask = self.build_attribute_mask(label, interior_only=interior_only, selection_only=selection_only) ax1.imshow(mask, cmap=cmap) # rectify dimensions ax1.set_xlim(*ax0.get_xlim()) ax1.set_ylim(*ax0.get_ylim()) plt.tight_layout() return fig
[docs]class Layer(LayerIO, ImageMultichromatic, LayerVisualization, LayerProperties, LayerMeasurement, LayerROI, LayerCorrection, LayerAnnotation): """ Object represents a single imaged layer. Attributes: measurements (pd.DataFrame) - raw cell measurement data data (pd.DataFrame) - processed cell measurement data path (str) - path to layer directory _id (int) - layer ID, must be an integer value subdirs (dict) - {name: path} pairs for all subdirectories metadata (dict) - layer metadata labels (np.ndarray[int]) - segment ID mask annotator (Annotation) - object that assigns labels to measurements graph (Graph) - graph connecting cell centroids include (bool) - if True, layer was manually marked for inclusion Inherited attributes: im (np.ndarray[float]) - 3D array of pixel values shape (array like) - image dimensions mask (np.ndarray[bool]) - image mask labels (np.ndarray[int]) - segment ID mask Properties: color_depth (int) - number of fluorescence channels num_cells (int) - number of cells detected by segmentation bg_key (str) - key for channel used to generate segmentation is_segmented (bool) - if True, layer has been segmented has_trained_annotator (bool) - if True, layer has a trained annotator """ def __init__(self, path, im=None, annotator=None): """ Instantiate layer. Args: path (str) - path to layer directory im (np.ndarray[float]) - 3D array of pixel values annotator (Annotation) - object that assigns labels to measurements """ # set layer ID layer_id = int(path.rsplit('/', maxsplit=1)[-1]) self._id = layer_id self.xykey = ['centroid_x', 'centroid_y'] # set path and subdirectories self.path = path # make layers directory if not exists(self.path): self.initialize() self.find_subdirs() # load inclusion; defaults to True if 'selection' in self.subdirs.keys(): if len(listdir(self.subdirs['selection'])) == 0: self.include = True else: self.load_inclusion() else: self.include = True # initialize measurement data self.measurements = None self.data = None # set annotator self.annotator = annotator # load labels and instantiate image self.load_labels() super().__init__(im, labels=self.labels)
[docs] def initialize(self): """ Initialize layer directory by: - Creating a layer directory - Removing existing segmentation directory - Saving metadata to file """ # make layers directory if not exists(self.path): mkdir(self.path) self.subdirs = {} # remove existing segmentation/annotation/measurement directories for key in ('segmentation', 'measurements', 'annotation'): if key in self.subdirs.keys(): rmtree(self.subdirs[key]) # make metadata file segmentation_kw = dict(preprocessing_kws={}, seed_kws={}, seg_kws={}) params = dict(segmentation_kw=segmentation_kw, graph_kw={}) metadata = dict(bg=None, params=params) # save metadata IO().write_json(join(self.path, 'metadata.json'), metadata)
[docs] def process_measurements(self, measurements): """ Augment measurements by: 1. incorporating manual selection boundary 2. correcting for fluorescence bleedthrough 3. assigning measurement labels 4. marking clone boundaries 5. assigning label concurrency information Operations 3-5 require construction of a WeightedGraph object. Args: measurements (pd.DataFrame) - raw measurement data Returns: data (pd.DataFrame) - processed measurement data """ # copy raw measurements data = deepcopy(measurements) # load and apply selection if 'selection' in self.subdirs.keys(): self.define_roi(data) # load and apply correction if 'correction' in self.subdirs.keys(): self.apply_correction(data) # annotate measurements if self.has_trained_annotator and self.graph is not None: # apply trained annotator to label distinct celltypes self._apply_annotation(data, label='genotype') # mark boundaries between labeled regions self._mark_boundaries(data, basis='genotype', max_edges=1) # mark regions in which each label is found self._apply_concurrency(data, basis='genotype') return data
[docs] def build_graph(self, weighted_by, **graph_kw): """ Compile weighted graph connecting adjacent cells. Args: weighted_by (str) - attribute used to weight edges graph_kw: keyword arguments, including: xykey (list) - attribute keys for node x/y positions logratio (bool) - if True, weight edges by log ratio distance (bool) - if True, weights edges by distance """ # store metadata for graph reconstruction self.metadata['params']['graph_weighted_by'] = weighted_by self.metadata['params']['graph_kw'] = graph_kw # build graph self.graph = WeightedGraph(self.measurements, weighted_by, **graph_kw)