Source code for allensdk.brain_observatory.behavior.sync.process_sync

import numpy as np

import logging
logger = logging.getLogger(__name__)

[docs]def filter_digital(rising, falling, threshold=0.0001): """ Removes short transients from digital signal. Rising and falling should be same length and units in seconds. Kwargs: threshold (float): transient width """ # forwards (removes low-to-high transients) dif_f = falling - rising falling_f = falling[np.abs(dif_f) > threshold] rising_f = rising[np.abs(dif_f) > threshold] # backwards (removes high-to-low transients ) dif_b = rising_f[1:] - falling_f[:-1] dif_br = np.append([threshold * 2], dif_b) dif_bf = np.append(dif_b, [threshold * 2]) rising_f = rising_f[np.abs(dif_br) > threshold] falling_f = falling_f[np.abs(dif_bf) > threshold] return rising_f, falling_f
[docs]def calculate_delay(sync_data, stim_vsync_fall, sample_frequency): # from ASSUMED_DELAY = 0.0351 DELAY_THRESHOLD = 0.001 FIRST_ELEMENT_INDEX = 0 ROUND_PRECISION = 4 ONE = 1'calculating monitor delay') # try: # photodiode transitions photodiode_rise = sync_data.get_rising_edges('stim_photodiode') / sample_frequency # Find start and stop of stimulus # test and correct for photodiode transition errors photodiode_rise_diff = np.ediff1d(photodiode_rise) min_short_photodiode_rise = 0.1 max_short_photodiode_rise = 0.3 min_medium_photodiode_rise = 0.5 max_medium_photodiode_rise = 1.5 # find the short and medium length photodiode rises short_rise_indexes = np.where(np.logical_and(photodiode_rise_diff > min_short_photodiode_rise, photodiode_rise_diff < max_short_photodiode_rise))[ FIRST_ELEMENT_INDEX] medium_rise_indexes = np.where(np.logical_and(photodiode_rise_diff > min_medium_photodiode_rise, photodiode_rise_diff < max_medium_photodiode_rise))[ FIRST_ELEMENT_INDEX] short_set = set(short_rise_indexes) # iterate through the medium photodiode rise indexes to find the start and stop indexes # lookng for three rise pattern next_frame = ONE start_pattern_index = 2 end_pattern_index = 3 ptd_start = None ptd_end = None for medium_rise_index in medium_rise_indexes: if set(range(medium_rise_index - start_pattern_index, medium_rise_index)) <= short_set: ptd_start = medium_rise_index + next_frame elif set(range(medium_rise_index + next_frame, medium_rise_index + end_pattern_index)) <= short_set: ptd_end = medium_rise_index # if the photodiode signal exists if ptd_start is not None and ptd_end is not None: # check to make sure there are no there are no photodiode errors # sometimes two consecutive photodiode events take place close to each other # correct this case if it happens photodiode_rise_error_threshold = 1.8 last_frame_index = -1 # iterate until all of the errors have been corrected while any(photodiode_rise_diff[ptd_start:ptd_end] < photodiode_rise_error_threshold): error_frames = np.where(photodiode_rise_diff[ptd_start:ptd_end] < photodiode_rise_error_threshold)[ FIRST_ELEMENT_INDEX] + ptd_start # remove the bad photodiode event photodiode_rise = np.delete(photodiode_rise, error_frames[last_frame_index]) ptd_end -= 1 photodiode_rise_diff = np.ediff1d(photodiode_rise) # Find the delay # calculate monitor delay first_pulse = ptd_start number_of_photodiode_rises = ptd_end - ptd_start half_vsync_fall_events_per_photodiode_rise = 60 vsync_fall_events_per_photodiode_rise = half_vsync_fall_events_per_photodiode_rise * 2 delay_rise = np.empty(number_of_photodiode_rises) for photodiode_rise_index in range(number_of_photodiode_rises): delay_rise[photodiode_rise_index] = photodiode_rise[photodiode_rise_index + first_pulse] - \ stim_vsync_fall[(photodiode_rise_index * vsync_fall_events_per_photodiode_rise) + half_vsync_fall_events_per_photodiode_rise] # get a single delay value by finding the mean of all of the delays - skip the last element in the array (the end of the experimenet) delay = np.mean(delay_rise[:last_frame_index]) delay_std = np.std(delay_rise[:last_frame_index]) if (delay_std > DELAY_THRESHOLD or np.isnan(delay)): logger.error("Sync photodiode error needs to be fixed. Using assumed monitor delay: {}".format(round(delay, ROUND_PRECISION))) raise # assume delay else: raise # delay = ASSUMED_DELAY # except Exception as e: # # delay = ASSUMED_DELAY # logger.error("Process without photodiode signal. Assumed delay: {}".format(round(delay, ROUND_PRECISION))) return delay