Source code for flyqma.data.experiments
from os.path import join, abspath, isdir
from glob import glob
import numpy as np
import pandas as pd
from ..utilities import UserPrompts
from .stacks import Stack
[docs]class Experiment:
"""
Object represents a collection of 3D RGB image stacks collected under the same experimental conditions.
Attributes:
path (str) - path to experiment directory
_id (str) - name of experiment
stack_ids (list of str) - unique stack ids within experiment
stack_dirs (dict) - {stack_id: stack_directory} tuples
count (int) - counter for stack iteration
"""
def __init__(self, path):
"""
Instantiate experiment object.
Args:
path (str) - directory with subdirectories of 3D RGB image stacks
"""
# set path to experiment directory
self.path = abspath(path)
# set experiment ID
self._id = path.split('/')[-1]
# set stack paths
stack_paths = [p for p in glob(join(self.path, '*')) if isdir(p)]
get_stack_id = lambda x: x.rsplit('/', maxsplit=1)[-1]
self.stack_dirs = {get_stack_id(p): p for p in stack_paths}
self.stack_ids = sorted(self.stack_dirs.keys())
# reset stack iterator count
self.count = 0
# check if stacks have been initialized, if not prompt user
if not self.is_initialized:
self.prompt_initialization()
def __getitem__(self, stack_id):
""" Load stack. """
return self.load_stack(stack_id, full=False)
def __iter__(self):
""" Iterate across stacks. """
self.count = 0
return self
def __next__(self):
""" Return next stack. """
if self.count < len(self.stack_ids):
stack_id = self.stack_ids[self.count]
stack = self.__getitem__(stack_id)
self.count += 1
return stack
else:
raise StopIteration
@property
def is_initialized(self):
""" Returns True if Experiment has been initialized. """
for stack_dir in self.stack_dirs.values():
if not Stack._check_if_initialized(stack_dir):
return False
return True
[docs] def prompt_initialization(self):
""" Ask user whether to initialize all stack directories. """
msg = 'Incomplete stack directories found. Initialize them?'
user_response = UserPrompts.boolean_prompt(msg)
if user_response:
msg = 'Please enter an image bit depth:'
bit_depth = UserPrompts.integer_prompt(msg)
if bit_depth is not None:
self.initialize(bit_depth)
else:
raise ValueError('User response not recognized, stacks have not been initialized.')
[docs] def initialize(self, bit_depth):
"""
Initialize a collection of image stacks.
Args:
bit_depth (int) - bit depth of raw tif (e.g. 12 or 16). Value will be read from the stack metadata if None is provided. An error is raised if no value is found.
"""
for stack_id in self.stack_ids:
_ = self.load_stack(stack_id, full=False, bit_depth=bit_depth)
[docs] def load_stack(self, stack_id, full=False, **kwargs):
"""
Load 3D RGB image stack.
Args:
stack_id (str or int) - desired stack
full (bool) - if True, load full 3D image from tif file
Returns:
stack (Stack)
"""
stack = Stack(self.stack_dirs[str(stack_id)], **kwargs)
if full:
stack.load_image()
return stack
[docs] def aggregate_measurements(self,
selected_only=False,
exclude_boundary=False,
raw=False,
use_cache=True):
"""
Aggregate measurements from each stack.
Args:
selected_only (bool) - if True, exclude cells outside the ROI
exclude_boundary (bool) - if True, exclude cells on the border of labeled regions
raw (bool) - if True, use raw measurements from included discs
use_cache (bool) - if True, used available cached measurement data
Returns:
data (pd.Dataframe) - curated cell measurement data, which is None if no measurement data are found
"""
# load measurements from each stack in the experiment
data = []
for stack_id in self.stack_ids:
stack = self.load_stack(stack_id, full=False)
measurements = stack.aggregate_measurements(
selected_only=selected_only,
exclude_boundary=exclude_boundary,
raw=raw,
use_cache=use_cache)
if measurements is None:
continue
# add stack index
measurements['stack'] = stack._id
measurements = measurements.set_index('stack', append=True)
measurements = measurements.reorder_levels([2,0,1])
data.append(measurements)
assert stack_id == stack._id, 'Stack IDs do not match.'
# return None if no data are found
if len(data) == 0:
return None
# aggregate measurements
data = pd.concat(data, join='outer', sort=False)
# exclude cells that were not marked for inclusion
if selected_only:
data = data[data.selected]
# exclude cells on clone boundaries
if exclude_boundary:
data = data[~data.boundary]
return data