Source code for allensdk.brain_observatory.nwb

import logging
import warnings
from pathlib import Path
from typing import Iterable, Union

import h5py
import marshmallow
import numpy as np
import pandas as pd
import datetime
import uuid
import SimpleITK as sitk
import pynwb
from pynwb.base import TimeSeries, Images
from pynwb import ProcessingModule, NWBFile
from pynwb.image import ImageSeries, GrayscaleImage, IndexSeries
from pynwb.ophys import (
    DfOverF, ImageSegmentation, OpticalChannel, Fluorescence)

from allensdk.brain_observatory.behavior.stimulus_processing.stimulus_templates import \
    StimulusImage, StimulusTemplate
from allensdk.brain_observatory.nwb.nwb_utils import (get_column_name)
from allensdk.brain_observatory import dict_to_indexed_array
from allensdk.brain_observatory.behavior.image_api import Image
from allensdk.brain_observatory.behavior.image_api import ImageApi
from allensdk.brain_observatory.behavior.schemas import (
    CompleteOphysBehaviorMetadataSchema, NwbOphysMetadataSchema,
    BehaviorMetadataSchema, OphysBehaviorMetadataSchema,
    BehaviorTaskParametersSchema, SubjectMetadataSchema
)
from allensdk.brain_observatory.nwb.metadata import load_pynwb_extension


log = logging.getLogger("allensdk.brain_observatory.nwb")

CELL_SPECIMEN_COL_DESCRIPTIONS = {
    'cell_specimen_id': 'Unified id of segmented cell across experiments (after'
                        ' cell matching)',
    'height': 'Height of ROI in pixels',
    'width': 'Width of ROI in pixels',
    'mask_image_plane': 'Which image plane an ROI resides on. Overlapping ROIs '
                        'are stored on different mask image planes.',
    'max_correction_down': 'Max motion correction in down direction in pixels',
    'max_correction_left': 'Max motion correction in left direction in pixels',
    'max_correction_up': 'Max motion correction in up direction in pixels',
    'max_correction_right': 'Max motion correction in right direction in '
                            'pixels',
    'valid_roi': 'Indicates if cell classification found the ROI to be a cell '
                 'or not',
    'x': 'x position of ROI in Image Plane in pixels (top left corner)',
    'y': 'y position of ROI in Image Plane in pixels (top left corner)'
}

[docs]def check_nwbfile_version(nwbfile_path: str, desired_minimum_version: str, warning_msg: str): with h5py.File(nwbfile_path, 'r') as f: # nwb 2.x files store version as an attribute try: nwb_version = str(f.attrs["nwb_version"]).split(".") except KeyError: # nwb 1.x files store version as dataset try: nwb_version = str(f["nwb_version"][...].astype(str)) # Stored in the form: `NWB-x.y.z` nwb_version = nwb_version.split("-")[1].split(".") except (KeyError, IndexError): nwb_version = None if nwb_version is None: warnings.warn(f"'{nwbfile_path}' doesn't appear to be a valid " f"Neurodata Without Borders (*.nwb) format file as " f"neither a 'nwb_version' field nor dataset could " f"be found!") else: if tuple(nwb_version) < tuple(desired_minimum_version.split(".")): warnings.warn(warning_msg)
[docs]def read_eye_dlc_tracking_ellipses(input_path: Path) -> dict: """Reads eye tracking ellipse fit data from an h5 file. Args: input_path (Path): Path to eye tracking ellipse fit h5 file Returns: dict: Loaded h5 data. Each 'params' field contains dataframes with] ellipse fit parameters. Dataframes contain 5 columns each consisting of: "center_x", "center_y", "height", "phi", "width" """ eye_dlc_tracking_data = {} # TODO: Some ellipses.h5 files have the 'cr' key as complex type instead of # float. For now, when loading ellipses.h5 files, always coerce to float # but this should eventually be resolved upstream... # See: allensdk.brain_observatory.eye_tracking pupil_params = pd.read_hdf(input_path, key="pupil").astype(float) cr_params = pd.read_hdf(input_path, key="cr").astype(float) eye_params = pd.read_hdf(input_path, key="eye").astype(float) eye_dlc_tracking_data["pupil_params"] = pupil_params eye_dlc_tracking_data["cr_params"] = cr_params eye_dlc_tracking_data["eye_params"] = eye_params return eye_dlc_tracking_data
[docs]def read_eye_gaze_mappings(input_path: Path) -> dict: """Reads eye gaze mapping data from an h5 file. Args: input_path (Path): Path to eye gaze mapping h5 data file produced by 'allensdk.brain_observatory.gaze_mapping' module. Returns: dict: Loaded h5 data. *_eye_areas: Area of eye (in pixels^2) over time *_pupil_areas: Area of pupil (in pixels^2) over time *_screen_coordinates: y, x screen coordinates (in cm) over time *_screen_coordinates_spherical: y, x screen coordinates (in deg) over time synced_frame_timestamps: synced timestamps for video frames (in sec) """ eye_gaze_data = {} eye_gaze_data["raw_eye_areas"] = pd.read_hdf(input_path, key="raw_eye_areas") eye_gaze_data["raw_pupil_areas"] = pd.read_hdf(input_path, key="raw_pupil_areas") eye_gaze_data["raw_screen_coordinates"] = pd.read_hdf(input_path, key="raw_screen_coordinates") eye_gaze_data["raw_screen_coordinates_spherical"] = pd.read_hdf(input_path, key="raw_screen_coordinates_spherical") eye_gaze_data["new_eye_areas"] = pd.read_hdf(input_path, key="new_eye_areas") eye_gaze_data["new_pupil_areas"] = pd.read_hdf(input_path, key="new_pupil_areas") eye_gaze_data["new_screen_coordinates"] = pd.read_hdf(input_path, key="new_screen_coordinates") eye_gaze_data["new_screen_coordinates_spherical"] = pd.read_hdf(input_path, key="new_screen_coordinates_spherical") eye_gaze_data["synced_frame_timestamps"] = pd.read_hdf(input_path, key="synced_frame_timestamps") return eye_gaze_data
[docs]def create_eye_gaze_mapping_dataframe(eye_gaze_data: dict) -> pd.DataFrame: eye_gaze_mapping_df = pd.DataFrame( { "raw_eye_area": eye_gaze_data["raw_eye_areas"].values, "raw_pupil_area": eye_gaze_data["raw_pupil_areas"].values, "raw_screen_coordinates_x_cm": eye_gaze_data["raw_screen_coordinates"]["x_pos_cm"].values, "raw_screen_coordinates_y_cm": eye_gaze_data["raw_screen_coordinates"]["y_pos_cm"].values, "raw_screen_coordinates_spherical_x_deg": eye_gaze_data["raw_screen_coordinates_spherical"]["x_pos_deg"].values, "raw_screen_coordinates_spherical_y_deg": eye_gaze_data["raw_screen_coordinates_spherical"]["y_pos_deg"].values, "filtered_eye_area": eye_gaze_data["new_eye_areas"].values, "filtered_pupil_area": eye_gaze_data["new_pupil_areas"].values, "filtered_screen_coordinates_x_cm": eye_gaze_data["new_screen_coordinates"]["x_pos_cm"].values, "filtered_screen_coordinates_y_cm": eye_gaze_data["new_screen_coordinates"]["y_pos_cm"].values, "filtered_screen_coordinates_spherical_x_deg": eye_gaze_data["new_screen_coordinates_spherical"]["x_pos_deg"].values, "filtered_screen_coordinates_spherical_y_deg": eye_gaze_data["new_screen_coordinates_spherical"]["y_pos_deg"].values }, index=eye_gaze_data["synced_frame_timestamps"].values ) return eye_gaze_mapping_df
[docs]def eye_tracking_data_is_valid(eye_dlc_tracking_data: dict, synced_timestamps: pd.Series) -> bool: is_valid = True pupil_params = eye_dlc_tracking_data["pupil_params"] cr_params = eye_dlc_tracking_data["cr_params"] eye_params = eye_dlc_tracking_data["eye_params"] num_frames_match = ((pupil_params.shape[0] == cr_params.shape[0]) and (cr_params.shape[0] == eye_params.shape[0])) if not num_frames_match: log.warn("The number of frames for ellipse fits don't " "match when they should. No ellipse fits will be written! " f"pupil_params ({pupil_params.shape[0]}), " f"cr_params ({cr_params.shape[0]}), " f"eye_params ({eye_params.shape[0]})") is_valid = False if (pupil_params.shape[0] != len(synced_timestamps)): log.warn("The number of camera sync pulses in the " f"sync file ({len(synced_timestamps)}) do not match " "with the number of eye tracking frames " f"({pupil_params.shape[0]})! No ellipse fits will be written!") is_valid = False return is_valid
[docs]def create_eye_tracking_nwb_processing_module(eye_dlc_tracking_data: dict, synced_timestamps: pd.Series) -> pynwb.ProcessingModule: # Top level container for eye tracking processed data eye_tracking_mod = pynwb.ProcessingModule(name='eye_tracking', description='Eye tracking processing module') # Data interfaces of dlc_fits_container pupil_fits = eye_dlc_tracking_data["pupil_params"].assign(timestamps=synced_timestamps) pupil_params = pynwb.core.DynamicTable.from_dataframe(df=pupil_fits, name="pupil_ellipse_fits") cr_fits = eye_dlc_tracking_data["cr_params"].assign(timestamps=synced_timestamps) cr_params = pynwb.core.DynamicTable.from_dataframe(df=cr_fits, name="cr_ellipse_fits") eye_fits = eye_dlc_tracking_data["eye_params"].assign(timestamps=synced_timestamps) eye_params = pynwb.core.DynamicTable.from_dataframe(df=eye_fits, name="eye_ellipse_fits") eye_tracking_mod.add_data_interface(pupil_params) eye_tracking_mod.add_data_interface(cr_params) eye_tracking_mod.add_data_interface(eye_params) return eye_tracking_mod
[docs]def add_eye_gaze_data_interfaces(pynwb_container: pynwb.NWBContainer, pupil_areas: pd.Series, eye_areas: pd.Series, screen_coordinates: pd.DataFrame, screen_coordinates_spherical: pd.DataFrame, synced_timestamps: pd.Series) -> pynwb.NWBContainer: pupil_area_ts = pynwb.base.TimeSeries( name="pupil_area", data=pupil_areas.values, timestamps=synced_timestamps.values, unit="Pixels ^ 2" ) eye_area_ts = pynwb.base.TimeSeries( name="eye_area", data=eye_areas.values, timestamps=synced_timestamps.values, unit="Pixels ^ 2" ) screen_coord_ts = pynwb.base.TimeSeries( name="screen_coordinates", data=screen_coordinates.values, timestamps=synced_timestamps.values, unit="Centimeters" ) screen_coord_spherical_ts = pynwb.base.TimeSeries( name="screen_coordinates_spherical", data=screen_coordinates_spherical.values, timestamps=synced_timestamps.values, unit="Degrees" ) pynwb_container.add_data_interface(pupil_area_ts) pynwb_container.add_data_interface(eye_area_ts) pynwb_container.add_data_interface(screen_coord_ts) pynwb_container.add_data_interface(screen_coord_spherical_ts) return pynwb_container
[docs]def create_gaze_mapping_nwb_processing_modules(eye_gaze_data: dict): # Container for raw gaze mapped data raw_gaze_mapping_mod = pynwb.ProcessingModule(name='raw_gaze_mapping', description='Gaze mapping processing module raw outputs') raw_gaze_mapping_mod = add_eye_gaze_data_interfaces(raw_gaze_mapping_mod, pupil_areas=eye_gaze_data["raw_pupil_areas"], eye_areas=eye_gaze_data["raw_eye_areas"], screen_coordinates=eye_gaze_data["raw_screen_coordinates"], screen_coordinates_spherical=eye_gaze_data["raw_screen_coordinates_spherical"], synced_timestamps=eye_gaze_data["synced_frame_timestamps"]) # Container for filtered gaze mapped data filt_gaze_mapping_mod = pynwb.ProcessingModule(name='filtered_gaze_mapping', description='Gaze mapping processing module filtered outputs') filt_gaze_mapping_mod = add_eye_gaze_data_interfaces(filt_gaze_mapping_mod, pupil_areas=eye_gaze_data["new_pupil_areas"], eye_areas=eye_gaze_data["new_eye_areas"], screen_coordinates=eye_gaze_data["new_screen_coordinates"], screen_coordinates_spherical=eye_gaze_data["new_screen_coordinates_spherical"], synced_timestamps=eye_gaze_data["synced_frame_timestamps"]) return (raw_gaze_mapping_mod, filt_gaze_mapping_mod)
[docs]def add_eye_tracking_ellipse_fit_data_to_nwbfile(nwbfile: pynwb.NWBFile, eye_dlc_tracking_data: dict, synced_timestamps: pd.Series) -> pynwb.NWBFile: eye_tracking_mod = create_eye_tracking_nwb_processing_module(eye_dlc_tracking_data, synced_timestamps) nwbfile.add_processing_module(eye_tracking_mod) return nwbfile
[docs]def add_eye_gaze_mapping_data_to_nwbfile(nwbfile: pynwb.NWBFile, eye_gaze_data: dict) -> pynwb.NWBFile: raw_gaze_mapping_mod, filt_gaze_mapping_mod = create_gaze_mapping_nwb_processing_modules(eye_gaze_data) nwbfile.add_processing_module(raw_gaze_mapping_mod) nwbfile.add_processing_module(filt_gaze_mapping_mod) return nwbfile
[docs]def add_running_acquisition_to_nwbfile(nwbfile, running_acquisition_df: pd.DataFrame): running_dx_series = TimeSeries( name='dx', data=running_acquisition_df['dx'].values, timestamps=running_acquisition_df.index.values, unit='cm', description=( 'Running wheel angular change, computed during data collection') ) v_sig = TimeSeries( name='v_sig', data=running_acquisition_df['v_sig'].values, timestamps=running_acquisition_df.index.values, unit='V', description='Voltage signal from the running wheel encoder' ) v_in = TimeSeries( name='v_in', data=running_acquisition_df['v_in'].values, timestamps=running_acquisition_df.index.values, unit='V', description=( 'The theoretical maximum voltage that the running wheel encoder ' 'will reach prior to "wrapping". This should ' 'theoretically be 5V (after crossing 5V goes to 0V, or ' 'vice versa). In practice the encoder does not always ' 'reach this value before wrapping, which can cause ' 'transient spikes in speed at the voltage "wraps".') ) if 'running' in nwbfile.processing: running_mod = nwbfile.processing['running'] else: running_mod = ProcessingModule('running', 'Running speed processing module') nwbfile.add_processing_module(running_mod) running_mod.add_data_interface(running_dx_series) nwbfile.add_acquisition(v_sig) nwbfile.add_acquisition(v_in) return nwbfile
[docs]def add_running_speed_to_nwbfile(nwbfile, running_speed, name='speed', unit='cm/s', from_dataframe=False): ''' Adds running speed data to an NWBFile as a timeseries in acquisition Parameters ---------- nwbfile : pynwb.NWBFile File to which running speeds will be written running_speed : Union[RunningSpeed, pd.DataFrame] Either a RunningSpeed object or pandas DataFrame. Contains attributes 'values' and 'timestamps' name : str, optional Used as name of timeseries object unit : str, optional SI units of running speed values from_dataframe : bool, optional Whether `running_speed` is a dataframe or not. Default is False. Returns ------- nwbfile : pynwb.NWBFile ''' if from_dataframe: data = running_speed['speed'].values timestamps = running_speed['timestamps'].values else: data = running_speed.values timestamps = running_speed.timestamps running_speed_series = pynwb.base.TimeSeries( name=name, data=data, timestamps=timestamps, unit=unit) if 'running' in nwbfile.processing: running_mod = nwbfile.processing['running'] else: running_mod = ProcessingModule('running', 'Running speed processing module') nwbfile.add_processing_module(running_mod) running_mod.add_data_interface(running_speed_series) return nwbfile
[docs]def add_stimulus_template(nwbfile: NWBFile, stimulus_template: StimulusTemplate): images = [] image_names = [] for image_name, image_data in stimulus_template.items(): image_names.append(image_name) images.append(image_data) image_index = list(range(len(images))) visual_stimulus_image_series = \ ImageSeries( name=stimulus_template.image_set_name, data=images, control=list(range(len(image_names))), control_description=image_names, unit='NA', format='raw', timestamps=image_index) nwbfile.add_stimulus_template(visual_stimulus_image_series) return nwbfile
[docs]def create_stimulus_presentation_time_interval(name: str, description: str, columns_to_add: Iterable) -> pynwb.epoch.TimeIntervals: column_descriptions = { "stimulus_name": "Name of stimulus", "stimulus_block": "Index of contiguous presentations of one stimulus type", "temporal_frequency": "Temporal frequency of stimulus", "x_position": "Horizontal position of stimulus on screen", "y_position": "Vertical position of stimulus on screen", "mask": "Shape of mask applied to stimulus", "opacity": "Opacity of stimulus", "phase": "Phase of grating stimulus", "size": "Size of stimulus (see ‘units’ field for units)", "units": "Units of stimulus size", "stimulus_index": "Index of stimulus type", "orientation": "Orientation of stimulus", "spatial_frequency": "Spatial frequency of stimulus", "frame": "Frame of movie stimulus", "contrast": "Contrast of stimulus", "Speed": "Speed of moving dot field", "Dir": "Direction of stimulus motion", "coherence": "Coherence of moving dot field", "dotLife": "Longevity of individual dots", "dotSize": "Size of individual dots", "fieldPos": "Position of moving dot field", "fieldShape": "Shape of moving dot field", "fieldSize": "Size of moving dot field", "nDots": "Number of dots in moving dot field" } columns_to_ignore = {'start_time', 'stop_time', 'tags', 'timeseries'} interval = pynwb.epoch.TimeIntervals(name=name, description=description) for column_name in columns_to_add: if column_name not in columns_to_ignore: description = column_descriptions.get(column_name, "No description") interval.add_column(name=column_name, description=description) return interval
[docs]def add_stimulus_presentations(nwbfile, stimulus_table, tag='stimulus_time_interval'): """Adds a stimulus table (defining stimulus characteristics for each time point in a session) to an nwbfile as TimeIntervals. Parameters ---------- nwbfile : pynwb.NWBFile stimulus_table: pd.DataFrame Each row corresponds to an interval of time. Columns define the interval (start and stop time) and its characteristics. Nans in columns with string data will be replaced with the empty strings. Required columns are: start_time :: the time at which this interval started stop_time :: the time at which this interval ended tag : str, optional Each interval in an nwb file has one or more tags. This string will be applied as a tag to all TimeIntervals created here Returns ------- nwbfile : pynwb.NWBFile """ stimulus_table = stimulus_table.copy() ts = nwbfile.processing['stimulus'].get_data_interface('timestamps') possible_names = {'stimulus_name', 'image_name'} stimulus_name_column = get_column_name(stimulus_table.columns, possible_names) stimulus_names = stimulus_table[stimulus_name_column].unique() for stim_name in sorted(stimulus_names): specific_stimulus_table = stimulus_table[stimulus_table[stimulus_name_column] == stim_name] # Drop columns where all values in column are NaN cleaned_table = specific_stimulus_table.dropna(axis=1, how='all') # For columns with mixed strings and NaNs, fill NaNs with 'N/A' for colname, series in cleaned_table.items(): types = set(series.map(type)) if len(types) > 1 and str in types: series.fillna('N/A', inplace=True) cleaned_table[colname] = series.transform(str) interval_description = (f"Presentation times and stimuli details " f"for '{stim_name}' stimuli") presentation_interval = create_stimulus_presentation_time_interval( name=f"{stim_name}_presentations", description=interval_description, columns_to_add=cleaned_table.columns ) for row in cleaned_table.itertuples(index=False): row = row._asdict() presentation_interval.add_interval(**row, tags=tag, timeseries=ts) nwbfile.add_time_intervals(presentation_interval) return nwbfile
[docs]def add_invalid_times(nwbfile, epochs): """ Write invalid times to nwbfile if epochs are not empty Parameters ---------- nwbfile: pynwb.NWBFile epochs: list of dicts records of invalid epochs Returns ------- pynwb.NWBFile """ table = setup_table_for_invalid_times(epochs) if not table.empty: container = pynwb.epoch.TimeIntervals('invalid_times') for index, row in table.iterrows(): container.add_interval(start_time=row['start_time'], stop_time=row['stop_time'], tags=row['tags'], ) nwbfile.invalid_times = container return nwbfile
[docs]def setup_table_for_invalid_times(invalid_epochs): """ Create table with invalid times if invalid_epochs are present Parameters ---------- invalid_epochs: list of dicts of invalid epoch records Returns ------- pd.DataFrame of invalid times if epochs are not empty, otherwise return None """ if invalid_epochs: df = pd.DataFrame.from_dict(invalid_epochs) start_time = df['start_time'].values stop_time = df['end_time'].values tags = [[_type, str(_id), label] for _type, _id, label in zip(df['type'], df['id'], df['label'])] table = pd.DataFrame({'start_time': start_time, 'stop_time': stop_time, 'tags': tags} ) table.index.name = 'id' else: table = pd.DataFrame() return table
[docs]def setup_table_for_epochs(table, timeseries, tag): table = table.copy() indices = np.searchsorted(timeseries.timestamps[:], table['start_time'].values) if len(indices > 0): diffs = np.concatenate([np.diff(indices), [table.shape[0] - indices[-1]]]) else: diffs = [] table['tags'] = [(tag,)] * table.shape[0] table['timeseries'] = [[[indices[ii], diffs[ii], timeseries]] for ii in range(table.shape[0])] return table
[docs]def add_stimulus_timestamps(nwbfile, stimulus_timestamps, module_name='stimulus'): stimulus_ts = TimeSeries( data=stimulus_timestamps, name='timestamps', timestamps=stimulus_timestamps, unit='s' ) stim_mod = ProcessingModule(module_name, 'Stimulus Times processing') nwbfile.add_processing_module(stim_mod) stim_mod.add_data_interface(stimulus_ts) return nwbfile
[docs]def add_trials(nwbfile, trials, description_dict={}): order = list(trials.index) for _, row in trials[['start_time', 'stop_time']].iterrows(): row_dict = row.to_dict() nwbfile.add_trial(**row_dict) for c in [c for c in trials.columns if c not in ['start_time', 'stop_time']]: index, data = dict_to_indexed_array(trials[c].to_dict(), order) if data.dtype == '<U1': # data type is composed of unicode characters data = trials[c].tolist() if not len(data) == len(order): if len(data) == 0: data = [''] nwbfile.add_trial_column(name=c, description=description_dict.get(c, 'NOT IMPLEMENTED: %s' % c), data=data, index=index) else: nwbfile.add_trial_column(name=c, description=description_dict.get(c, 'NOT IMPLEMENTED: %s' % c), data=data)
[docs]def add_licks(nwbfile, licks): lick_timeseries = TimeSeries( name='licks', data=licks.frame.values, timestamps=licks.time.values, description=('Timestamps and stimulus presentation ' 'frame indices for lick events'), unit='N/A' ) # Add lick interface to nwb file, by way of a processing module: licks_mod = ProcessingModule('licking', 'Licking behavior processing module') licks_mod.add_data_interface(lick_timeseries) nwbfile.add_processing_module(licks_mod) return nwbfile
[docs]def add_rewards(nwbfile, rewards_df): assert rewards_df.index.name == 'timestamps' reward_volume_ts = TimeSeries( name='volume', data=rewards_df.volume.values, timestamps=rewards_df.index.values, unit='mL' ) autorewarded_ts = TimeSeries( name='autorewarded', data=rewards_df.autorewarded.values, timestamps=reward_volume_ts.timestamps, unit='mL' ) rewards_mod = ProcessingModule('rewards', 'Licking behavior processing module') rewards_mod.add_data_interface(reward_volume_ts) rewards_mod.add_data_interface(autorewarded_ts) nwbfile.add_processing_module(rewards_mod) return nwbfile
[docs]def add_image(nwbfile, image_data, image_name, module_name, module_description, image_api=None): description = '{} image at pixels/cm resolution'.format(image_name) if image_api is None: image_api = ImageApi if isinstance(image_data, sitk.Image): data, spacing, unit = ImageApi.deserialize(image_data) elif isinstance(image_data, Image): data = image_data.data spacing = image_data.spacing unit = image_data.unit else: raise ValueError("Not a supported image_data type: {}".format(type(image_data))) assert spacing[0] == spacing[1] and len(spacing) == 2 and unit == 'mm' if module_name not in nwbfile.processing: ophys_mod = ProcessingModule(module_name, module_description) nwbfile.add_processing_module(ophys_mod) else: ophys_mod = nwbfile.processing[module_name] image = GrayscaleImage(image_name, data, resolution=spacing[0] / 10, description=description) if 'images' not in ophys_mod.containers: images = Images(name='images') ophys_mod.add_data_interface(images) else: images = ophys_mod['images'] images.add_image(image) return nwbfile
[docs]def add_max_projection(nwbfile, max_projection, image_api=None): add_image(nwbfile, max_projection, 'max_projection', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_average_image(nwbfile, average_image, image_api=None): add_image(nwbfile, average_image, 'average_image', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_segmentation_mask_image(nwbfile, segmentation_mask_image, image_api=None): add_image(nwbfile, segmentation_mask_image, 'segmentation_mask_image', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_stimulus_index(nwbfile, stimulus_index, nwb_template): image_index = IndexSeries( name=nwb_template.name, data=stimulus_index['image_index'].values, unit='None', indexed_timeseries=nwb_template, timestamps=stimulus_index['start_time'].values) nwbfile.add_stimulus(image_index)
[docs]def add_metadata(nwbfile, metadata: dict, behavior_only: bool): # Rename or reformat incoming metadata fields to conform with pynwb fields tmp_metadata = metadata.copy() tmp_metadata["subject_id"] = tmp_metadata.pop("LabTracks_ID") tmp_metadata["genotype"] = tmp_metadata.pop("full_genotype") if not behavior_only: imaging_plane_group = metadata["imaging_plane_group"] if imaging_plane_group is None: tmp_metadata["imaging_plane_group"] = -1 else: tmp_metadata["imaging_plane_group"] = imaging_plane_group metadata_clean = CompleteOphysBehaviorMetadataSchema().dump(tmp_metadata) # Subject related metadata should be saved to our BehaviorSubject # (augmented pyNWB 'Subject') NWB class subject_fields = {"age", "driver_line", "genotype", "subject_id", "reporter_line", "sex"} subject_metadata = {k: v for k, v in metadata_clean.items() if k in subject_fields} for subject_key in subject_metadata.keys(): metadata_clean.pop(subject_key, None) BehaviorSubject = load_pynwb_extension(SubjectMetadataSchema, 'ndx-aibs-behavior-ophys') nwb_subject = BehaviorSubject( description="A visual behavior subject with a LabTracks ID", age=subject_metadata["age"], driver_line=subject_metadata["driver_line"], genotype=subject_metadata["genotype"], subject_id=str(subject_metadata["subject_id"]), reporter_line=subject_metadata["reporter_line"], sex=subject_metadata["sex"], species='Mus musculus') nwbfile.subject = nwb_subject # Remove metadata that will go into pyNWB base classes for key in OphysBehaviorMetadataSchema.neurodata_skip: metadata_clean.pop(key, None) # Remaining metadata can go into our custom extension new_metadata_dict = {} for key, val in metadata_clean.items(): if isinstance(val, list): new_metadata_dict[key] = np.array(val) elif isinstance(val, (datetime.datetime, uuid.UUID)): new_metadata_dict[key] = str(val) else: new_metadata_dict[key] = val if behavior_only: BehaviorMetadata = load_pynwb_extension(BehaviorMetadataSchema, 'ndx-aibs-behavior-ophys') nwb_metadata = BehaviorMetadata(name='metadata', **new_metadata_dict) else: OphysBehaviorMetadata = load_pynwb_extension( OphysBehaviorMetadataSchema, 'ndx-aibs-behavior-ophys') nwb_metadata = OphysBehaviorMetadata(name='metadata', **new_metadata_dict) nwbfile.add_lab_meta_data(nwb_metadata)
[docs]def add_task_parameters(nwbfile, task_parameters): OphysBehaviorTaskParameters = load_pynwb_extension( BehaviorTaskParametersSchema, 'ndx-aibs-behavior-ophys' ) task_parameters_clean = BehaviorTaskParametersSchema().dump( task_parameters ) new_task_parameters_dict = {} for key, val in task_parameters_clean.items(): if isinstance(val, list): new_task_parameters_dict[key] = np.array(val) else: new_task_parameters_dict[key] = val nwb_task_parameters = OphysBehaviorTaskParameters(name='task_parameters', **new_task_parameters_dict) nwbfile.add_lab_meta_data(nwb_task_parameters)
[docs]def add_cell_specimen_table(nwbfile: NWBFile, cell_specimen_table: pd.DataFrame, session_metadata: dict): """ This function takes the cell specimen table and writes the ROIs contained within. It writes these to a new NWB imaging plane based off the previously supplied metadata Parameters ---------- nwbfile: NWBFile this is the in memory NWBFile currently being written to which ROI data is added cell_specimen_table: pd.DataFrame this is the DataFrame containing the cells segmented from a ophys experiment, stored in json file and loaded. example: /home/nicholasc/projects/allensdk/allensdk/test/ brain_observatory/behavior/cell_specimen_table_789359614.json session_metadata: dict Dictionary containing cell_specimen_table related metadata. Should include at minimum the following fields: "emission_lambda", "excitation_lambda", "indicator", "targeted_structure", and ophys_frame_rate" Returns ------- nwbfile: NWBFile The altered in memory NWBFile object that now has a specimen table """ cell_specimen_metadata = NwbOphysMetadataSchema().load( session_metadata, unknown=marshmallow.EXCLUDE) cell_roi_table = cell_specimen_table.reset_index().set_index('cell_roi_id') # Device: device_name: str = nwbfile.lab_meta_data['metadata'].rig_name if device_name.startswith("MESO"): device_config = { "name": device_name, "description": "Allen Brain Observatory - Mesoscope 2P Rig" } else: device_config = { "name": device_name, "description": "Allen Brain Observatory - Scientifica 2P Rig", "manufacturer": "Scientifica" } nwbfile.create_device(**device_config) device = nwbfile.get_device(device_name) # FOV: fov_width = nwbfile.lab_meta_data['metadata'].field_of_view_width fov_height = nwbfile.lab_meta_data['metadata'].field_of_view_height imaging_plane_description = "{} field of view in {} at depth {} um".format( (fov_width, fov_height), cell_specimen_metadata['targeted_structure'], nwbfile.lab_meta_data['metadata'].imaging_depth) # Optical Channel: optical_channel = OpticalChannel( name='channel_1', description='2P Optical Channel', emission_lambda=cell_specimen_metadata['emission_lambda']) # Imaging Plane: imaging_plane = nwbfile.create_imaging_plane( name='imaging_plane_1', optical_channel=optical_channel, description=imaging_plane_description, device=device, excitation_lambda=cell_specimen_metadata['excitation_lambda'], imaging_rate=cell_specimen_metadata['ophys_frame_rate'], indicator=cell_specimen_metadata['indicator'], location=cell_specimen_metadata['targeted_structure']) # Image Segmentation: image_segmentation = ImageSegmentation(name="image_segmentation") if 'ophys' not in nwbfile.processing: ophys_module = ProcessingModule('ophys', 'Ophys processing module') nwbfile.add_processing_module(ophys_module) else: ophys_module = nwbfile.processing['ophys'] ophys_module.add_data_interface(image_segmentation) # Plane Segmentation: plane_segmentation = image_segmentation.create_plane_segmentation( name='cell_specimen_table', description="Segmented rois", imaging_plane=imaging_plane) for col_name in cell_roi_table.columns: # the columns 'roi_mask', 'pixel_mask', and 'voxel_mask' are already defined # in the nwb.ophys::PlaneSegmentation Object if col_name not in ['id', 'mask_matrix', 'roi_mask', 'pixel_mask', 'voxel_mask']: # This builds the columns with name of column and description of column # both equal to the column name in the cell_roi_table plane_segmentation.add_column(col_name, CELL_SPECIMEN_COL_DESCRIPTIONS.get(col_name, "No Description Available")) # go through each roi and add it to the plan segmentation object for cell_roi_id, table_row in cell_roi_table.iterrows(): # NOTE: The 'roi_mask' in this cell_roi_table has already been # processing by the function from # allensdk.brain_observatory.behavior.session_apis.data_io.ophys_lims_api # get_cell_specimen_table() method. As a result, the ROI is stored in # an array that is the same shape as the FULL field of view of the # experiment (e.g. 512 x 512). mask = table_row.pop('roi_mask') csid = table_row.pop('cell_specimen_id') table_row['cell_specimen_id'] = -1 if csid is None else csid table_row['id'] = cell_roi_id plane_segmentation.add_roi(image_mask=mask, **table_row.to_dict()) return nwbfile
[docs]def add_dff_traces(nwbfile, dff_traces, ophys_timestamps): dff_traces = dff_traces.reset_index().set_index('cell_roi_id')[['dff']] ophys_module = nwbfile.processing['ophys'] # trace data in the form of rois x timepoints trace_data = np.array([dff_traces.loc[cell_roi_id].dff for cell_roi_id in dff_traces.index.values]) cell_specimen_table = nwbfile.processing['ophys'].data_interfaces['image_segmentation'].plane_segmentations['cell_specimen_table'] roi_table_region = cell_specimen_table.create_roi_table_region( description="segmented cells labeled by cell_specimen_id", region=slice(len(dff_traces))) # Create/Add dff modules and interfaces: assert dff_traces.index.name == 'cell_roi_id' dff_interface = DfOverF(name='dff') ophys_module.add_data_interface(dff_interface) dff_interface.create_roi_response_series( name='traces', data=trace_data.T, # Should be stored as timepoints x rois unit='NA', rois=roi_table_region, timestamps=ophys_timestamps) return nwbfile
[docs]def add_corrected_fluorescence_traces(nwbfile, corrected_fluorescence_traces): corrected_fluorescence_traces = corrected_fluorescence_traces.reset_index().set_index('cell_roi_id')[['corrected_fluorescence']] # Create/Add corrected_fluorescence_traces modules and interfaces: assert corrected_fluorescence_traces.index.name == 'cell_roi_id' ophys_module = nwbfile.processing['ophys'] # trace data in the form of rois x timepoints f_trace_data = np.array([corrected_fluorescence_traces.loc[cell_roi_id].corrected_fluorescence for cell_roi_id in corrected_fluorescence_traces.index.values]) roi_table_region = nwbfile.processing['ophys'].data_interfaces['dff'].roi_response_series['traces'].rois ophys_timestamps = ophys_module.get_data_interface('dff').roi_response_series['traces'].timestamps f_interface = Fluorescence(name='corrected_fluorescence') ophys_module.add_data_interface(f_interface) f_interface.create_roi_response_series( name='traces', data=f_trace_data.T, # Should be stored as timepoints x rois unit='NA', rois=roi_table_region, timestamps=ophys_timestamps) return nwbfile
[docs]def add_motion_correction(nwbfile, motion_correction): ophys_module = nwbfile.processing['ophys'] ophys_timestamps = ophys_module.get_data_interface('dff').roi_response_series['traces'].timestamps t1 = TimeSeries( name='ophys_motion_correction_x', data=motion_correction['x'].values, timestamps=ophys_timestamps, unit='pixels' ) t2 = TimeSeries( name='ophys_motion_correction_y', data=motion_correction['y'].values, timestamps=ophys_timestamps, unit='pixels' ) ophys_module.add_data_interface(t1) ophys_module.add_data_interface(t2)