Source code for allensdk.brain_observatory.nwb

import logging
import warnings
from pathlib import Path
from typing import Iterable, Optional

import h5py
import marshmallow
import numpy as np
import pandas as pd
import datetime
import uuid
import SimpleITK as sitk
import pynwb
from pynwb.base import TimeSeries, Images
from pynwb import ProcessingModule, NWBFile
from pynwb.image import GrayscaleImage
from pynwb.ophys import (
    DfOverF, ImageSegmentation, OpticalChannel, Fluorescence)

from allensdk.brain_observatory import dict_to_indexed_array
from allensdk.brain_observatory.behavior.image_api import Image
from allensdk.brain_observatory.behavior.image_api import ImageApi
from allensdk.brain_observatory.behavior.schemas import (
    CompleteOphysBehaviorMetadataSchema, NwbOphysMetadataSchema,
    BehaviorMetadataSchema, OphysBehaviorMetadataSchema,
    BehaviorTaskParametersSchema, SubjectMetadataSchema
)
from allensdk.brain_observatory.nwb.metadata import load_pynwb_extension


log = logging.getLogger("allensdk.brain_observatory.nwb")

CELL_SPECIMEN_COL_DESCRIPTIONS = {
    'cell_specimen_id': 'Unified id of segmented cell across experiments '
                        '(after cell matching)',
    'height': 'Height of ROI in pixels',
    'width': 'Width of ROI in pixels',
    'mask_image_plane': 'Which image plane an ROI resides on. Overlapping '
                        'ROIs are stored on different mask image planes.',
    'max_correction_down': 'Max motion correction in down direction in pixels',
    'max_correction_left': 'Max motion correction in left direction in pixels',
    'max_correction_up': 'Max motion correction in up direction in pixels',
    'max_correction_right': 'Max motion correction in right direction in '
                            'pixels',
    'valid_roi': 'Indicates if cell classification found the ROI to be a cell '
                 'or not',
    'x': 'x position of ROI in Image Plane in pixels (top left corner)',
    'y': 'y position of ROI in Image Plane in pixels (top left corner)'
}


[docs]def check_nwbfile_version(nwbfile_path: str, desired_minimum_version: str, warning_msg: str): with h5py.File(nwbfile_path, 'r') as f: # nwb 2.x files store version as an attribute try: nwb_version = str(f.attrs["nwb_version"]).split(".") except KeyError: # nwb 1.x files store version as dataset try: nwb_version = str(f["nwb_version"][...].astype(str)) # Stored in the form: `NWB-x.y.z` nwb_version = nwb_version.split("-")[1].split(".") except (KeyError, IndexError): nwb_version = None if nwb_version is None: warnings.warn(f"'{nwbfile_path}' doesn't appear to be a valid " f"Neurodata Without Borders (*.nwb) format file as " f"neither a 'nwb_version' field nor dataset could " f"be found!") else: if tuple(nwb_version) < tuple(desired_minimum_version.split(".")): warnings.warn(warning_msg)
[docs]def read_eye_dlc_tracking_ellipses(input_path: Path) -> dict: """Reads eye tracking ellipse fit data from an h5 file. Args: input_path (Path): Path to eye tracking ellipse fit h5 file Returns: dict: Loaded h5 data. Each 'params' field contains dataframes with] ellipse fit parameters. Dataframes contain 5 columns each consisting of: "center_x", "center_y", "height", "phi", "width" """ eye_dlc_tracking_data = {} # TODO: Some ellipses.h5 files have the 'cr' key as complex type instead of # float. For now, when loading ellipses.h5 files, always coerce to float # but this should eventually be resolved upstream... # See: allensdk.brain_observatory.eye_tracking pupil_params = pd.read_hdf(input_path, key="pupil").astype(float) cr_params = pd.read_hdf(input_path, key="cr").astype(float) eye_params = pd.read_hdf(input_path, key="eye").astype(float) eye_dlc_tracking_data["pupil_params"] = pupil_params eye_dlc_tracking_data["cr_params"] = cr_params eye_dlc_tracking_data["eye_params"] = eye_params return eye_dlc_tracking_data
[docs]def read_eye_gaze_mappings(input_path: Path) -> dict: """Reads eye gaze mapping data from an h5 file. Args: input_path (Path): Path to eye gaze mapping h5 data file produced by 'allensdk.brain_observatory.gaze_mapping' module. Returns: dict: Loaded h5 data. *_eye_areas: Area of eye (in pixels^2) over time *_pupil_areas: Area of pupil (in pixels^2) over time *_screen_coordinates: y, x screen coordinates (in cm) over time *_screen_coordinates_spherical: y, x screen coordinates (in deg) over time synced_frame_timestamps: synced timestamps for video frames (in sec) """ eye_gaze_data = {} eye_gaze_data["raw_eye_areas"] = \ pd.read_hdf(input_path, key="raw_eye_areas") eye_gaze_data["raw_pupil_areas"] = \ pd.read_hdf(input_path, key="raw_pupil_areas") eye_gaze_data["raw_screen_coordinates"] = \ pd.read_hdf(input_path, key="raw_screen_coordinates") eye_gaze_data["raw_screen_coordinates_spherical"] = \ pd.read_hdf(input_path, key="raw_screen_coordinates_spherical") eye_gaze_data["new_eye_areas"] = \ pd.read_hdf(input_path, key="new_eye_areas") eye_gaze_data["new_pupil_areas"] = \ pd.read_hdf(input_path, key="new_pupil_areas") eye_gaze_data["new_screen_coordinates"] = \ pd.read_hdf(input_path, key="new_screen_coordinates") eye_gaze_data["new_screen_coordinates_spherical"] = \ pd.read_hdf(input_path, key="new_screen_coordinates_spherical") eye_gaze_data["synced_frame_timestamps"] = \ pd.read_hdf(input_path, key="synced_frame_timestamps") return eye_gaze_data
[docs]def create_eye_gaze_mapping_dataframe(eye_gaze_data: dict) -> pd.DataFrame: eye_gaze_mapping_df = pd.DataFrame({ "raw_eye_area": eye_gaze_data["raw_eye_areas"].values, "raw_pupil_area": eye_gaze_data["raw_pupil_areas"].values, "raw_screen_coordinates_x_cm": eye_gaze_data["raw_screen_coordinates"]["x_pos_cm"].values, "raw_screen_coordinates_y_cm": eye_gaze_data["raw_screen_coordinates"]["y_pos_cm"].values, "raw_screen_coordinates_spherical_x_deg": eye_gaze_data["raw_screen_coordinates_spherical"]["x_pos_deg"].values, "raw_screen_coordinates_spherical_y_deg": eye_gaze_data["raw_screen_coordinates_spherical"]["y_pos_deg"].values, "filtered_eye_area": eye_gaze_data["new_eye_areas"].values, "filtered_pupil_area": eye_gaze_data["new_pupil_areas"].values, "filtered_screen_coordinates_x_cm": eye_gaze_data["new_screen_coordinates"]["x_pos_cm"].values, "filtered_screen_coordinates_y_cm": eye_gaze_data["new_screen_coordinates"]["y_pos_cm"].values, "filtered_screen_coordinates_spherical_x_deg": eye_gaze_data["new_screen_coordinates_spherical"]["x_pos_deg"].values, "filtered_screen_coordinates_spherical_y_deg": eye_gaze_data["new_screen_coordinates_spherical"]["y_pos_deg"].values }, index=eye_gaze_data["synced_frame_timestamps"].values ) return eye_gaze_mapping_df
[docs]def eye_tracking_data_is_valid(eye_dlc_tracking_data: dict, synced_timestamps: pd.Series) -> bool: is_valid = True pupil_params = eye_dlc_tracking_data["pupil_params"] cr_params = eye_dlc_tracking_data["cr_params"] eye_params = eye_dlc_tracking_data["eye_params"] num_frames_match = ((pupil_params.shape[0] == cr_params.shape[0]) and (cr_params.shape[0] == eye_params.shape[0])) if not num_frames_match: log.warn("The number of frames for ellipse fits don't " "match when they should. No ellipse fits will be written! " f"pupil_params ({pupil_params.shape[0]}), " f"cr_params ({cr_params.shape[0]}), " f"eye_params ({eye_params.shape[0]})") is_valid = False if (pupil_params.shape[0] != len(synced_timestamps)): log.warn("The number of camera sync pulses in the " f"sync file ({len(synced_timestamps)}) do not match " "with the number of eye tracking frames " f"({pupil_params.shape[0]})! No ellipse fits will be " "written!") is_valid = False return is_valid
[docs]def create_eye_tracking_nwb_processing_module(eye_dlc_tracking_data: dict, synced_timestamps: pd.Series ) -> pynwb.ProcessingModule: # Top level container for eye tracking processed data eye_tracking_mod = pynwb.ProcessingModule( name='eye_tracking', description='Eye tracking processing module') # Data interfaces of dlc_fits_container pupil_fits = eye_dlc_tracking_data["pupil_params"].assign( timestamps=synced_timestamps) pupil_params = pynwb.core.DynamicTable.from_dataframe( df=pupil_fits, name="pupil_ellipse_fits") cr_fits = eye_dlc_tracking_data["cr_params"].assign( timestamps=synced_timestamps) cr_params = pynwb.core.DynamicTable.from_dataframe(df=cr_fits, name="cr_ellipse_fits") eye_fits = eye_dlc_tracking_data["eye_params"].assign( timestamps=synced_timestamps) eye_params = pynwb.core.DynamicTable.from_dataframe( df=eye_fits, name="eye_ellipse_fits") eye_tracking_mod.add_data_interface(pupil_params) eye_tracking_mod.add_data_interface(cr_params) eye_tracking_mod.add_data_interface(eye_params) return eye_tracking_mod
[docs]def add_eye_gaze_data_interfaces(pynwb_container: pynwb.NWBContainer, pupil_areas: pd.Series, eye_areas: pd.Series, screen_coordinates: pd.DataFrame, screen_coordinates_spherical: pd.DataFrame, synced_timestamps: pd.Series ) -> pynwb.NWBContainer: pupil_area_ts = pynwb.base.TimeSeries( name="pupil_area", data=pupil_areas.values, timestamps=synced_timestamps.values, unit="Pixels ^ 2" ) eye_area_ts = pynwb.base.TimeSeries( name="eye_area", data=eye_areas.values, timestamps=synced_timestamps.values, unit="Pixels ^ 2" ) screen_coord_ts = pynwb.base.TimeSeries( name="screen_coordinates", data=screen_coordinates.values, timestamps=synced_timestamps.values, unit="Centimeters" ) screen_coord_spherical_ts = pynwb.base.TimeSeries( name="screen_coordinates_spherical", data=screen_coordinates_spherical.values, timestamps=synced_timestamps.values, unit="Degrees" ) pynwb_container.add_data_interface(pupil_area_ts) pynwb_container.add_data_interface(eye_area_ts) pynwb_container.add_data_interface(screen_coord_ts) pynwb_container.add_data_interface(screen_coord_spherical_ts) return pynwb_container
[docs]def create_gaze_mapping_nwb_processing_modules(eye_gaze_data: dict): # Container for raw gaze mapped data raw_gaze_mapping_mod = pynwb.ProcessingModule( name='raw_gaze_mapping', description='Gaze mapping processing module raw outputs') raw_gaze_mapping_mod = add_eye_gaze_data_interfaces( raw_gaze_mapping_mod, pupil_areas=eye_gaze_data["raw_pupil_areas"], eye_areas=eye_gaze_data["raw_eye_areas"], screen_coordinates=eye_gaze_data["raw_screen_coordinates"], screen_coordinates_spherical=eye_gaze_data["raw_screen_coordinates_spherical"], # noqa: E501 synced_timestamps=eye_gaze_data["synced_frame_timestamps"]) # Container for filtered gaze mapped data filt_gaze_mapping_mod = pynwb.ProcessingModule( name='filtered_gaze_mapping', description='Gaze mapping processing module filtered outputs') filt_gaze_mapping_mod = add_eye_gaze_data_interfaces( filt_gaze_mapping_mod, pupil_areas=eye_gaze_data["new_pupil_areas"], eye_areas=eye_gaze_data["new_eye_areas"], screen_coordinates=eye_gaze_data["new_screen_coordinates"], screen_coordinates_spherical=eye_gaze_data["new_screen_coordinates_spherical"], # noqa: E501 synced_timestamps=eye_gaze_data["synced_frame_timestamps"]) return (raw_gaze_mapping_mod, filt_gaze_mapping_mod)
[docs]def add_eye_tracking_ellipse_fit_data_to_nwbfile(nwbfile: pynwb.NWBFile, eye_dlc_tracking_data: dict, synced_timestamps: pd.Series ) -> pynwb.NWBFile: eye_tracking_mod = create_eye_tracking_nwb_processing_module( eye_dlc_tracking_data, synced_timestamps) nwbfile.add_processing_module(eye_tracking_mod) return nwbfile
[docs]def add_eye_gaze_mapping_data_to_nwbfile(nwbfile: pynwb.NWBFile, eye_gaze_data: dict) -> pynwb.NWBFile: raw_gaze_mapping_mod, filt_gaze_mapping_mod = \ create_gaze_mapping_nwb_processing_modules(eye_gaze_data) nwbfile.add_processing_module(raw_gaze_mapping_mod) nwbfile.add_processing_module(filt_gaze_mapping_mod) return nwbfile
[docs]def add_running_acquisition_to_nwbfile(nwbfile, running_acquisition_df: pd.DataFrame): running_dx_series = TimeSeries( name='dx', data=running_acquisition_df['dx'].values, timestamps=running_acquisition_df.index.values, unit='cm', description=( 'Running wheel angular change, computed during data collection') ) v_sig = TimeSeries( name='v_sig', data=running_acquisition_df['v_sig'].values, timestamps=running_acquisition_df.index.values, unit='V', description='Voltage signal from the running wheel encoder' ) v_in = TimeSeries( name='v_in', data=running_acquisition_df['v_in'].values, timestamps=running_acquisition_df.index.values, unit='V', description=( 'The theoretical maximum voltage that the running wheel encoder ' 'will reach prior to "wrapping". This should ' 'theoretically be 5V (after crossing 5V goes to 0V, or ' 'vice versa). In practice the encoder does not always ' 'reach this value before wrapping, which can cause ' 'transient spikes in speed at the voltage "wraps".') ) if 'running' in nwbfile.processing: running_mod = nwbfile.processing['running'] else: running_mod = ProcessingModule('running', 'Running speed processing module') nwbfile.add_processing_module(running_mod) running_mod.add_data_interface(running_dx_series) nwbfile.add_acquisition(v_sig) nwbfile.add_acquisition(v_in) return nwbfile
[docs]def add_running_speed_to_nwbfile(nwbfile, running_speed, name='speed', unit='cm/s', from_dataframe=False): ''' Adds running speed data to an NWBFile as a timeseries in acquisition Parameters ---------- nwbfile : pynwb.NWBFile File to which running speeds will be written running_speed : Union[RunningSpeed, pd.DataFrame] Either a RunningSpeed object or pandas DataFrame. Contains attributes 'values' and 'timestamps' name : str, optional Used as name of timeseries object unit : str, optional SI units of running speed values from_dataframe : bool, optional Whether `running_speed` is a dataframe or not. Default is False. Returns ------- nwbfile : pynwb.NWBFile ''' if from_dataframe: data = running_speed['speed'].values timestamps = running_speed['timestamps'].values else: data = running_speed.values timestamps = running_speed.timestamps running_speed_series = pynwb.base.TimeSeries( name=name, data=data, timestamps=timestamps, unit=unit) if 'running' in nwbfile.processing: running_mod = nwbfile.processing['running'] else: running_mod = ProcessingModule('running', 'Running speed processing module') nwbfile.add_processing_module(running_mod) running_mod.add_data_interface(running_speed_series) return nwbfile
[docs]def create_stimulus_presentation_time_interval( name: str, description: str, columns_to_add: Iterable) -> pynwb.epoch.TimeIntervals: column_descriptions = { "stimulus_name": "Name of stimulus", "stimulus_block": ("Index of contiguous presentations of " "one stimulus type"), "temporal_frequency": "Temporal frequency of stimulus", "x_position": "Horizontal position of stimulus on screen", "y_position": "Vertical position of stimulus on screen", "mask": "Shape of mask applied to stimulus", "opacity": "Opacity of stimulus", "phase": "Phase of grating stimulus", "size": "Size of stimulus (see ‘units’ field for units)", "units": "Units of stimulus size", "stimulus_index": "Index of stimulus type", "orientation": "Orientation of stimulus", "spatial_frequency": "Spatial frequency of stimulus", "frame": "Frame of movie stimulus", "contrast": "Contrast of stimulus", "Speed": "Speed of moving dot field", "Dir": "Direction of stimulus motion", "coherence": "Coherence of moving dot field", "dotLife": "Longevity of individual dots", "dotSize": "Size of individual dots", "fieldPos": "Position of moving dot field", "fieldShape": "Shape of moving dot field", "fieldSize": "Size of moving dot field", "nDots": "Number of dots in moving dot field" } columns_to_ignore = {'start_time', 'stop_time', 'tags', 'timeseries'} interval = pynwb.epoch.TimeIntervals(name=name, description=description) for column_name in columns_to_add: if column_name not in columns_to_ignore: description = column_descriptions.get( column_name, "No description") interval.add_column(name=column_name, description=description) return interval
[docs]def add_invalid_times(nwbfile, epochs): """ Write invalid times to nwbfile if epochs are not empty Parameters ---------- nwbfile: pynwb.NWBFile epochs: list of dicts records of invalid epochs Returns ------- pynwb.NWBFile """ table = setup_table_for_invalid_times(epochs) if not table.empty: container = pynwb.epoch.TimeIntervals('invalid_times') for index, row in table.iterrows(): container.add_interval(start_time=row['start_time'], stop_time=row['stop_time'], tags=row['tags'], ) nwbfile.invalid_times = container return nwbfile
[docs]def setup_table_for_invalid_times(invalid_epochs): """ Create table with invalid times if invalid_epochs are present Parameters ---------- invalid_epochs: list of dicts of invalid epoch records Returns ------- pd.DataFrame of invalid times if epochs are not empty, otherwise return None """ if invalid_epochs: df = pd.DataFrame.from_dict(invalid_epochs) start_time = df['start_time'].values stop_time = df['end_time'].values tags = [[_type, str(_id), label] for _type, _id, label in zip(df['type'], df['id'], df['label'])] table = pd.DataFrame({'start_time': start_time, 'stop_time': stop_time, 'tags': tags} ) table.index.name = 'id' else: table = pd.DataFrame() return table
[docs]def setup_table_for_epochs(table, timeseries, tag): table = table.copy() indices = np.searchsorted(timeseries.timestamps[:], table['start_time'].values) if len(indices > 0): diffs = np.concatenate([np.diff(indices), [table.shape[0] - indices[-1]]]) else: diffs = [] table['tags'] = [(tag,)] * table.shape[0] table['timeseries'] = [[[indices[ii], diffs[ii], timeseries]] for ii in range(table.shape[0])] return table
[docs]def add_stimulus_timestamps(nwbfile, stimulus_timestamps, module_name='stimulus'): stimulus_ts = TimeSeries( data=stimulus_timestamps, name='timestamps', timestamps=stimulus_timestamps, unit='s' ) stim_mod = ProcessingModule(module_name, 'Stimulus Times processing') nwbfile.add_processing_module(stim_mod) stim_mod.add_data_interface(stimulus_ts) return nwbfile
[docs]def add_trials(nwbfile, trials, description_dict={}): order = list(trials.index) for _, row in trials[['start_time', 'stop_time']].iterrows(): row_dict = row.to_dict() nwbfile.add_trial(**row_dict) for c in trials.columns: if c in ['start_time', 'stop_time']: continue index, data = dict_to_indexed_array(trials[c].to_dict(), order) if data.dtype == '<U1': # data type is composed of unicode characters data = trials[c].tolist() if not len(data) == len(order): if len(data) == 0: data = [''] nwbfile.add_trial_column( name=c, description=description_dict.get( c, 'NOT IMPLEMENTED: %s' % c), data=data, index=index) else: nwbfile.add_trial_column( name=c, description=description_dict.get( c, 'NOT IMPLEMENTED: %s' % c), data=data)
[docs]def add_licks(nwbfile, licks): lick_timeseries = TimeSeries( name='licks', data=licks.frame.values, timestamps=licks.timestamps.values, description=('Timestamps and stimulus presentation ' 'frame indices for lick events'), unit='N/A' ) # Add lick interface to nwb file, by way of a processing module: licks_mod = ProcessingModule('licking', 'Licking behavior processing module') licks_mod.add_data_interface(lick_timeseries) nwbfile.add_processing_module(licks_mod) return nwbfile
[docs]def add_rewards(nwbfile, rewards_df): reward_volume_ts = TimeSeries( name='volume', data=rewards_df.volume.values, timestamps=rewards_df['timestamps'].values, unit='mL' ) autorewarded_ts = TimeSeries( name='autorewarded', data=rewards_df.autorewarded.values, timestamps=reward_volume_ts.timestamps, unit='mL' ) rewards_mod = ProcessingModule('rewards', 'Licking behavior processing module') rewards_mod.add_data_interface(reward_volume_ts) rewards_mod.add_data_interface(autorewarded_ts) nwbfile.add_processing_module(rewards_mod) return nwbfile
[docs]def add_image(nwbfile, image_data, image_name, module_name, module_description, image_api=None): description = '{} image at pixels/cm resolution'.format(image_name) if image_api is None: image_api = ImageApi if isinstance(image_data, sitk.Image): data, spacing, unit = ImageApi.deserialize(image_data) elif isinstance(image_data, Image): data = image_data.data spacing = image_data.spacing unit = image_data.unit else: raise ValueError("Not a supported image_data type: " f"{type(image_data)}") assert spacing[0] == spacing[1] and len(spacing) == 2 and unit == 'mm' if module_name not in nwbfile.processing: ophys_mod = ProcessingModule(module_name, module_description) nwbfile.add_processing_module(ophys_mod) else: ophys_mod = nwbfile.processing[module_name] image = GrayscaleImage(image_name, data, resolution=spacing[0] / 10, description=description) if 'images' not in ophys_mod.containers: images = Images(name='images') ophys_mod.add_data_interface(images) else: images = ophys_mod['images'] images.add_image(image) return nwbfile
[docs]def add_max_projection(nwbfile, max_projection, image_api=None): add_image(nwbfile, max_projection, 'max_projection', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_average_image(nwbfile, average_image, image_api=None): add_image(nwbfile, average_image, 'average_image', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_segmentation_mask_image(nwbfile, segmentation_mask_image, image_api=None): add_image(nwbfile, segmentation_mask_image, 'segmentation_mask_image', 'ophys', 'Ophys processing module', image_api=image_api)
[docs]def add_metadata(nwbfile, metadata: dict, behavior_only: bool): # Rename or reformat incoming metadata fields to conform with pynwb fields tmp_metadata = metadata.copy() tmp_metadata["subject_id"] = tmp_metadata.pop("mouse_id") tmp_metadata["genotype"] = tmp_metadata.pop("full_genotype") if not behavior_only: imaging_plane_group = metadata["imaging_plane_group"] if imaging_plane_group is None: tmp_metadata["imaging_plane_group"] = -1 else: tmp_metadata["imaging_plane_group"] = imaging_plane_group metadata_clean = CompleteOphysBehaviorMetadataSchema().dump(tmp_metadata) # Subject related metadata should be saved to our BehaviorSubject # (augmented pyNWB 'Subject') NWB class subject_fields = {"age_in_days", "driver_line", "genotype", "subject_id", "reporter_line", "sex"} subject_metadata = {k: v for k, v in metadata_clean.items() if k in subject_fields} for subject_key in subject_metadata.keys(): metadata_clean.pop(subject_key, None) BehaviorSubject = load_pynwb_extension(SubjectMetadataSchema, 'ndx-aibs-behavior-ophys') def _get_age(age_in_days: Optional[int]) -> Optional[str]: """Convert numeric age_in_days to ISO 8601""" if age_in_days is None: return 'null' return f'P{age_in_days}D' nwb_subject = BehaviorSubject( description="A visual behavior subject with a LabTracks ID", age=_get_age(age_in_days=subject_metadata['age_in_days']), driver_line=subject_metadata["driver_line"], genotype=subject_metadata["genotype"], subject_id=str(subject_metadata["subject_id"]), reporter_line=subject_metadata["reporter_line"], sex=subject_metadata["sex"], species='Mus musculus') nwbfile.subject = nwb_subject # Remove metadata that will go into pyNWB base classes for key in OphysBehaviorMetadataSchema.neurodata_skip: metadata_clean.pop(key, None) # Remaining metadata can go into our custom extension new_metadata_dict = {} for key, val in metadata_clean.items(): if isinstance(val, list): new_metadata_dict[key] = np.array(val) elif isinstance(val, (datetime.datetime, uuid.UUID)): new_metadata_dict[key] = str(val) else: new_metadata_dict[key] = val if behavior_only: BehaviorMetadata = load_pynwb_extension(BehaviorMetadataSchema, 'ndx-aibs-behavior-ophys') nwb_metadata = BehaviorMetadata(name='metadata', **new_metadata_dict) else: OphysBehaviorMetadata = load_pynwb_extension( OphysBehaviorMetadataSchema, 'ndx-aibs-behavior-ophys') nwb_metadata = OphysBehaviorMetadata(name='metadata', **new_metadata_dict) nwbfile.add_lab_meta_data(nwb_metadata)
[docs]def add_task_parameters(nwbfile, task_parameters): OphysBehaviorTaskParameters = load_pynwb_extension( BehaviorTaskParametersSchema, 'ndx-aibs-behavior-ophys' ) task_parameters_clean = BehaviorTaskParametersSchema().dump( task_parameters ) new_task_parameters_dict = {} for key, val in task_parameters_clean.items(): if isinstance(val, list): new_task_parameters_dict[key] = np.array(val) else: new_task_parameters_dict[key] = val nwb_task_parameters = OphysBehaviorTaskParameters( name='task_parameters', **new_task_parameters_dict) nwbfile.add_lab_meta_data(nwb_task_parameters)
[docs]def add_cell_specimen_table(nwbfile: NWBFile, cell_specimen_table: pd.DataFrame, session_metadata: dict): """ This function takes the cell specimen table and writes the ROIs contained within. It writes these to a new NWB imaging plane based off the previously supplied metadata Parameters ---------- nwbfile: NWBFile this is the in memory NWBFile currently being written to which ROI data is added cell_specimen_table: pd.DataFrame this is the DataFrame containing the cells segmented from a ophys experiment, stored in json file and loaded. example: /home/nicholasc/projects/allensdk/allensdk/test/ brain_observatory/behavior/cell_specimen_table_789359614.json session_metadata: dict Dictionary containing cell_specimen_table related metadata. Should include at minimum the following fields: "emission_lambda", "excitation_lambda", "indicator", "targeted_structure", and ophys_frame_rate" Returns ------- nwbfile: NWBFile The altered in memory NWBFile object that now has a specimen table """ cell_specimen_metadata = NwbOphysMetadataSchema().load( session_metadata, unknown=marshmallow.EXCLUDE) cell_roi_table = cell_specimen_table.reset_index().set_index('cell_roi_id') # Device: device_name: str = nwbfile.lab_meta_data['metadata'].equipment_name if device_name.startswith("MESO"): device_config = { "name": device_name, "description": "Allen Brain Observatory - Mesoscope 2P Rig" } else: device_config = { "name": device_name, "description": "Allen Brain Observatory - Scientifica 2P Rig", "manufacturer": "Scientifica" } nwbfile.create_device(**device_config) device = nwbfile.get_device(device_name) # FOV: fov_width = nwbfile.lab_meta_data['metadata'].field_of_view_width fov_height = nwbfile.lab_meta_data['metadata'].field_of_view_height imaging_plane_description = "{} field of view in {} at depth {} um".format( (fov_width, fov_height), cell_specimen_metadata['targeted_structure'], nwbfile.lab_meta_data['metadata'].imaging_depth) # Optical Channel: optical_channel = OpticalChannel( name='channel_1', description='2P Optical Channel', emission_lambda=cell_specimen_metadata['emission_lambda']) # Imaging Plane: imaging_plane = nwbfile.create_imaging_plane( name='imaging_plane_1', optical_channel=optical_channel, description=imaging_plane_description, device=device, excitation_lambda=cell_specimen_metadata['excitation_lambda'], imaging_rate=cell_specimen_metadata['ophys_frame_rate'], indicator=cell_specimen_metadata['indicator'], location=cell_specimen_metadata['targeted_structure']) # Image Segmentation: image_segmentation = ImageSegmentation(name="image_segmentation") if 'ophys' not in nwbfile.processing: ophys_module = ProcessingModule('ophys', 'Ophys processing module') nwbfile.add_processing_module(ophys_module) else: ophys_module = nwbfile.processing['ophys'] ophys_module.add_data_interface(image_segmentation) # Plane Segmentation: plane_segmentation = image_segmentation.create_plane_segmentation( name='cell_specimen_table', description="Segmented rois", imaging_plane=imaging_plane) for col_name in cell_roi_table.columns: # the columns 'roi_mask', 'pixel_mask', and 'voxel_mask' are # already defined in the nwb.ophys::PlaneSegmentation Object if col_name not in ['id', 'mask_matrix', 'roi_mask', 'pixel_mask', 'voxel_mask']: # This builds the columns with name of column and description # of column both equal to the column name in the cell_roi_table plane_segmentation.add_column( col_name, CELL_SPECIMEN_COL_DESCRIPTIONS.get( col_name, "No Description Available")) # go through each roi and add it to the plan segmentation object for cell_roi_id, table_row in cell_roi_table.iterrows(): # NOTE: The 'roi_mask' in this cell_roi_table has already been # processing by the function from # allensdk.brain_observatory.behavior.session_apis.data_io.ophys_lims_api # get_cell_specimen_table() method. As a result, the ROI is stored in # an array that is the same shape as the FULL field of view of the # experiment (e.g. 512 x 512). mask = table_row.pop('roi_mask') csid = table_row.pop('cell_specimen_id') table_row['cell_specimen_id'] = -1 if csid is None else csid table_row['id'] = cell_roi_id plane_segmentation.add_roi(image_mask=mask, **table_row.to_dict()) return nwbfile
[docs]def add_dff_traces(nwbfile, dff_traces, ophys_timestamps): dff_traces = dff_traces.reset_index().set_index('cell_roi_id')[['dff']] ophys_module = nwbfile.processing['ophys'] # trace data in the form of rois x timepoints trace_data = np.array([dff_traces.loc[cell_roi_id].dff for cell_roi_id in dff_traces.index.values]) cell_specimen_table = nwbfile.processing['ophys'].data_interfaces['image_segmentation'].plane_segmentations['cell_specimen_table'] # noqa: E501 roi_table_region = cell_specimen_table.create_roi_table_region( description="segmented cells labeled by cell_specimen_id", region=slice(len(dff_traces))) # Create/Add dff modules and interfaces: assert dff_traces.index.name == 'cell_roi_id' dff_interface = DfOverF(name='dff') ophys_module.add_data_interface(dff_interface) dff_interface.create_roi_response_series( name='traces', data=trace_data.T, # Should be stored as timepoints x rois unit='NA', rois=roi_table_region, timestamps=ophys_timestamps) return nwbfile
[docs]def add_corrected_fluorescence_traces(nwbfile, corrected_fluorescence_traces): corrected_fluorescence_traces = \ corrected_fluorescence_traces.reset_index().set_index( 'cell_roi_id')[['corrected_fluorescence']] # Create/Add corrected_fluorescence_traces modules and interfaces: assert corrected_fluorescence_traces.index.name == 'cell_roi_id' ophys_module = nwbfile.processing['ophys'] # trace data in the form of rois x timepoints f_trace_data = np.array( [corrected_fluorescence_traces.loc[cell_roi_id].corrected_fluorescence for cell_roi_id in corrected_fluorescence_traces.index.values]) roi_table_region = nwbfile.processing['ophys'].data_interfaces['dff'].roi_response_series['traces'].rois # noqa: E501 ophys_timestamps = ophys_module.get_data_interface( 'dff').roi_response_series['traces'].timestamps f_interface = Fluorescence(name='corrected_fluorescence') ophys_module.add_data_interface(f_interface) f_interface.create_roi_response_series( name='traces', data=f_trace_data.T, # Should be stored as timepoints x rois unit='NA', rois=roi_table_region, timestamps=ophys_timestamps) return nwbfile
[docs]def add_motion_correction(nwbfile, motion_correction): ophys_module = nwbfile.processing['ophys'] ophys_timestamps = ophys_module.get_data_interface( 'dff').roi_response_series['traces'].timestamps t1 = TimeSeries( name='ophys_motion_correction_x', data=motion_correction['x'].values, timestamps=ophys_timestamps, unit='pixels' ) t2 = TimeSeries( name='ophys_motion_correction_y', data=motion_correction['y'].values, timestamps=ophys_timestamps, unit='pixels' ) ophys_module.add_data_interface(t1) ophys_module.add_data_interface(t2)