Source code for allensdk.brain_observatory.ecephys.stimulus_sync

import warnings

import numpy as np
import scipy.spatial.distance as distance


[docs]def trimmed_stats(data, pctiles=(10, 90)): low = np.percentile(data, pctiles[0]) high = np.percentile(data, pctiles[1]) trimmed = data[np.logical_and( data <= high, data >= low )] return np.mean(trimmed), np.std(trimmed)
[docs]def trim_border_pulses(pd_times, vs_times, frame_interval=1/60, num_frames=5): pd_times = np.array(pd_times) return pd_times[np.logical_and( pd_times >= vs_times[0], pd_times <= vs_times[-1] + num_frames * frame_interval )]
[docs]def correct_on_off_effects(pd_times): ''' Notes ----- This cannot (without additional info) determine whether an assymmetric offset is odd-long or even-long. ''' pd_diff = np.diff(pd_times) odd_diff_mean, odd_diff_std = trimmed_stats(pd_diff[1::2]) even_diff_mean, even_diff_std = trimmed_stats(pd_diff[0::2]) half_diff = np.diff(pd_times[0::2]) full_period_mean, full_period_std = trimmed_stats(half_diff) half_period_mean = full_period_mean / 2 odd_offset = odd_diff_mean - half_period_mean even_offset = even_diff_mean - half_period_mean pd_times[::2] -= odd_offset / 2 pd_times[1::2] -= even_offset / 2 return pd_times
[docs]def flag_unexpected_edges(pd_times, ndevs=10): pd_diff = np.diff(pd_times) diff_mean, diff_std = trimmed_stats(pd_diff) expected_duration_mask = np.ones(pd_diff.size) expected_duration_mask[np.logical_or( pd_diff < diff_mean - ndevs * diff_std, pd_diff > diff_mean + ndevs * diff_std )] = 0 expected_duration_mask[1:] = np.logical_and(expected_duration_mask[:-1], expected_duration_mask[1:]) expected_duration_mask = np.concatenate([expected_duration_mask, [expected_duration_mask[-1]]]) return expected_duration_mask
[docs]def fix_unexpected_edges(pd_times, ndevs=10, cycle=60, max_frame_offset=4): pd_times = np.array(pd_times) expected_duration_mask = flag_unexpected_edges(pd_times, ndevs=ndevs) diff_mean, diff_std = trimmed_stats(np.diff(pd_times)) frame_interval = diff_mean / cycle bad_edges = np.where(expected_duration_mask == 0)[0] bad_blocks = np.sort(np.unique(np.concatenate([ [0], np.where(np.diff(bad_edges) > 1)[0] + 1, [len(bad_edges)] ]))) output_edges = [] for low, high in zip(bad_blocks[:-1], bad_blocks[1:]): current_bad_edge_indices = bad_edges[low: high-1] current_bad_edges = pd_times[current_bad_edge_indices] low_bound = pd_times[current_bad_edge_indices[0]] high_bound = pd_times[current_bad_edge_indices[-1] + 1] edges_missing = int(np.around((high_bound - low_bound) / diff_mean)) expected = np.linspace(low_bound, high_bound, edges_missing + 1) distances = distance.cdist(current_bad_edges[:, None], expected[:, None]) distances = np.around(distances / frame_interval).astype(int) min_offsets = np.amin(distances, axis=0) min_offset_indices = np.argmin(distances, axis=0) output_edges = np.concatenate([ output_edges, expected[min_offsets > max_frame_offset], current_bad_edges[min_offset_indices[min_offsets <= max_frame_offset]] ]) return np.sort(np.concatenate([output_edges, pd_times[expected_duration_mask > 0]]))
[docs]def estimate_frame_duration(pd_times, cycle=60): return trimmed_stats(np.diff(pd_times))[0] / cycle
[docs]def assign_to_last(index, starts, ends, frame_duration, irregularity, cycle): ends[-1] += frame_duration * np.sign(irregularity) return starts, ends
[docs]def allocate_by_vsync(vs_diff, index, starts, ends, frame_duration, irregularity, cycle): current_vs_diff = vs_diff[index * cycle: (index + 1) * cycle] sign = np.sign(irregularity) if sign > 0: vs_ind = np.argmax(current_vs_diff) elif sign < 0: vs_ind = np.argmin(current_vs_diff) ends[vs_ind:] += sign * frame_duration starts[vs_ind + 1:] += sign * frame_duration return starts, ends
[docs]def compute_frame_times(photodiode_times, frame_duration, num_frames, cycle, irregular_interval_policy=assign_to_last): indices = np.arange(num_frames) starts = np.zeros(num_frames, dtype=float) ends = np.zeros(num_frames, dtype=float) num_intervals = len(photodiode_times) - 1 for start_index, (start_time, end_time) in enumerate(zip(photodiode_times[:-1], photodiode_times[1:])): interval_duration = end_time - start_time irregularity = int(np.around((interval_duration) / frame_duration)) - cycle local_frame_duration = interval_duration / (cycle + irregularity) durations = np.zeros(cycle + ( start_index == num_intervals - 1 )) + local_frame_duration current_ends = np.cumsum(durations) + start_time current_starts = current_ends - durations while irregularity != 0: current_starts, current_ends = irregular_interval_policy( start_index, current_starts, current_ends, local_frame_duration, irregularity, cycle ) irregularity += -1 * np.sign(irregularity) early_frame = start_index * cycle late_frame = (start_index + 1) * cycle + ( start_index == num_intervals - 1 ) remaining = starts[early_frame: late_frame].size starts[early_frame: late_frame] = current_starts[:remaining] ends[early_frame: late_frame] = current_ends[:remaining] return indices, starts, ends