Source code for allensdk.brain_observatory.ecephys.align_timestamps.barcode

import numpy as np


[docs]def extract_barcodes_from_times( on_times, off_times, inter_barcode_interval=10, bar_duration=0.03, barcode_duration_ceiling=2, nbits=32, ): """Read barcodes from timestamped rising and falling edges. Parameters ---------- on_times : numpy.ndarray Timestamps of rising edges on the barcode line off_times : numpy.ndarray Timestamps of falling edges on the barcode line inter_barcode_interval : numeric, optional Minimun duration of time between barcodes. bar_duration : numeric, optional A value slightly shorter than the expected duration of each bar barcode_duration_ceiling : numeric, optional The maximum duration of a single barcode nbits : int, optional The bit-depth of each barcode Returns ------- barcode_start_times : list of numeric For each detected barcode, the time at which that barcode started barcodes : list of int For each detected barcode, the value of that barcode as an integer. Notes ----- ignores first code in prod (ok, but not intended) ignores first on pulse (intended - this is needed to identify that a barcode is starting) """ start_indices = np.diff(on_times) a = np.where(start_indices > inter_barcode_interval)[0] barcode_start_times = on_times[a + 1] barcodes = [] for i, t in enumerate(barcode_start_times): oncode = on_times[ np.where( np.logical_and(on_times > t, on_times < t + barcode_duration_ceiling) )[0] ] offcode = off_times[ np.where( np.logical_and(off_times > t, off_times < t + barcode_duration_ceiling) )[0] ] currTime = offcode[0] bits = np.zeros((nbits,)) for bit in range(0, nbits): nextOn = np.where(oncode > currTime)[0] nextOff = np.where(offcode > currTime)[0] if nextOn.size > 0: nextOn = oncode[nextOn[0]] else: nextOn = t + inter_barcode_interval if nextOff.size > 0: nextOff = offcode[nextOff[0]] else: nextOff = t + inter_barcode_interval if nextOn < nextOff: bits[bit] = 1 currTime += bar_duration barcode = 0 # least sig left for bit in range(0, nbits): barcode += bits[bit] * pow(2, bit) barcodes.append(barcode) return barcode_start_times, barcodes
[docs]def find_matching_index(master_barcodes, probe_barcodes, alignment_type="start"): """Given a set of barcodes for the master clock and the probe clock, find the indices of a matching set, either starting from the beginning or the end of the list. Parameters ---------- master_barcodes : np.ndarray barcode values on the master line. One per barcode probe_barcodes : np.ndarray barcode values on the probe line. One per barcode alignment_type : string 'start' or 'end' Returns ------- master_barcode_index : int matching index for master barcodes (None if not found) probe_barcode_index : int matching index for probe barcodes (None if not found) """ foundMatch = False master_barcode_index = None if alignment_type == "start": probe_barcode_index = 0 direction = 1 else: probe_barcode_index = -1 direction = -1 while not foundMatch and abs(probe_barcode_index) < len(probe_barcodes): master_barcode_index = np.where( master_barcodes == probe_barcodes[probe_barcode_index] )[0] assert len(master_barcode_index) < 2 if len(master_barcode_index) == 1: foundMatch = True else: probe_barcode_index += direction if foundMatch: return master_barcode_index, probe_barcode_index else: return None, None
[docs]def match_barcodes(master_times, master_barcodes, probe_times, probe_barcodes): """Given sequences of barcode values and (local) times on a probe line and a master line, find the time points on each clock corresponding to the first and last shared barcode. If there's only one probe barcode, only the first matching timepoint is returned. Parameters ---------- master_times : np.ndarray start times of barcodes (according to the master clock) on the master line. One per barcode. master_barcodes : np.ndarray barcode values on the master line. One per barcode probe_times : np.ndarray start times (according to the probe clock) of barcodes on the probe line. One per barcode probe_barcodes : np.ndarray barcode values on the probe_line. One per barcode Returns ------- probe_interval : np.ndarray Start and end times of the matched interval according to the probe_clock. master_interval : np.ndarray Start and end times of the matched interval according to the master clock """ master_start_index, probe_start_index = find_matching_index( master_barcodes, probe_barcodes, alignment_type="start" ) if master_start_index is not None: t_m_start = master_times[master_start_index] t_p_start = probe_times[probe_start_index] else: t_m_start, t_p_start = None, None # print(master_barcodes) # print(probe_barcodes) print("Master start index: " + str(master_start_index)) if len(probe_barcodes) > 2: master_end_index, probe_end_index = find_matching_index(master_barcodes, probe_barcodes, alignment_type='end') if probe_end_index is not None: print("Probe end index: " + str(probe_end_index)) t_m_end = master_times[master_end_index] t_p_end = probe_times[probe_end_index] else: t_m_end = None t_p_end = None else: t_m_end, t_p_end = None, None return np.array([t_p_start, t_p_end]), np.array([t_m_start, t_m_end])
[docs]def linear_transform_from_intervals(master, probe): """Find a scale and translation which aligns two 1d segments Parameters ---------- master : iterable Pair of floats defining the master interval. Order is [start, end]. probe : iterable Pair of floats defining the probe interval. Order is [start, end]. Returns ------- scale : float Scale factor. If > 1.0, the probe clock is running fast compared to the master clock. If < 1.0, the probe clock is running slow. translation : float If > 0, the probe clock started before the master clock. If > 0, after. Notes ----- solves (master + translation) * scale = probe for scale and translation """ if probe[1] is not None: scale = (probe[1] - probe[0]) / (master[1] - master[0]) else: scale = 1.0 if master[0] is not None: translation = probe[0] / scale - master[0] else: translation = None return scale, translation
[docs]def get_probe_time_offset( master_times, master_barcodes, probe_times, probe_barcodes, acq_start_index, local_probe_rate, ): """Time offset between master clock and recording probes. For converting probe time to master clock. Parameters ---------- master_times : np.ndarray start times of barcodes (according to the master clock) on the master line. One per barcode. master_barcodes : np.ndarray barcode values on the master line. One per barcode probe_times : np.ndarray start times (according to the probe clock) of barcodes on the probe line. One per barcode probe_barcodes : np.ndarray barcode values on the probe_line. One per barcode acq_start_index : int sample index of probe acquisition start time local_probe_rate : float the probe's apparent sampling rate Returns ------- total_time_shift : float Time at which the probe started acquisition, assessed on the master clock. If < 0, the probe started earlier than the master line. probe_rate : float The probe's sampling rate, assessed on the master clock master_endpoints : iterable Defines the start and end times of the sync interval on the master clock """ probe_endpoints, master_endpoints = match_barcodes( master_times, master_barcodes, probe_times, probe_barcodes ) rate_scale, time_offset = linear_transform_from_intervals( master_endpoints, probe_endpoints ) if time_offset is not None: probe_rate = local_probe_rate * rate_scale acq_start_time = acq_start_index / probe_rate total_time_shift = time_offset - acq_start_time else: print("Not enough barcodes...setting sampling rate to 0") total_time_shift = 0 probe_rate = 0 return total_time_shift, probe_rate, master_endpoints