Source code for allensdk.brain_observatory.ecephys.align_timestamps.probe_synchronizer
from . import barcode
import numpy as np
[docs]class ProbeSynchronizer(object):
@property
def sampling_rate_scale(self):
""" The ratio of the probe's sampling rate assessed on the global clock to the
probe's locally assessed sampling rate.
"""
return self.global_probe_sampling_rate / self.local_probe_sampling_rate
def __init__(
self,
global_probe_sampling_rate,
local_probe_sampling_rate,
total_time_shift,
min_time,
max_time,
):
"""Converts probe sample indices to master clock times.
Parameters
----------
global_probe_sampling_rate : float
The sampling rate of the probe (Hz) assessed on the master clock.
local_probe_sampling_rate : float
The sampling rate of the probe (Hz) assessed on the probe clock.
total_time_shift : float
Offset (s) from probe to master times.
min_time : float
minimum time for this synchronizer
max_time : float
maximum time for this synchronizer
"""
self.global_probe_sampling_rate = global_probe_sampling_rate
self.local_probe_sampling_rate = local_probe_sampling_rate
self.total_time_shift = total_time_shift
self.min_time = min_time
self.max_time = max_time
def __call__(self, samples, sync_condition="master"):
"""Applies a computed transform to input probe sample indices.
Parameters
----------
samples : numpy.ndarray
Array of timestamps in probe samples.
sync_condition : str, optional
How to synchronize the timestamps. Available options are:
'master': Default, synchronize to master clock
'probe': adjust probe samples -> probe times
Returns
-------
numpy.ndarray :
Sample timestamps in seconds on the master (default) or probe clock.
"""
in_range = np.where(
((samples / self.local_probe_sampling_rate) >= self.min_time)
* ((samples / self.local_probe_sampling_rate) < self.max_time)
)[0]
if self.global_probe_sampling_rate > 0:
if sync_condition == "probe":
samples[in_range] = samples[in_range] / self.local_probe_sampling_rate
elif sync_condition == "master":
samples[in_range] = (
samples[in_range] / self.global_probe_sampling_rate
- self.total_time_shift
)
else:
raise ValueError(
"unrecognized sync condition: {}".format(sync_condition)
)
else:
samples[in_range] = -1
return samples
[docs] @classmethod
def compute(
cls,
master_barcode_times,
master_barcodes,
probe_barcode_times,
probe_barcodes,
min_time,
max_time,
probe_start_index,
local_probe_sampling_rate,
):
"""Compute a transform from probe samples to master times by aligning barcodes.
Parameters
----------
master_barcode_times : np.ndarray
start times of barcodes (according to the master clock) on the master line.
One per barcode.
master_barcodes : np.ndarray
barcode values on the master line. One per barcode
probe_barcode_times : np.ndarray
start times (according to the probe clock) of barcodes on the probe line.
One per barcode
probe_barcodes : np.ndarray
barcode values on the probe_line. One per barcode
min_time : Float
time (in seconds) of first barcode to align
max_time : Float
time (in seconds) of last barcode to align
probe_start_index : int
sample index of probe acquisition start time
local_probe_sampling_rate : float
the probe's apparent sampling rate
Returns
-------
ProbeSynchronizer :
When called, applies the transform computed here to samples on the probe clock.
"""
times_array = np.array(probe_barcode_times)
barcodes_array = np.array(probe_barcodes)
ok_barcodes = np.where((times_array > min_time) * (times_array < max_time))[0]
times_to_align = list(times_array[ok_barcodes])
barcodes_to_align = list(barcodes_array[ok_barcodes])
if len(barcodes_to_align) > 0:
print("Num barcodes: " + str(len(barcodes_to_align)))
total_time_shift, global_probe_sampling_rate, _ = barcode.get_probe_time_offset(
master_barcode_times,
master_barcodes,
times_to_align,
barcodes_to_align,
probe_start_index,
local_probe_sampling_rate,
)
else:
print("Not enough barcodes...setting sampling rate to 0")
total_time_shift = 0
global_probe_sampling_rate = 0
return cls(
global_probe_sampling_rate,
local_probe_sampling_rate,
total_time_shift,
min_time,
max_time,
)