Source code for allensdk.brain_observatory.behavior.sync.process_sync
import numpy as np
import logging
logger = logging.getLogger(__name__)
[docs]def filter_digital(rising, falling, threshold=0.0001):
"""
Removes short transients from digital signal.
Rising and falling should be same length and units
in seconds.
Kwargs:
threshold (float): transient width
"""
# forwards (removes low-to-high transients)
dif_f = falling - rising
falling_f = falling[np.abs(dif_f) > threshold]
rising_f = rising[np.abs(dif_f) > threshold]
# backwards (removes high-to-low transients )
dif_b = rising_f[1:] - falling_f[:-1]
dif_br = np.append([threshold * 2], dif_b)
dif_bf = np.append(dif_b, [threshold * 2])
rising_f = rising_f[np.abs(dif_br) > threshold]
falling_f = falling_f[np.abs(dif_bf) > threshold]
return rising_f, falling_f
[docs]def calculate_delay(sync_data, stim_vsync_fall, sample_frequency):
# from http://stash.corp.alleninstitute.org/projects/INF/repos/lims2_modules/browse/CAM/ophys_time_sync/ophys_time_sync.py
ASSUMED_DELAY = 0.0351
DELAY_THRESHOLD = 0.001
FIRST_ELEMENT_INDEX = 0
ROUND_PRECISION = 4
ONE = 1
logger.info('calculating monitor delay')
# try:
# photodiode transitions
photodiode_rise = sync_data.get_rising_edges('stim_photodiode') / sample_frequency
# Find start and stop of stimulus
# test and correct for photodiode transition errors
photodiode_rise_diff = np.ediff1d(photodiode_rise)
min_short_photodiode_rise = 0.1
max_short_photodiode_rise = 0.3
min_medium_photodiode_rise = 0.5
max_medium_photodiode_rise = 1.5
# find the short and medium length photodiode rises
short_rise_indexes = np.where(np.logical_and(photodiode_rise_diff > min_short_photodiode_rise,
photodiode_rise_diff < max_short_photodiode_rise))[
FIRST_ELEMENT_INDEX]
medium_rise_indexes = np.where(np.logical_and(photodiode_rise_diff > min_medium_photodiode_rise,
photodiode_rise_diff < max_medium_photodiode_rise))[
FIRST_ELEMENT_INDEX]
short_set = set(short_rise_indexes)
# iterate through the medium photodiode rise indexes to find the start and stop indexes
# lookng for three rise pattern
next_frame = ONE
start_pattern_index = 2
end_pattern_index = 3
ptd_start = None
ptd_end = None
for medium_rise_index in medium_rise_indexes:
if set(range(medium_rise_index - start_pattern_index, medium_rise_index)) <= short_set:
ptd_start = medium_rise_index + next_frame
elif set(range(medium_rise_index + next_frame, medium_rise_index + end_pattern_index)) <= short_set:
ptd_end = medium_rise_index
# if the photodiode signal exists
if ptd_start is not None and ptd_end is not None:
# check to make sure there are no there are no photodiode errors
# sometimes two consecutive photodiode events take place close to each other
# correct this case if it happens
photodiode_rise_error_threshold = 1.8
last_frame_index = -1
# iterate until all of the errors have been corrected
while any(photodiode_rise_diff[ptd_start:ptd_end] < photodiode_rise_error_threshold):
error_frames = np.where(photodiode_rise_diff[ptd_start:ptd_end] < photodiode_rise_error_threshold)[
FIRST_ELEMENT_INDEX] + ptd_start
# remove the bad photodiode event
photodiode_rise = np.delete(photodiode_rise, error_frames[last_frame_index])
ptd_end -= 1
photodiode_rise_diff = np.ediff1d(photodiode_rise)
# Find the delay
# calculate monitor delay
first_pulse = ptd_start
number_of_photodiode_rises = ptd_end - ptd_start
half_vsync_fall_events_per_photodiode_rise = 60
vsync_fall_events_per_photodiode_rise = half_vsync_fall_events_per_photodiode_rise * 2
delay_rise = np.empty(number_of_photodiode_rises)
for photodiode_rise_index in range(number_of_photodiode_rises):
delay_rise[photodiode_rise_index] = photodiode_rise[photodiode_rise_index + first_pulse] - \
stim_vsync_fall[(photodiode_rise_index * vsync_fall_events_per_photodiode_rise) + half_vsync_fall_events_per_photodiode_rise]
# get a single delay value by finding the mean of all of the delays - skip the last element in the array (the end of the experimenet)
delay = np.mean(delay_rise[:last_frame_index])
delay_std = np.std(delay_rise[:last_frame_index])
if (delay_std > DELAY_THRESHOLD or np.isnan(delay)):
logger.error("Sync photodiode error needs to be fixed. Using assumed monitor delay: {}".format(round(delay, ROUND_PRECISION)))
raise
# assume delay
else:
raise
# delay = ASSUMED_DELAY
# except Exception as e:
# logger.info(e)
# delay = ASSUMED_DELAY
# logger.error("Process without photodiode signal. Assumed delay: {}".format(round(delay, ROUND_PRECISION)))
return delay