Source code for allensdk.internal.pipeline_modules.run_ophys_time_sync

import logging
import argparse
import os
import datetime
import json
from typing import NamedTuple, Optional

import numpy as np
import h5py

import allensdk
from allensdk.internal.core.lims_pipeline_module import PipelineModule
from allensdk.internal.brain_observatory import time_sync as ts
from allensdk.brain_observatory.argschema_utilities import \
    check_write_access_overwrite


class TimeSyncOutputs(NamedTuple):
    """
    Immutable record holding the results of one synchronization run.

    The field order is part of the interface, since a NamedTuple can be
    constructed positionally.
    """

    # --- identification ---------------------------------------------------
    # unique identifier of the experiment whose data streams were aligned
    experiment_id: int

    # --- scalar results ---------------------------------------------------
    # calculated monitor delay, in seconds
    stimulus_delay: float

    # per-stream count of "extra" timestamps, i.e. timestamps in excess of
    # the number of samples in that stream
    ophys_delta: int
    stimulus_delta: int
    eye_delta: int
    behavior_delta: int

    # --- aligned timestamps, one array per data stream (seconds) ----------
    ophys_times: np.ndarray
    stimulus_times: np.ndarray
    eye_times: np.ndarray
    behavior_times: np.ndarray

    # --- sample -> ophys frame correspondences (non-ophys streams only) ---
    stimulus_alignment: np.ndarray
    eye_alignment: np.ndarray
    behavior_alignment: np.ndarray
class TimeSyncWriter:
    """ Writes synchronization outputs to h5 and (optionally) json. """

    def __init__(
        self,
        output_h5_path: str,
        output_json_path: Optional[str] = None
    ):
        """ Writes synchronization outputs to h5 and (optionally) json.

        Parameters
        ----------
        output_h5_path : "heavy" outputs (e.g. aligned timestamps and ophys
            frame correspondences) will ONLY be stored here. Lightweight
            outputs (e.g. stimulus delay) will also be written here as
            scalars.
        output_json_path : if provided, lightweight outputs will be written
            here, along with provenance information, such as the date and
            allensdk version.

        """

        self.output_h5_path: str = output_h5_path
        self.output_json_path: Optional[str] = output_json_path

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """ Create the directory that will contain path, when it names one.

        A bare filename (e.g. "out.h5") has an empty dirname, and
        os.makedirs("") raises FileNotFoundError. In that case the file is
        written relative to the current working directory and there is
        nothing to create.

        Parameters
        ----------
        path : path of the file that is about to be written

        """
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def validate_paths(self) -> None:
        """ Determines whether we can actually write to the specified paths,
        allowing for creation of intermediate directories. It is a good idea
        to run this before doing any heavy calculations!
        """

        check_write_access_overwrite(self.output_h5_path)

        # the json output is optional, so only check it when requested
        if self.output_json_path is not None:
            check_write_access_overwrite(self.output_json_path)

    def write(self, outputs: "TimeSyncOutputs") -> None:
        """ Convenience for writing both an output h5 and (if applicable) an
        output json.

        Parameters
        ----------
        outputs : the data to be written

        """

        self.write_output_h5(outputs)

        if self.output_json_path is not None:
            self.write_output_json(outputs)

    def write_output_h5(self, outputs: "TimeSyncOutputs") -> None:
        """ Write (mainly) heavyweight data to an h5 file. The file is opened
        in "w" mode, so any existing file at the same path is replaced.

        Of the aligned timestamps, only the ophys times are stored (under
        "twop_vsync_fall"); the three alignment arrays and the scalar deltas
        and stimulus delay are stored alongside them.

        Parameters
        ----------
        outputs : the data to be written

        """

        self._ensure_parent_dir(self.output_h5_path)

        with h5py.File(self.output_h5_path, "w") as output_h5:
            output_h5["stimulus_alignment"] = outputs.stimulus_alignment
            output_h5["eye_tracking_alignment"] = outputs.eye_alignment
            output_h5["body_camera_alignment"] = outputs.behavior_alignment
            output_h5["twop_vsync_fall"] = outputs.ophys_times
            output_h5["ophys_delta"] = outputs.ophys_delta
            output_h5["stim_delta"] = outputs.stimulus_delta
            output_h5["stim_delay"] = outputs.stimulus_delay
            output_h5["eye_delta"] = outputs.eye_delta
            output_h5["behavior_delta"] = outputs.behavior_delta

    def write_output_json(self, outputs: "TimeSyncOutputs") -> None:
        """ Write lightweight data to a json, together with provenance
        information (allensdk version, current date, and the path of the
        companion h5 file).

        Parameters
        ----------
        outputs : the data to be written

        """

        self._ensure_parent_dir(self.output_json_path)

        with open(self.output_json_path, "w") as output_json:
            json.dump({
                "allensdk_version": allensdk.__version__,
                "date": str(datetime.datetime.now()),
                "experiment_id": outputs.experiment_id,
                "output_h5_path": self.output_h5_path,
                "ophys_delta": outputs.ophys_delta,
                "stim_delta": outputs.stimulus_delta,
                "stim_delay": outputs.stimulus_delay,
                "eye_delta": outputs.eye_delta,
                "behavior_delta": outputs.behavior_delta
            }, output_json, indent=2)
def check_stimulus_delay(obt_delay: float, min_delay: float, max_delay: float):
    """ Raise an exception if the monitor delay is not within specified
    bounds. Both bounds are inclusive. A NaN delay is never within bounds and
    is therefore rejected as well.

    Parameters
    ----------
    obt_delay : obtained monitor delay (s)
    min_delay : lower threshold (s)
    max_delay : upper threshold (s)

    Raises
    ------
    ValueError : if obt_delay does not lie in [min_delay, max_delay]

    """

    # Phrased as "not inside" rather than "below or above": every ordering
    # comparison against NaN is False, so the latter would let NaN through.
    if not (min_delay <= obt_delay <= max_delay):
        raise ValueError(
            f"calculated monitor delay was {obt_delay:.3f}s "
            f"(acceptable interval: [{min_delay:.3f}s, "
            f"{max_delay:.3f}s])"
        )
def run_ophys_time_sync(
    aligner: ts.OphysTimeAligner,
    experiment_id: int,
    min_stimulus_delay: float,
    max_stimulus_delay: float
) -> TimeSyncOutputs:
    """ Synchronize timestamps across the data streams of an ophys
    experiment and collect the results in a single record.

    Parameters
    ----------
    aligner : drives alignment. See OphysTimeAligner for details of the
        attributes and properties that must be implemented.
    experiment_id : unique identifier for the experiment being aligned
    min_stimulus_delay : reject alignment run (raise a ValueError) if the
        calculated monitor delay is below this value (s).
    max_stimulus_delay : reject alignment run (raise a ValueError) if the
        calculated monitor delay is above this value (s).

    Returns
    -------
    A TimeSyncOutputs (see definition for more information) of output
        parameters and arrays of aligned timestamps.

    """

    stimulus_times, stimulus_delta, stimulus_delay = \
        aligner.corrected_stim_timestamps

    # The delay is validated before the remaining streams are read from the
    # aligner, so an out-of-bounds delay aborts the run at this point.
    check_stimulus_delay(
        stimulus_delay, min_stimulus_delay, max_stimulus_delay
    )

    ophys_times, ophys_delta = aligner.corrected_ophys_timestamps
    eye_times, eye_delta = aligner.corrected_eye_video_timestamps
    behavior_times, behavior_delta = \
        aligner.corrected_behavior_video_timestamps

    # Stimulus: for each stimulus frame, the index of the ophys frame it is
    # matched to.
    stimulus_alignment = ts.get_alignment_array(ophys_times, stimulus_times)

    # Cameras: note the swapped argument order. Per the original author's
    # note, these hold the index of a camera frame for each ophys frame, a
    # layout kept only because cam_nwb_creator depends on it.
    eye_alignment = ts.get_alignment_array(
        eye_times, ophys_times, int_method=np.ceil
    )
    behavior_alignment = ts.get_alignment_array(
        behavior_times, ophys_times, int_method=np.ceil
    )

    return TimeSyncOutputs(
        experiment_id=experiment_id,
        stimulus_delay=stimulus_delay,
        ophys_delta=ophys_delta,
        stimulus_delta=stimulus_delta,
        eye_delta=eye_delta,
        behavior_delta=behavior_delta,
        ophys_times=ophys_times,
        stimulus_times=stimulus_times,
        eye_times=eye_times,
        behavior_times=behavior_times,
        stimulus_alignment=stimulus_alignment,
        eye_alignment=eye_alignment,
        behavior_alignment=behavior_alignment
    )
def main():
    """ Command-line entry point: build the argument parser, read the input
    json, run the time synchronization and write the results.

    The following keys are read from the input json: "output_file" (path of
    the output h5), "sync_file", and optionally "scanner", "dff_file",
    "stimulus_pkl", "eye_video", "behavior_video", "long_stim_threshold" and
    "ophys_experiment_id".
    """

    # NOTE: the text must be passed as `description`. The first positional
    # parameter of ArgumentParser is `prog`, which would make this sentence
    # show up as the program name in the usage line.
    parser = argparse.ArgumentParser(
        description="Generate brain observatory alignment."
    )
    parser.add_argument("input_json", type=str,
        help="path to input json"
    )
    parser.add_argument("output_json", type=str, nargs="?",
        help="path to which output json will be written"
    )
    parser.add_argument("--log-level", default=logging.DEBUG)
    parser.add_argument("--min-stimulus-delay", type=float, default=0.0,
        help="reject results if monitor delay less than this value (s)"
    )
    parser.add_argument("--max-stimulus-delay", type=float, default=0.07,
        help="reject results if monitor delay greater than this value (s)"
    )

    # PipelineModule is handed the parser and exposes the parsed arguments
    # (mod.args) and the contents of the input json (mod.input_data()).
    mod = PipelineModule("Generate brain observatory alignment.", parser)
    input_data = mod.input_data()

    # Check the output locations up front, before any alignment work is done.
    writer = TimeSyncWriter(input_data.get("output_file"), mod.args.output_json)
    writer.validate_paths()

    aligner = ts.OphysTimeAligner(
        input_data.get("sync_file"),
        scanner=input_data.get("scanner", None),
        dff_file=input_data.get("dff_file", None),
        stimulus_pkl=input_data.get("stimulus_pkl", None),
        eye_video=input_data.get("eye_video", None),
        behavior_video=input_data.get("behavior_video", None),
        long_stim_threshold=input_data.get(
            "long_stim_threshold", ts.LONG_STIM_THRESHOLD
        )
    )

    outputs = run_ophys_time_sync(
        aligner,
        input_data.get("ophys_experiment_id"),
        mod.args.min_stimulus_delay,
        mod.args.max_stimulus_delay
    )
    writer.write(outputs)
# Run the pipeline only when this file is executed as a script; importing
# the module must not trigger a run.
if __name__ == "__main__":
    main()