# Source code for allensdk.brain_observatory.ecephys.file_io.ecephys_sync_dataset

import functools
from collections import defaultdict
import logging
import warnings

import numpy as np

from allensdk.brain_observatory.sync_dataset import Dataset
from allensdk.brain_observatory.ecephys import stimulus_sync


class EcephysSyncDataset(Dataset):
    """Sync dataset with helpers for extracting LED and frame times.

    Instances are normally built with :meth:`factory`; ``__init__`` itself
    does not load anything.
    """

    @property
    def sample_frequency(self):
        """Value stored at ``meta_data['ni_daq']['counter_output_freq']``."""
        return self.meta_data['ni_daq']['counter_output_freq']

    @sample_frequency.setter
    def sample_frequency(self, value):
        # An object created through the bare __init__ has no meta_data yet.
        if not hasattr(self, 'meta_data'):
            self.meta_data = defaultdict(dict)
        # setdefault keeps this working when meta_data is a plain dict that
        # has no 'ni_daq' entry yet (not only for the defaultdict above).
        self.meta_data.setdefault('ni_daq', {})['counter_output_freq'] = value

    def __init__(self):
        '''In-memory representation of a sync h5 file as produced by the
        sync package.

        Notes
        -----
        base is from http://aibspi/mpe_apps/sync/blob/master/sync/dataset.py
        Construction works slightly differently for this class than for its
        base. In particular, this class' __init__ method merely constructs
        the object. To make a new SyncDataset in client code, use the
        factory classmethod. This is done for ease of testability.

        '''
        pass

    def extract_led_times(self,
                          keys=Dataset.OPTOGENETIC_STIMULATION_KEYS,
                          fallback_line=18):
        """Return the rising-edge times of the LED line, in seconds.

        Parameters
        ----------
        keys :
            Line labels passed to ``get_edges``.
        fallback_line : int
            Line passed to ``get_rising_edges`` when the lookup by ``keys``
            raises ``KeyError``.

        Returns
        -------
        Whatever ``get_edges`` / ``get_rising_edges`` return for the
        selected line (requested with ``units="seconds"``).
        """
        try:
            led_times = self.get_edges(
                kind="rising",
                keys=keys,
                units="seconds"
            )
        except KeyError:
            # Fixed: the two message fragments used to be joined without a
            # separating space ("...line labels('a', 'b')...").
            warnings.warn(
                "unable to find LED times using line labels "
                f"{keys}, returning line {fallback_line}"
            )
            led_times = self.get_rising_edges(fallback_line, units="seconds")

        return led_times

    def remove_zero_frames(self, frame_times):
        """Smooth out very short inter-frame intervals.

        Every interval below 0.01 is paired with the closest preceding
        interval that lies strictly between 0.018 and 0.1. When that
        partner is fewer than 100 intervals away, both intervals are
        replaced by the median interval. The times are then rebuilt as a
        cumulative sum starting from ``min(frame_times)``.

        Parameters
        ----------
        frame_times : array-like
            One-dimensional frame times (presumably seconds, given the
            thresholds - confirm against the caller).

        Returns
        -------
        numpy.ndarray
            Corrected frame times, same length as the input. An empty
            input yields an empty array instead of failing inside
            ``np.min``.
        """
        frame_times = np.asarray(frame_times)
        if frame_times.size == 0:
            return np.zeros((0,))

        intervals = np.diff(frame_times)
        short_idx = np.where(intervals < 0.01)[0]
        long_idx = np.where((intervals > 0.018) & (intervals < 0.1))[0]

        corrected = np.copy(intervals)

        if short_idx.size and long_idx.size:
            typical = np.median(intervals)

            # long_idx is ascending, so searchsorted(...) - 1 is the
            # position of the last long interval strictly before each
            # short one (-1 when there is none).
            pos = np.searchsorted(long_idx, short_idx) - 1
            has_partner = pos >= 0
            partner = long_idx[pos[has_partner]]
            target = short_idx[has_partner]

            # Only repair pairs that are fewer than 100 intervals apart.
            nearby = (partner - target) > -100
            corrected[partner[nearby]] = typical
            corrected[target[nearby]] = typical

        start = np.min(frame_times)
        return np.concatenate(([start], np.cumsum(corrected) + start))

    def extract_frame_times_from_photodiode(
            self,
            photodiode_cycle=60,
            frame_keys=Dataset.FRAME_KEYS,
            photodiode_keys=Dataset.PHOTODIODE_KEYS,
            trim_discontiguous_frame_times=True):
        """Compute frame start times from photodiode and vsync edges.

        Parameters
        ----------
        photodiode_cycle : int
            Forwarded as ``cycle`` / third argument to the
            ``stimulus_sync`` helpers.
        frame_keys :
            Line labels used to fetch the falling vsync edges.
        photodiode_keys :
            Line labels used to fetch all photodiode edges.
        trim_discontiguous_frame_times : bool
            When true, the vsync times are first passed through
            ``stimulus_sync.trim_discontiguous_vsyncs``.

        Returns
        -------
        numpy.ndarray
            Frame start times of all chunks, concatenated and passed
            through :meth:`remove_zero_frames`.
        """
        photodiode_times = self.get_edges('all', photodiode_keys)
        vsync_times = self.get_edges('falling', frame_keys)

        if trim_discontiguous_frame_times:
            vsync_times = stimulus_sync.trim_discontiguous_vsyncs(vsync_times)

        vsync_times_chunked, pd_times_chunked = \
            stimulus_sync.separate_vsyncs_and_photodiode_times(
                vsync_times,
                photodiode_times,
                photodiode_cycle)

        logging.info("Total chunks: %d", len(vsync_times_chunked))

        # Start from an empty float array so that zero chunks still give a
        # float result; the pieces are joined once after the loop instead
        # of re-copying the growing array on every iteration.
        starts_per_chunk = [np.zeros((0,))]

        for i, vsync_chunk in enumerate(vsync_times_chunked):
            chunk_pd_times = stimulus_sync.trim_border_pulses(
                pd_times_chunked[i], vsync_chunk)
            chunk_pd_times = stimulus_sync.correct_on_off_effects(
                chunk_pd_times)
            chunk_pd_times = stimulus_sync.fix_unexpected_edges(
                chunk_pd_times, cycle=photodiode_cycle)

            frame_duration = stimulus_sync.estimate_frame_duration(
                chunk_pd_times, cycle=photodiode_cycle)
            irregular_interval_policy = functools.partial(
                stimulus_sync.allocate_by_vsync, np.diff(vsync_chunk))

            # Only the start times are used; indices and end times are
            # discarded.
            _, frame_starts, _ = stimulus_sync.compute_frame_times(
                chunk_pd_times,
                frame_duration,
                len(vsync_chunk),
                cycle=photodiode_cycle,
                irregular_interval_policy=irregular_interval_policy
            )
            starts_per_chunk.append(frame_starts)

        frame_start_times = np.concatenate(starts_per_chunk)

        # NOTE(review): the source this was restored from had lost its
        # indentation; the cleanup below is taken to run once, after all
        # chunks are combined - confirm against upstream.
        frame_start_times = self.remove_zero_frames(frame_start_times)

        logging.info("Total vsyncs: %d", len(vsync_times))

        return frame_start_times

    def extract_frame_times_from_vsyncs(
            self,
            photodiode_cycle=60,
            frame_keys=Dataset.FRAME_KEYS,
            photodiode_keys=Dataset.PHOTODIODE_KEYS):
        """Placeholder for the vsync-only strategy.

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError()

    def extract_frame_times(
            self,
            strategy,
            photodiode_cycle=60,
            frame_keys=Dataset.FRAME_KEYS,
            photodiode_keys=Dataset.PHOTODIODE_KEYS,
            trim_discontiguous_frame_times=True):
        """Dispatch to a frame time extraction strategy.

        Parameters
        ----------
        strategy : str
            ``'use_photodiode'`` or ``'use_vsyncs'``.
        photodiode_cycle, frame_keys, photodiode_keys :
            Forwarded to the selected extraction method.
        trim_discontiguous_frame_times : bool
            Forwarded to the photodiode strategy only.

        Raises
        ------
        ValueError
            If ``strategy`` is not one of the recognized names.
        """
        if strategy == 'use_photodiode':
            return self.extract_frame_times_from_photodiode(
                photodiode_cycle=photodiode_cycle,
                frame_keys=frame_keys,
                photodiode_keys=photodiode_keys,
                trim_discontiguous_frame_times=trim_discontiguous_frame_times
            )
        elif strategy == 'use_vsyncs':
            return self.extract_frame_times_from_vsyncs(
                photodiode_cycle=photodiode_cycle,
                frame_keys=frame_keys,
                photodiode_keys=photodiode_keys
            )
        else:
            raise ValueError('unrecognized strategy: {}'.format(strategy))

    @classmethod
    def factory(cls, path):
        ''' Build a new SyncDataset.

        Parameters
        ----------
        path : str
            Filesystem path to the h5 file containing sync information
            to be loaded.

        Returns
        -------
        An instance of ``cls`` on which ``load(path)`` has been called.

        '''
        obj = cls()
        obj.load(path)
        return obj
#