Source code for allensdk.brain_observatory.ecephys.stimulus_sync

import numpy as np
import scipy.spatial.distance as distance


[docs]def trimmed_stats(data, pctiles=(10, 90)): low = np.percentile(data, pctiles[0]) high = np.percentile(data, pctiles[1]) trimmed = data[np.logical_and( data <= high, data >= low )] return np.mean(trimmed), np.std(trimmed)
[docs]def trim_discontiguous_vsyncs(vs_times, photodiode_cycle=60): vs_times = np.array(vs_times) breaks = np.where(np.diff(vs_times) > (1/photodiode_cycle)*100)[0] if len(breaks) > 0: chunk_sizes = np.diff(np.concatenate((np.array([0, ]), breaks, np.array([len(vs_times), ])))) largest_chunk = np.argmax(chunk_sizes) if largest_chunk == 0: return vs_times[:np.min(breaks+1)] elif largest_chunk == len(breaks): return vs_times[np.max(breaks+1):] else: return vs_times[breaks[largest_chunk-1]:breaks[largest_chunk]] else: return vs_times
[docs]def separate_vsyncs_and_photodiode_times(vs_times, pd_times, photodiode_cycle=60): vs_times = np.array(vs_times) pd_times = np.array(pd_times) breaks = np.where(np.diff(vs_times) > (1/photodiode_cycle)*100)[0] shift = 2.0 break_times = [-shift] break_times.extend(vs_times[breaks].tolist()) break_times.extend([np.inf]) vs_times_out = [] pd_times_out = [] for indx, b in enumerate(break_times[:-1]): pd_in_range = np.where((pd_times > break_times[indx] + shift) * (pd_times <= break_times[indx+1] + shift))[0] vs_in_range = np.where((vs_times > break_times[indx]) * (vs_times <= break_times[indx+1]))[0] vs_times_out.append(vs_times[vs_in_range]) pd_times_out.append(pd_times[pd_in_range]) return vs_times_out, pd_times_out
[docs]def trim_border_pulses(pd_times, vs_times, frame_interval=1/60, num_frames=5): pd_times = np.array(pd_times) return pd_times[np.logical_and( pd_times >= vs_times[0], pd_times <= vs_times[-1] + num_frames * frame_interval )]
[docs]def correct_on_off_effects(pd_times): ''' Notes ----- This cannot (without additional info) determine whether an assymmetric offset is odd-long or even-long. ''' pd_diff = np.diff(pd_times) odd_diff_mean, odd_diff_std = trimmed_stats(pd_diff[1::2]) even_diff_mean, even_diff_std = trimmed_stats(pd_diff[0::2]) half_diff = np.diff(pd_times[0::2]) full_period_mean, full_period_std = trimmed_stats(half_diff) half_period_mean = full_period_mean / 2 odd_offset = odd_diff_mean - half_period_mean even_offset = even_diff_mean - half_period_mean pd_times[::2] -= odd_offset / 2 pd_times[1::2] -= even_offset / 2 return pd_times
[docs]def flag_unexpected_edges(pd_times, ndevs=10): pd_diff = np.diff(pd_times) diff_mean, diff_std = trimmed_stats(pd_diff) expected_duration_mask = np.ones(pd_diff.size) expected_duration_mask[np.logical_or( pd_diff < diff_mean - ndevs * diff_std, pd_diff > diff_mean + ndevs * diff_std )] = 0 expected_duration_mask[1:] = np.logical_and(expected_duration_mask[:-1], expected_duration_mask[1:]) expected_duration_mask = np.concatenate([expected_duration_mask, [expected_duration_mask[-1]]]) return expected_duration_mask
[docs]def fix_unexpected_edges(pd_times, ndevs=10, cycle=60, max_frame_offset=4): pd_times = np.array(pd_times) expected_duration_mask = flag_unexpected_edges(pd_times, ndevs=ndevs) diff_mean, diff_std = trimmed_stats(np.diff(pd_times)) frame_interval = diff_mean / cycle bad_edges = np.where(expected_duration_mask == 0)[0] bad_blocks = np.sort(np.unique(np.concatenate([ [0], np.where(np.diff(bad_edges) > 1)[0] + 1, [len(bad_edges)] ]))) output_edges = [] for low, high in zip(bad_blocks[:-1], bad_blocks[1:]): current_bad_edge_indices = bad_edges[low: high-1] current_bad_edges = pd_times[current_bad_edge_indices] low_bound = pd_times[current_bad_edge_indices[0]] high_bound = pd_times[current_bad_edge_indices[-1] + 1] edges_missing = int(np.around((high_bound - low_bound) / diff_mean)) expected = np.linspace(low_bound, high_bound, edges_missing + 1) distances = distance.cdist(current_bad_edges[:, None], expected[:, None]) distances = np.around(distances / frame_interval).astype(int) min_offsets = np.amin(distances, axis=0) min_offset_indices = np.argmin(distances, axis=0) output_edges = np.concatenate([ output_edges, expected[min_offsets > max_frame_offset], current_bad_edges[min_offset_indices[min_offsets <= max_frame_offset]] ]) return np.sort(np.concatenate([output_edges, pd_times[expected_duration_mask > 0]]))
[docs]def estimate_frame_duration(pd_times, cycle=60): return trimmed_stats(np.diff(pd_times))[0] / cycle
[docs]def assign_to_last(index, starts, ends, frame_duration, irregularity, cycle): ends[-1] += frame_duration * np.sign(irregularity) return starts, ends
[docs]def allocate_by_vsync(vs_diff, index, starts, ends, frame_duration, irregularity, cycle): current_vs_diff = vs_diff[index * cycle: (index + 1) * cycle] sign = np.sign(irregularity) if sign > 0: vs_ind = np.argmax(current_vs_diff) elif sign < 0: vs_ind = np.argmin(current_vs_diff) ends[vs_ind:] += sign * frame_duration starts[vs_ind + 1:] += sign * frame_duration return starts, ends
[docs]def compute_frame_times(photodiode_times, frame_duration, num_frames, cycle, irregular_interval_policy=assign_to_last): indices = np.arange(num_frames) starts = np.zeros(num_frames, dtype=float) ends = np.zeros(num_frames, dtype=float) num_intervals = len(photodiode_times) - 1 for start_index, (start_time, end_time) in \ enumerate(zip(photodiode_times[:-1], photodiode_times[1:])): interval_duration = end_time - start_time irregularity = \ int(np.around((interval_duration) / frame_duration)) - cycle local_frame_duration = interval_duration / (cycle + irregularity) durations = \ np.zeros(cycle + (start_index == num_intervals - 1)) + local_frame_duration current_ends = np.cumsum(durations) + start_time current_starts = current_ends - durations while irregularity != 0: current_starts, current_ends = irregular_interval_policy( start_index, current_starts, current_ends, local_frame_duration, irregularity, cycle ) irregularity += -1 * np.sign(irregularity) early_frame = start_index * cycle late_frame = \ (start_index + 1) * cycle + (start_index == num_intervals - 1) remaining = starts[early_frame: late_frame].size starts[early_frame: late_frame] = current_starts[:remaining] ends[early_frame: late_frame] = current_ends[:remaining] return indices, starts, ends
# #