Source code for allensdk.brain_observatory.ecephys.align_timestamps.probe_synchronizer

from . import barcode

import numpy as np


[docs]class ProbeSynchronizer(object): @property def sampling_rate_scale(self): """ The ratio of the probe's sampling rate assessed on the global clock to the probe's locally assessed sampling rate. """ return self.global_probe_sampling_rate / self.local_probe_sampling_rate def __init__( self, global_probe_sampling_rate, local_probe_sampling_rate, total_time_shift, min_time, max_time, ): """Converts probe sample indices to master clock times. Parameters ---------- global_probe_sampling_rate : float The sampling rate of the probe (Hz) assessed on the master clock. local_probe_sampling_rate : float The sampling rate of the probe (Hz) assessed on the probe clock. total_time_shift : float Offset (s) from probe to master times. min_time : float minimum time for this synchronizer max_time : float maximum time for this synchronizer """ self.global_probe_sampling_rate = global_probe_sampling_rate self.local_probe_sampling_rate = local_probe_sampling_rate self.total_time_shift = total_time_shift self.min_time = min_time self.max_time = max_time def __call__(self, samples, sync_condition="master"): """Applies a computed transform to input probe sample indices. Parameters ---------- samples : numpy.ndarray Array of timestamps in probe samples. sync_condition : str, optional How to synchronize the timestamps. Available options are: 'master': Default, synchronize to master clock 'probe': adjust probe samples -> probe times Returns ------- numpy.ndarray : Sample timestamps in seconds on the master (default) or probe clock. """ in_range = np.where( ((samples / self.local_probe_sampling_rate) >= self.min_time) * ((samples / self.local_probe_sampling_rate) < self.max_time) )[0] if self.global_probe_sampling_rate > 0: if sync_condition == "probe": samples[in_range] = samples[in_range] / \ self.local_probe_sampling_rate elif sync_condition == "master": samples[in_range] = ( samples[in_range] / self.global_probe_sampling_rate - self.total_time_shift ) else: raise ValueError( "unrecognized sync condition: {}".format(sync_condition) ) else: samples[in_range] = -1 return samples
[docs] @classmethod def compute( cls, master_barcode_times, master_barcodes, probe_barcode_times, probe_barcodes, min_time, max_time, probe_start_index, local_probe_sampling_rate, ): """Compute a transform from probe samples to master times by aligning barcodes. Parameters ---------- master_barcode_times : np.ndarray start times of barcodes (according to the master clock) on the master line. One per barcode. master_barcodes : np.ndarray barcode values on the master line. One per barcode probe_barcode_times : np.ndarray start times (according to the probe clock) of barcodes on the probe line. One per barcode probe_barcodes : np.ndarray barcode values on the probe_line. One per barcode min_time : Float time (in seconds) of first barcode to align max_time : Float time (in seconds) of last barcode to align probe_start_index : int sample index of probe acquisition start time local_probe_sampling_rate : float the probe's apparent sampling rate Returns ------- ProbeSynchronizer : When called, applies the transform computed here to samples on the probe clock. """ times_array = np.array(probe_barcode_times) barcodes_array = np.array(probe_barcodes) ok_barcodes = np.where((times_array > min_time) * (times_array < max_time))[0] ok_barcodes = ok_barcodes[ok_barcodes < len(barcodes_array)] times_to_align = list(times_array[ok_barcodes]) barcodes_to_align = list(barcodes_array[ok_barcodes]) if len(barcodes_to_align) > 0: print("Num barcodes: " + str(len(barcodes_to_align))) total_time_shift, global_probe_sampling_rate, _ = \ barcode.get_probe_time_offset( master_barcode_times, master_barcodes, times_to_align, barcodes_to_align, probe_start_index, local_probe_sampling_rate, ) else: print("Not enough barcodes...setting sampling rate to 0") total_time_shift = 0 global_probe_sampling_rate = 0 return cls( global_probe_sampling_rate, local_probe_sampling_rate, total_time_shift, min_time, max_time, )