Source code for allensdk.brain_observatory.ecephys.file_io.ecephys_sync_dataset
import functools
from collections import defaultdict
import logging
import warnings
import numpy as np
from allensdk.brain_observatory.sync_dataset import Dataset
from allensdk.brain_observatory.ecephys import stimulus_sync
[docs]class EcephysSyncDataset(Dataset):
@property
def sample_frequency(self):
return self.meta_data['ni_daq']['counter_output_freq']
@sample_frequency.setter
def sample_frequency(self, value):
if not hasattr(self, 'meta_data'):
self.meta_data = defaultdict(dict)
self.meta_data['ni_daq']['counter_output_freq'] = value
def __init__(self):
'''In-memory representation of a sync h5 file as produced by the sync package.
Notes
-----
base is from http://aibspi/mpe_apps/sync/blob/master/sync/dataset.py
Construction works slightly differently for this class as its base.
In particular, this class' __init__ method merely constructs the
object. To make a new SyncDataset in client code, use the
factory classmethod. This is done for ease of testability.
'''
pass
[docs] def extract_led_times(self,
keys=Dataset.OPTOGENETIC_STIMULATION_KEYS,
fallback_line=18):
try:
led_times = self.get_edges(
kind="rising",
keys=keys,
units="seconds"
)
except KeyError:
warnings.warn("unable to find LED times using line labels" +
f"{keys}, returning line {fallback_line}")
led_times = self.get_rising_edges(fallback_line, units="seconds")
return led_times
[docs] def remove_zero_frames(self, frame_times):
D = np.diff(frame_times)
a = np.where(D < 0.01)[0]
b = np.where((D > 0.018) * (D < 0.1))[0]
def find_match(b, value):
try:
return b[np.max(np.where((b < value))[0])] - value
except ValueError:
return None
c = [find_match(b, A) for A in a]
ft = np.copy(D)
for idx, d in enumerate(a):
if c[idx] is not None:
if c[idx] > -100:
ft[d+c[idx]] = np.median(D)
ft[d] = np.median(D)
t = np.concatenate(([np.min(frame_times)],
np.cumsum(ft) + np.min(frame_times)))
return t
[docs] def extract_frame_times_from_photodiode(
self,
photodiode_cycle=60,
frame_keys=Dataset.FRAME_KEYS,
photodiode_keys=Dataset.PHOTODIODE_KEYS,
trim_discontiguous_frame_times=True):
photodiode_times = self.get_edges('all', photodiode_keys)
vsync_times = self.get_edges('falling', frame_keys)
if trim_discontiguous_frame_times:
vsync_times = stimulus_sync.trim_discontiguous_vsyncs(vsync_times)
vsync_times_chunked, pd_times_chunked = \
stimulus_sync.separate_vsyncs_and_photodiode_times(
vsync_times,
photodiode_times,
photodiode_cycle)
logging.info(f"Total chunks: {len(vsync_times_chunked)}")
frame_start_times = np.zeros((0,))
for i in range(len(vsync_times_chunked)):
photodiode_times = stimulus_sync.trim_border_pulses(
pd_times_chunked[i],
vsync_times_chunked[i])
photodiode_times = stimulus_sync.correct_on_off_effects(
photodiode_times)
photodiode_times = stimulus_sync.fix_unexpected_edges(
photodiode_times,
cycle=photodiode_cycle)
frame_duration = stimulus_sync.estimate_frame_duration(
photodiode_times,
cycle=photodiode_cycle)
irregular_interval_policy = functools.partial(
stimulus_sync.allocate_by_vsync,
np.diff(vsync_times_chunked[i]))
frame_indices, frame_starts, frame_end_times = \
stimulus_sync.compute_frame_times(
photodiode_times,
frame_duration,
len(vsync_times_chunked[i]),
cycle=photodiode_cycle,
irregular_interval_policy=irregular_interval_policy
)
frame_start_times = np.concatenate((frame_start_times,
frame_starts))
frame_start_times = self.remove_zero_frames(frame_start_times)
logging.info(f"Total vsyncs: {len(vsync_times)}")
return frame_start_times
[docs] def extract_frame_times_from_vsyncs(
self,
photodiode_cycle=60,
frame_keys=Dataset.FRAME_KEYS, photodiode_keys=Dataset.PHOTODIODE_KEYS
):
raise NotImplementedError()
[docs] def extract_frame_times(
self,
strategy,
photodiode_cycle=60,
frame_keys=Dataset.FRAME_KEYS,
photodiode_keys=Dataset.PHOTODIODE_KEYS,
trim_discontiguous_frame_times=True
):
if strategy == 'use_photodiode':
return self.extract_frame_times_from_photodiode(
photodiode_cycle=photodiode_cycle,
frame_keys=frame_keys,
photodiode_keys=photodiode_keys,
trim_discontiguous_frame_times=trim_discontiguous_frame_times
)
elif strategy == 'use_vsyncs':
return self.extract_frame_times_from_vsyncs(
photodiode_cycle=photodiode_cycle,
frame_keys=frame_keys,
photodiode_keys=photodiode_keys
)
else:
raise ValueError('unrecognized strategy: {}'.format(strategy))
[docs] @classmethod
def factory(cls, path):
''' Build a new SyncDataset.
Parameters
----------
path : str
Filesystem path to the h5 file containing sync information
to be loaded.
'''
obj = cls()
obj.load(path)
return obj
#