import warnings
import numpy as np
import scipy.spatial.distance as distance
[docs]def trimmed_stats(data, pctiles=(10, 90)):
low = np.percentile(data, pctiles[0])
high = np.percentile(data, pctiles[1])
trimmed = data[np.logical_and(
data <= high,
data >= low
)]
return np.mean(trimmed), np.std(trimmed)
[docs]def trim_border_pulses(pd_times, vs_times, frame_interval=1/60, num_frames=5):
pd_times = np.array(pd_times)
return pd_times[np.logical_and(
pd_times >= vs_times[0],
pd_times <= vs_times[-1] + num_frames * frame_interval
)]
[docs]def correct_on_off_effects(pd_times):
'''
Notes
-----
This cannot (without additional info) determine whether an assymmetric offset is odd-long or even-long.
'''
pd_diff = np.diff(pd_times)
odd_diff_mean, odd_diff_std = trimmed_stats(pd_diff[1::2])
even_diff_mean, even_diff_std = trimmed_stats(pd_diff[0::2])
half_diff = np.diff(pd_times[0::2])
full_period_mean, full_period_std = trimmed_stats(half_diff)
half_period_mean = full_period_mean / 2
odd_offset = odd_diff_mean - half_period_mean
even_offset = even_diff_mean - half_period_mean
pd_times[::2] -= odd_offset / 2
pd_times[1::2] -= even_offset / 2
return pd_times
[docs]def flag_unexpected_edges(pd_times, ndevs=10):
pd_diff = np.diff(pd_times)
diff_mean, diff_std = trimmed_stats(pd_diff)
expected_duration_mask = np.ones(pd_diff.size)
expected_duration_mask[np.logical_or(
pd_diff < diff_mean - ndevs * diff_std,
pd_diff > diff_mean + ndevs * diff_std
)] = 0
expected_duration_mask[1:] = np.logical_and(expected_duration_mask[:-1], expected_duration_mask[1:])
expected_duration_mask = np.concatenate([expected_duration_mask, [expected_duration_mask[-1]]])
return expected_duration_mask
[docs]def fix_unexpected_edges(pd_times, ndevs=10, cycle=60, max_frame_offset=4):
pd_times = np.array(pd_times)
expected_duration_mask = flag_unexpected_edges(pd_times, ndevs=ndevs)
diff_mean, diff_std = trimmed_stats(np.diff(pd_times))
frame_interval = diff_mean / cycle
bad_edges = np.where(expected_duration_mask == 0)[0]
bad_blocks = np.sort(np.unique(np.concatenate([
[0],
np.where(np.diff(bad_edges) > 1)[0] + 1,
[len(bad_edges)]
])))
output_edges = []
for low, high in zip(bad_blocks[:-1], bad_blocks[1:]):
current_bad_edge_indices = bad_edges[low: high-1]
current_bad_edges = pd_times[current_bad_edge_indices]
low_bound = pd_times[current_bad_edge_indices[0]]
high_bound = pd_times[current_bad_edge_indices[-1] + 1]
edges_missing = int(np.around((high_bound - low_bound) / diff_mean))
expected = np.linspace(low_bound, high_bound, edges_missing + 1)
distances = distance.cdist(current_bad_edges[:, None], expected[:, None])
distances = np.around(distances / frame_interval).astype(int)
min_offsets = np.amin(distances, axis=0)
min_offset_indices = np.argmin(distances, axis=0)
output_edges = np.concatenate([
output_edges,
expected[min_offsets > max_frame_offset],
current_bad_edges[min_offset_indices[min_offsets <= max_frame_offset]]
])
return np.sort(np.concatenate([output_edges, pd_times[expected_duration_mask > 0]]))
[docs]def estimate_frame_duration(pd_times, cycle=60):
return trimmed_stats(np.diff(pd_times))[0] / cycle
[docs]def assign_to_last(index, starts, ends, frame_duration, irregularity, cycle):
ends[-1] += frame_duration * np.sign(irregularity)
return starts, ends
[docs]def allocate_by_vsync(vs_diff, index, starts, ends, frame_duration, irregularity, cycle):
current_vs_diff = vs_diff[index * cycle: (index + 1) * cycle]
sign = np.sign(irregularity)
if sign > 0:
vs_ind = np.argmax(current_vs_diff)
elif sign < 0:
vs_ind = np.argmin(current_vs_diff)
ends[vs_ind:] += sign * frame_duration
starts[vs_ind + 1:] += sign * frame_duration
return starts, ends
[docs]def compute_frame_times(photodiode_times, frame_duration, num_frames, cycle, irregular_interval_policy=assign_to_last):
indices = np.arange(num_frames)
starts = np.zeros(num_frames, dtype=float)
ends = np.zeros(num_frames, dtype=float)
num_intervals = len(photodiode_times) - 1
for start_index, (start_time, end_time) in enumerate(zip(photodiode_times[:-1], photodiode_times[1:])):
interval_duration = end_time - start_time
irregularity = int(np.around((interval_duration) / frame_duration)) - cycle
local_frame_duration = interval_duration / (cycle + irregularity)
durations = np.zeros(cycle + ( start_index == num_intervals - 1 )) + local_frame_duration
current_ends = np.cumsum(durations) + start_time
current_starts = current_ends - durations
while irregularity != 0:
current_starts, current_ends = irregular_interval_policy(
start_index, current_starts, current_ends, local_frame_duration, irregularity, cycle
)
irregularity += -1 * np.sign(irregularity)
early_frame = start_index * cycle
late_frame = (start_index + 1) * cycle + ( start_index == num_intervals - 1 )
remaining = starts[early_frame: late_frame].size
starts[early_frame: late_frame] = current_starts[:remaining]
ends[early_frame: late_frame] = current_ends[:remaining]
return indices, starts, ends