import logging
import warnings
from pathlib import Path
from typing import Iterable, Optional
import h5py
import marshmallow
import numpy as np
import pandas as pd
import datetime
import uuid
import SimpleITK as sitk
import pynwb
from pynwb.base import TimeSeries, Images
from pynwb import ProcessingModule, NWBFile
from pynwb.image import GrayscaleImage
from pynwb.ophys import (
DfOverF, ImageSegmentation, OpticalChannel, Fluorescence)
from allensdk.brain_observatory import dict_to_indexed_array
from allensdk.brain_observatory.behavior.image_api import Image
from allensdk.brain_observatory.behavior.image_api import ImageApi
from allensdk.brain_observatory.behavior.schemas import (
CompleteOphysBehaviorMetadataSchema, NwbOphysMetadataSchema,
BehaviorMetadataSchema, OphysBehaviorMetadataSchema,
BehaviorTaskParametersSchema, SubjectMetadataSchema
)
from allensdk.brain_observatory.nwb.metadata import load_pynwb_extension
log = logging.getLogger("allensdk.brain_observatory.nwb")
CELL_SPECIMEN_COL_DESCRIPTIONS = {
'cell_specimen_id': 'Unified id of segmented cell across experiments '
'(after cell matching)',
'height': 'Height of ROI in pixels',
'width': 'Width of ROI in pixels',
'mask_image_plane': 'Which image plane an ROI resides on. Overlapping '
'ROIs are stored on different mask image planes.',
'max_correction_down': 'Max motion correction in down direction in pixels',
'max_correction_left': 'Max motion correction in left direction in pixels',
'max_correction_up': 'Max motion correction in up direction in pixels',
'max_correction_right': 'Max motion correction in right direction in '
'pixels',
'valid_roi': 'Indicates if cell classification found the ROI to be a cell '
'or not',
'x': 'x position of ROI in Image Plane in pixels (top left corner)',
'y': 'y position of ROI in Image Plane in pixels (top left corner)'
}
[docs]def check_nwbfile_version(nwbfile_path: str,
desired_minimum_version: str,
warning_msg: str):
with h5py.File(nwbfile_path, 'r') as f:
# nwb 2.x files store version as an attribute
try:
nwb_version = str(f.attrs["nwb_version"]).split(".")
except KeyError:
# nwb 1.x files store version as dataset
try:
nwb_version = str(f["nwb_version"][...].astype(str))
# Stored in the form: `NWB-x.y.z`
nwb_version = nwb_version.split("-")[1].split(".")
except (KeyError, IndexError):
nwb_version = None
if nwb_version is None:
warnings.warn(f"'{nwbfile_path}' doesn't appear to be a valid "
f"Neurodata Without Borders (*.nwb) format file as "
f"neither a 'nwb_version' field nor dataset could "
f"be found!")
else:
if tuple(nwb_version) < tuple(desired_minimum_version.split(".")):
warnings.warn(warning_msg)
[docs]def read_eye_dlc_tracking_ellipses(input_path: Path) -> dict:
"""Reads eye tracking ellipse fit data from an h5 file.
Args:
input_path (Path): Path to eye tracking ellipse fit h5 file
Returns:
dict: Loaded h5 data. Each 'params' field contains dataframes with]
ellipse fit parameters. Dataframes contain 5 columns each
consisting of: "center_x", "center_y", "height", "phi", "width"
"""
eye_dlc_tracking_data = {}
# TODO: Some ellipses.h5 files have the 'cr' key as complex type instead of
# float. For now, when loading ellipses.h5 files, always coerce to float
# but this should eventually be resolved upstream...
# See: allensdk.brain_observatory.eye_tracking
pupil_params = pd.read_hdf(input_path, key="pupil").astype(float)
cr_params = pd.read_hdf(input_path, key="cr").astype(float)
eye_params = pd.read_hdf(input_path, key="eye").astype(float)
eye_dlc_tracking_data["pupil_params"] = pupil_params
eye_dlc_tracking_data["cr_params"] = cr_params
eye_dlc_tracking_data["eye_params"] = eye_params
return eye_dlc_tracking_data
[docs]def read_eye_gaze_mappings(input_path: Path) -> dict:
"""Reads eye gaze mapping data from an h5 file.
Args:
input_path (Path): Path to eye gaze mapping h5 data file produced by
'allensdk.brain_observatory.gaze_mapping' module.
Returns:
dict: Loaded h5 data.
*_eye_areas: Area of eye (in pixels^2) over time
*_pupil_areas: Area of pupil (in pixels^2) over time
*_screen_coordinates: y, x screen coordinates (in cm) over time
*_screen_coordinates_spherical: y, x screen coordinates (in deg)
over time
synced_frame_timestamps: synced timestamps for video frames
(in sec)
"""
eye_gaze_data = {}
eye_gaze_data["raw_eye_areas"] = \
pd.read_hdf(input_path, key="raw_eye_areas")
eye_gaze_data["raw_pupil_areas"] = \
pd.read_hdf(input_path, key="raw_pupil_areas")
eye_gaze_data["raw_screen_coordinates"] = \
pd.read_hdf(input_path, key="raw_screen_coordinates")
eye_gaze_data["raw_screen_coordinates_spherical"] = \
pd.read_hdf(input_path, key="raw_screen_coordinates_spherical")
eye_gaze_data["new_eye_areas"] = \
pd.read_hdf(input_path, key="new_eye_areas")
eye_gaze_data["new_pupil_areas"] = \
pd.read_hdf(input_path, key="new_pupil_areas")
eye_gaze_data["new_screen_coordinates"] = \
pd.read_hdf(input_path, key="new_screen_coordinates")
eye_gaze_data["new_screen_coordinates_spherical"] = \
pd.read_hdf(input_path, key="new_screen_coordinates_spherical")
eye_gaze_data["synced_frame_timestamps"] = \
pd.read_hdf(input_path, key="synced_frame_timestamps")
return eye_gaze_data
[docs]def create_eye_gaze_mapping_dataframe(eye_gaze_data: dict) -> pd.DataFrame:
eye_gaze_mapping_df = pd.DataFrame({
"raw_eye_area": eye_gaze_data["raw_eye_areas"].values,
"raw_pupil_area": eye_gaze_data["raw_pupil_areas"].values,
"raw_screen_coordinates_x_cm":
eye_gaze_data["raw_screen_coordinates"]["x_pos_cm"].values,
"raw_screen_coordinates_y_cm":
eye_gaze_data["raw_screen_coordinates"]["y_pos_cm"].values,
"raw_screen_coordinates_spherical_x_deg":
eye_gaze_data["raw_screen_coordinates_spherical"]["x_pos_deg"].values,
"raw_screen_coordinates_spherical_y_deg":
eye_gaze_data["raw_screen_coordinates_spherical"]["y_pos_deg"].values,
"filtered_eye_area": eye_gaze_data["new_eye_areas"].values,
"filtered_pupil_area": eye_gaze_data["new_pupil_areas"].values,
"filtered_screen_coordinates_x_cm":
eye_gaze_data["new_screen_coordinates"]["x_pos_cm"].values,
"filtered_screen_coordinates_y_cm":
eye_gaze_data["new_screen_coordinates"]["y_pos_cm"].values,
"filtered_screen_coordinates_spherical_x_deg":
eye_gaze_data["new_screen_coordinates_spherical"]["x_pos_deg"].values,
"filtered_screen_coordinates_spherical_y_deg":
eye_gaze_data["new_screen_coordinates_spherical"]["y_pos_deg"].values
},
index=eye_gaze_data["synced_frame_timestamps"].values
)
return eye_gaze_mapping_df
[docs]def eye_tracking_data_is_valid(eye_dlc_tracking_data: dict,
synced_timestamps: pd.Series) -> bool:
is_valid = True
pupil_params = eye_dlc_tracking_data["pupil_params"]
cr_params = eye_dlc_tracking_data["cr_params"]
eye_params = eye_dlc_tracking_data["eye_params"]
num_frames_match = ((pupil_params.shape[0] == cr_params.shape[0])
and (cr_params.shape[0] == eye_params.shape[0]))
if not num_frames_match:
log.warn("The number of frames for ellipse fits don't "
"match when they should. No ellipse fits will be written! "
f"pupil_params ({pupil_params.shape[0]}), "
f"cr_params ({cr_params.shape[0]}), "
f"eye_params ({eye_params.shape[0]})")
is_valid = False
if (pupil_params.shape[0] != len(synced_timestamps)):
log.warn("The number of camera sync pulses in the "
f"sync file ({len(synced_timestamps)}) do not match "
"with the number of eye tracking frames "
f"({pupil_params.shape[0]})! No ellipse fits will be "
"written!")
is_valid = False
return is_valid
[docs]def create_eye_tracking_nwb_processing_module(eye_dlc_tracking_data: dict,
synced_timestamps: pd.Series
) -> pynwb.ProcessingModule:
# Top level container for eye tracking processed data
eye_tracking_mod = pynwb.ProcessingModule(
name='eye_tracking',
description='Eye tracking processing module')
# Data interfaces of dlc_fits_container
pupil_fits = eye_dlc_tracking_data["pupil_params"].assign(
timestamps=synced_timestamps)
pupil_params = pynwb.core.DynamicTable.from_dataframe(
df=pupil_fits, name="pupil_ellipse_fits")
cr_fits = eye_dlc_tracking_data["cr_params"].assign(
timestamps=synced_timestamps)
cr_params = pynwb.core.DynamicTable.from_dataframe(df=cr_fits,
name="cr_ellipse_fits")
eye_fits = eye_dlc_tracking_data["eye_params"].assign(
timestamps=synced_timestamps)
eye_params = pynwb.core.DynamicTable.from_dataframe(
df=eye_fits, name="eye_ellipse_fits")
eye_tracking_mod.add_data_interface(pupil_params)
eye_tracking_mod.add_data_interface(cr_params)
eye_tracking_mod.add_data_interface(eye_params)
return eye_tracking_mod
[docs]def add_eye_gaze_data_interfaces(pynwb_container: pynwb.NWBContainer,
pupil_areas: pd.Series,
eye_areas: pd.Series,
screen_coordinates: pd.DataFrame,
screen_coordinates_spherical: pd.DataFrame,
synced_timestamps: pd.Series
) -> pynwb.NWBContainer:
pupil_area_ts = pynwb.base.TimeSeries(
name="pupil_area",
data=pupil_areas.values,
timestamps=synced_timestamps.values,
unit="Pixels ^ 2"
)
eye_area_ts = pynwb.base.TimeSeries(
name="eye_area",
data=eye_areas.values,
timestamps=synced_timestamps.values,
unit="Pixels ^ 2"
)
screen_coord_ts = pynwb.base.TimeSeries(
name="screen_coordinates",
data=screen_coordinates.values,
timestamps=synced_timestamps.values,
unit="Centimeters"
)
screen_coord_spherical_ts = pynwb.base.TimeSeries(
name="screen_coordinates_spherical",
data=screen_coordinates_spherical.values,
timestamps=synced_timestamps.values,
unit="Degrees"
)
pynwb_container.add_data_interface(pupil_area_ts)
pynwb_container.add_data_interface(eye_area_ts)
pynwb_container.add_data_interface(screen_coord_ts)
pynwb_container.add_data_interface(screen_coord_spherical_ts)
return pynwb_container
[docs]def create_gaze_mapping_nwb_processing_modules(eye_gaze_data: dict):
# Container for raw gaze mapped data
raw_gaze_mapping_mod = pynwb.ProcessingModule(
name='raw_gaze_mapping',
description='Gaze mapping processing module raw outputs')
raw_gaze_mapping_mod = add_eye_gaze_data_interfaces(
raw_gaze_mapping_mod,
pupil_areas=eye_gaze_data["raw_pupil_areas"],
eye_areas=eye_gaze_data["raw_eye_areas"],
screen_coordinates=eye_gaze_data["raw_screen_coordinates"],
screen_coordinates_spherical=eye_gaze_data["raw_screen_coordinates_spherical"], # noqa: E501
synced_timestamps=eye_gaze_data["synced_frame_timestamps"])
# Container for filtered gaze mapped data
filt_gaze_mapping_mod = pynwb.ProcessingModule(
name='filtered_gaze_mapping',
description='Gaze mapping processing module filtered outputs')
filt_gaze_mapping_mod = add_eye_gaze_data_interfaces(
filt_gaze_mapping_mod,
pupil_areas=eye_gaze_data["new_pupil_areas"],
eye_areas=eye_gaze_data["new_eye_areas"],
screen_coordinates=eye_gaze_data["new_screen_coordinates"],
screen_coordinates_spherical=eye_gaze_data["new_screen_coordinates_spherical"], # noqa: E501
synced_timestamps=eye_gaze_data["synced_frame_timestamps"])
return (raw_gaze_mapping_mod, filt_gaze_mapping_mod)
[docs]def add_eye_tracking_ellipse_fit_data_to_nwbfile(nwbfile: pynwb.NWBFile,
eye_dlc_tracking_data: dict,
synced_timestamps: pd.Series
) -> pynwb.NWBFile:
eye_tracking_mod = create_eye_tracking_nwb_processing_module(
eye_dlc_tracking_data, synced_timestamps)
nwbfile.add_processing_module(eye_tracking_mod)
return nwbfile
[docs]def add_eye_gaze_mapping_data_to_nwbfile(nwbfile: pynwb.NWBFile,
eye_gaze_data: dict) -> pynwb.NWBFile:
raw_gaze_mapping_mod, filt_gaze_mapping_mod = \
create_gaze_mapping_nwb_processing_modules(eye_gaze_data)
nwbfile.add_processing_module(raw_gaze_mapping_mod)
nwbfile.add_processing_module(filt_gaze_mapping_mod)
return nwbfile
[docs]def add_running_acquisition_to_nwbfile(nwbfile,
running_acquisition_df: pd.DataFrame):
running_dx_series = TimeSeries(
name='dx',
data=running_acquisition_df['dx'].values,
timestamps=running_acquisition_df.index.values,
unit='cm',
description=(
'Running wheel angular change, computed during data collection')
)
v_sig = TimeSeries(
name='v_sig',
data=running_acquisition_df['v_sig'].values,
timestamps=running_acquisition_df.index.values,
unit='V',
description='Voltage signal from the running wheel encoder'
)
v_in = TimeSeries(
name='v_in',
data=running_acquisition_df['v_in'].values,
timestamps=running_acquisition_df.index.values,
unit='V',
description=(
'The theoretical maximum voltage that the running wheel encoder '
'will reach prior to "wrapping". This should '
'theoretically be 5V (after crossing 5V goes to 0V, or '
'vice versa). In practice the encoder does not always '
'reach this value before wrapping, which can cause '
'transient spikes in speed at the voltage "wraps".')
)
if 'running' in nwbfile.processing:
running_mod = nwbfile.processing['running']
else:
running_mod = ProcessingModule('running',
'Running speed processing module')
nwbfile.add_processing_module(running_mod)
running_mod.add_data_interface(running_dx_series)
nwbfile.add_acquisition(v_sig)
nwbfile.add_acquisition(v_in)
return nwbfile
[docs]def add_running_speed_to_nwbfile(nwbfile, running_speed,
name='speed', unit='cm/s',
from_dataframe=False):
''' Adds running speed data to an NWBFile as a timeseries in acquisition
Parameters
----------
nwbfile : pynwb.NWBFile
File to which running speeds will be written
running_speed : Union[RunningSpeed, pd.DataFrame]
Either a RunningSpeed object or pandas DataFrame.
Contains attributes 'values' and 'timestamps'
name : str, optional
Used as name of timeseries object
unit : str, optional
SI units of running speed values
from_dataframe : bool, optional
Whether `running_speed` is a dataframe or not. Default is False.
Returns
-------
nwbfile : pynwb.NWBFile
'''
if from_dataframe:
data = running_speed['speed'].values
timestamps = running_speed['timestamps'].values
else:
data = running_speed.values
timestamps = running_speed.timestamps
running_speed_series = pynwb.base.TimeSeries(
name=name,
data=data,
timestamps=timestamps,
unit=unit)
if 'running' in nwbfile.processing:
running_mod = nwbfile.processing['running']
else:
running_mod = ProcessingModule('running',
'Running speed processing module')
nwbfile.add_processing_module(running_mod)
running_mod.add_data_interface(running_speed_series)
return nwbfile
[docs]def create_stimulus_presentation_time_interval(
name: str, description: str,
columns_to_add: Iterable) -> pynwb.epoch.TimeIntervals:
column_descriptions = {
"stimulus_name": "Name of stimulus",
"stimulus_block": ("Index of contiguous presentations of "
"one stimulus type"),
"temporal_frequency": "Temporal frequency of stimulus",
"x_position": "Horizontal position of stimulus on screen",
"y_position": "Vertical position of stimulus on screen",
"mask": "Shape of mask applied to stimulus",
"opacity": "Opacity of stimulus",
"phase": "Phase of grating stimulus",
"size": "Size of stimulus (see ‘units’ field for units)",
"units": "Units of stimulus size",
"stimulus_index": "Index of stimulus type",
"orientation": "Orientation of stimulus",
"spatial_frequency": "Spatial frequency of stimulus",
"frame": "Frame of movie stimulus",
"contrast": "Contrast of stimulus",
"Speed": "Speed of moving dot field",
"Dir": "Direction of stimulus motion",
"coherence": "Coherence of moving dot field",
"dotLife": "Longevity of individual dots",
"dotSize": "Size of individual dots",
"fieldPos": "Position of moving dot field",
"fieldShape": "Shape of moving dot field",
"fieldSize": "Size of moving dot field",
"nDots": "Number of dots in moving dot field"
}
columns_to_ignore = {'start_time', 'stop_time', 'tags', 'timeseries'}
interval = pynwb.epoch.TimeIntervals(name=name,
description=description)
for column_name in columns_to_add:
if column_name not in columns_to_ignore:
description = column_descriptions.get(
column_name, "No description")
interval.add_column(name=column_name, description=description)
return interval
[docs]def add_invalid_times(nwbfile, epochs):
"""
Write invalid times to nwbfile if epochs are not empty
Parameters
----------
nwbfile: pynwb.NWBFile
epochs: list of dicts
records of invalid epochs
Returns
-------
pynwb.NWBFile
"""
table = setup_table_for_invalid_times(epochs)
if not table.empty:
container = pynwb.epoch.TimeIntervals('invalid_times')
for index, row in table.iterrows():
container.add_interval(start_time=row['start_time'],
stop_time=row['stop_time'],
tags=row['tags'],
)
nwbfile.invalid_times = container
return nwbfile
[docs]def setup_table_for_invalid_times(invalid_epochs):
"""
Create table with invalid times if invalid_epochs are present
Parameters
----------
invalid_epochs: list of dicts
of invalid epoch records
Returns
-------
pd.DataFrame of invalid times if epochs are not empty,
otherwise return None
"""
if invalid_epochs:
df = pd.DataFrame.from_dict(invalid_epochs)
start_time = df['start_time'].values
stop_time = df['end_time'].values
tags = [[_type, str(_id), label]
for _type, _id, label
in zip(df['type'], df['id'], df['label'])]
table = pd.DataFrame({'start_time': start_time,
'stop_time': stop_time,
'tags': tags}
)
table.index.name = 'id'
else:
table = pd.DataFrame()
return table
[docs]def setup_table_for_epochs(table, timeseries, tag):
table = table.copy()
indices = np.searchsorted(timeseries.timestamps[:],
table['start_time'].values)
if len(indices > 0):
diffs = np.concatenate([np.diff(indices),
[table.shape[0] - indices[-1]]])
else:
diffs = []
table['tags'] = [(tag,)] * table.shape[0]
table['timeseries'] = [[[indices[ii], diffs[ii], timeseries]]
for ii in range(table.shape[0])]
return table
[docs]def add_stimulus_timestamps(nwbfile, stimulus_timestamps,
module_name='stimulus'):
stimulus_ts = TimeSeries(
data=stimulus_timestamps,
name='timestamps',
timestamps=stimulus_timestamps,
unit='s'
)
stim_mod = ProcessingModule(module_name, 'Stimulus Times processing')
nwbfile.add_processing_module(stim_mod)
stim_mod.add_data_interface(stimulus_ts)
return nwbfile
[docs]def add_trials(nwbfile, trials, description_dict={}):
order = list(trials.index)
for _, row in trials[['start_time', 'stop_time']].iterrows():
row_dict = row.to_dict()
nwbfile.add_trial(**row_dict)
for c in trials.columns:
if c in ['start_time', 'stop_time']:
continue
index, data = dict_to_indexed_array(trials[c].to_dict(), order)
if data.dtype == '<U1': # data type is composed of unicode characters
data = trials[c].tolist()
if not len(data) == len(order):
if len(data) == 0:
data = ['']
nwbfile.add_trial_column(
name=c,
description=description_dict.get(
c, 'NOT IMPLEMENTED: %s' % c),
data=data,
index=index)
else:
nwbfile.add_trial_column(
name=c,
description=description_dict.get(
c, 'NOT IMPLEMENTED: %s' % c),
data=data)
[docs]def add_licks(nwbfile, licks):
lick_timeseries = TimeSeries(
name='licks',
data=licks.frame.values,
timestamps=licks.timestamps.values,
description=('Timestamps and stimulus presentation '
'frame indices for lick events'),
unit='N/A'
)
# Add lick interface to nwb file, by way of a processing module:
licks_mod = ProcessingModule('licking',
'Licking behavior processing module')
licks_mod.add_data_interface(lick_timeseries)
nwbfile.add_processing_module(licks_mod)
return nwbfile
[docs]def add_rewards(nwbfile, rewards_df):
reward_volume_ts = TimeSeries(
name='volume',
data=rewards_df.volume.values,
timestamps=rewards_df['timestamps'].values,
unit='mL'
)
autorewarded_ts = TimeSeries(
name='autorewarded',
data=rewards_df.autorewarded.values,
timestamps=reward_volume_ts.timestamps,
unit='mL'
)
rewards_mod = ProcessingModule('rewards',
'Licking behavior processing module')
rewards_mod.add_data_interface(reward_volume_ts)
rewards_mod.add_data_interface(autorewarded_ts)
nwbfile.add_processing_module(rewards_mod)
return nwbfile
[docs]def add_image(nwbfile, image_data, image_name, module_name,
module_description, image_api=None):
description = '{} image at pixels/cm resolution'.format(image_name)
if image_api is None:
image_api = ImageApi
if isinstance(image_data, sitk.Image):
data, spacing, unit = ImageApi.deserialize(image_data)
elif isinstance(image_data, Image):
data = image_data.data
spacing = image_data.spacing
unit = image_data.unit
else:
raise ValueError("Not a supported image_data type: "
f"{type(image_data)}")
assert spacing[0] == spacing[1] and len(spacing) == 2 and unit == 'mm'
if module_name not in nwbfile.processing:
ophys_mod = ProcessingModule(module_name, module_description)
nwbfile.add_processing_module(ophys_mod)
else:
ophys_mod = nwbfile.processing[module_name]
image = GrayscaleImage(image_name,
data,
resolution=spacing[0] / 10,
description=description)
if 'images' not in ophys_mod.containers:
images = Images(name='images')
ophys_mod.add_data_interface(images)
else:
images = ophys_mod['images']
images.add_image(image)
return nwbfile
[docs]def add_max_projection(nwbfile, max_projection, image_api=None):
add_image(nwbfile,
max_projection,
'max_projection',
'ophys',
'Ophys processing module',
image_api=image_api)
[docs]def add_average_image(nwbfile, average_image, image_api=None):
add_image(nwbfile,
average_image,
'average_image',
'ophys',
'Ophys processing module',
image_api=image_api)
[docs]def add_segmentation_mask_image(nwbfile,
segmentation_mask_image,
image_api=None):
add_image(nwbfile,
segmentation_mask_image,
'segmentation_mask_image',
'ophys',
'Ophys processing module',
image_api=image_api)
[docs]def add_task_parameters(nwbfile, task_parameters):
OphysBehaviorTaskParameters = load_pynwb_extension(
BehaviorTaskParametersSchema, 'ndx-aibs-behavior-ophys'
)
task_parameters_clean = BehaviorTaskParametersSchema().dump(
task_parameters
)
new_task_parameters_dict = {}
for key, val in task_parameters_clean.items():
if isinstance(val, list):
new_task_parameters_dict[key] = np.array(val)
else:
new_task_parameters_dict[key] = val
nwb_task_parameters = OphysBehaviorTaskParameters(
name='task_parameters', **new_task_parameters_dict)
nwbfile.add_lab_meta_data(nwb_task_parameters)
[docs]def add_cell_specimen_table(nwbfile: NWBFile,
cell_specimen_table: pd.DataFrame,
session_metadata: dict):
"""
This function takes the cell specimen table and writes the ROIs
contained within. It writes these to a new NWB imaging plane
based off the previously supplied metadata
Parameters
----------
nwbfile: NWBFile
this is the in memory NWBFile currently being written to which ROI data
is added
cell_specimen_table: pd.DataFrame
this is the DataFrame containing the cells segmented from a ophys
experiment, stored in json file and loaded.
example: /home/nicholasc/projects/allensdk/allensdk/test/
brain_observatory/behavior/cell_specimen_table_789359614.json
session_metadata: dict
Dictionary containing cell_specimen_table related metadata. Should
include at minimum the following fields:
"emission_lambda", "excitation_lambda", "indicator",
"targeted_structure", and ophys_frame_rate"
Returns
-------
nwbfile: NWBFile
The altered in memory NWBFile object that now has a specimen table
"""
cell_specimen_metadata = NwbOphysMetadataSchema().load(
session_metadata, unknown=marshmallow.EXCLUDE)
cell_roi_table = cell_specimen_table.reset_index().set_index('cell_roi_id')
# Device:
device_name: str = nwbfile.lab_meta_data['metadata'].equipment_name
if device_name.startswith("MESO"):
device_config = {
"name": device_name,
"description": "Allen Brain Observatory - Mesoscope 2P Rig"
}
else:
device_config = {
"name": device_name,
"description": "Allen Brain Observatory - Scientifica 2P Rig",
"manufacturer": "Scientifica"
}
nwbfile.create_device(**device_config)
device = nwbfile.get_device(device_name)
# FOV:
fov_width = nwbfile.lab_meta_data['metadata'].field_of_view_width
fov_height = nwbfile.lab_meta_data['metadata'].field_of_view_height
imaging_plane_description = "{} field of view in {} at depth {} um".format(
(fov_width, fov_height),
cell_specimen_metadata['targeted_structure'],
nwbfile.lab_meta_data['metadata'].imaging_depth)
# Optical Channel:
optical_channel = OpticalChannel(
name='channel_1',
description='2P Optical Channel',
emission_lambda=cell_specimen_metadata['emission_lambda'])
# Imaging Plane:
imaging_plane = nwbfile.create_imaging_plane(
name='imaging_plane_1',
optical_channel=optical_channel,
description=imaging_plane_description,
device=device,
excitation_lambda=cell_specimen_metadata['excitation_lambda'],
imaging_rate=cell_specimen_metadata['ophys_frame_rate'],
indicator=cell_specimen_metadata['indicator'],
location=cell_specimen_metadata['targeted_structure'])
# Image Segmentation:
image_segmentation = ImageSegmentation(name="image_segmentation")
if 'ophys' not in nwbfile.processing:
ophys_module = ProcessingModule('ophys', 'Ophys processing module')
nwbfile.add_processing_module(ophys_module)
else:
ophys_module = nwbfile.processing['ophys']
ophys_module.add_data_interface(image_segmentation)
# Plane Segmentation:
plane_segmentation = image_segmentation.create_plane_segmentation(
name='cell_specimen_table',
description="Segmented rois",
imaging_plane=imaging_plane)
for col_name in cell_roi_table.columns:
# the columns 'roi_mask', 'pixel_mask', and 'voxel_mask' are
# already defined in the nwb.ophys::PlaneSegmentation Object
if col_name not in ['id', 'mask_matrix', 'roi_mask',
'pixel_mask', 'voxel_mask']:
# This builds the columns with name of column and description
# of column both equal to the column name in the cell_roi_table
plane_segmentation.add_column(
col_name,
CELL_SPECIMEN_COL_DESCRIPTIONS.get(
col_name,
"No Description Available"))
# go through each roi and add it to the plan segmentation object
for cell_roi_id, table_row in cell_roi_table.iterrows():
# NOTE: The 'roi_mask' in this cell_roi_table has already been
# processing by the function from
# allensdk.brain_observatory.behavior.session_apis.data_io.ophys_lims_api
# get_cell_specimen_table() method. As a result, the ROI is stored in
# an array that is the same shape as the FULL field of view of the
# experiment (e.g. 512 x 512).
mask = table_row.pop('roi_mask')
csid = table_row.pop('cell_specimen_id')
table_row['cell_specimen_id'] = -1 if csid is None else csid
table_row['id'] = cell_roi_id
plane_segmentation.add_roi(image_mask=mask, **table_row.to_dict())
return nwbfile
[docs]def add_dff_traces(nwbfile, dff_traces, ophys_timestamps):
dff_traces = dff_traces.reset_index().set_index('cell_roi_id')[['dff']]
ophys_module = nwbfile.processing['ophys']
# trace data in the form of rois x timepoints
trace_data = np.array([dff_traces.loc[cell_roi_id].dff
for cell_roi_id in dff_traces.index.values])
cell_specimen_table = nwbfile.processing['ophys'].data_interfaces['image_segmentation'].plane_segmentations['cell_specimen_table'] # noqa: E501
roi_table_region = cell_specimen_table.create_roi_table_region(
description="segmented cells labeled by cell_specimen_id",
region=slice(len(dff_traces)))
# Create/Add dff modules and interfaces:
assert dff_traces.index.name == 'cell_roi_id'
dff_interface = DfOverF(name='dff')
ophys_module.add_data_interface(dff_interface)
dff_interface.create_roi_response_series(
name='traces',
data=trace_data.T, # Should be stored as timepoints x rois
unit='NA',
rois=roi_table_region,
timestamps=ophys_timestamps)
return nwbfile
[docs]def add_corrected_fluorescence_traces(nwbfile, corrected_fluorescence_traces):
corrected_fluorescence_traces = \
corrected_fluorescence_traces.reset_index().set_index(
'cell_roi_id')[['corrected_fluorescence']]
# Create/Add corrected_fluorescence_traces modules and interfaces:
assert corrected_fluorescence_traces.index.name == 'cell_roi_id'
ophys_module = nwbfile.processing['ophys']
# trace data in the form of rois x timepoints
f_trace_data = np.array(
[corrected_fluorescence_traces.loc[cell_roi_id].corrected_fluorescence
for cell_roi_id in corrected_fluorescence_traces.index.values])
roi_table_region = nwbfile.processing['ophys'].data_interfaces['dff'].roi_response_series['traces'].rois # noqa: E501
ophys_timestamps = ophys_module.get_data_interface(
'dff').roi_response_series['traces'].timestamps
f_interface = Fluorescence(name='corrected_fluorescence')
ophys_module.add_data_interface(f_interface)
f_interface.create_roi_response_series(
name='traces',
data=f_trace_data.T, # Should be stored as timepoints x rois
unit='NA',
rois=roi_table_region,
timestamps=ophys_timestamps)
return nwbfile
[docs]def add_motion_correction(nwbfile, motion_correction):
ophys_module = nwbfile.processing['ophys']
ophys_timestamps = ophys_module.get_data_interface(
'dff').roi_response_series['traces'].timestamps
t1 = TimeSeries(
name='ophys_motion_correction_x',
data=motion_correction['x'].values,
timestamps=ophys_timestamps,
unit='pixels'
)
t2 = TimeSeries(
name='ophys_motion_correction_y',
data=motion_correction['y'].values,
timestamps=ophys_timestamps,
unit='pixels'
)
ophys_module.add_data_interface(t1)
ophys_module.add_data_interface(t2)