import numpy as np
import scipy.spatial.distance as distance
[docs]def trimmed_stats(data, pctiles=(10, 90)):
low = np.percentile(data, pctiles[0])
high = np.percentile(data, pctiles[1])
trimmed = data[np.logical_and(
data <= high,
data >= low
)]
return np.mean(trimmed), np.std(trimmed)
[docs]def trim_discontiguous_vsyncs(vs_times, photodiode_cycle=60):
vs_times = np.array(vs_times)
breaks = np.where(np.diff(vs_times) > (1/photodiode_cycle)*100)[0]
if len(breaks) > 0:
chunk_sizes = np.diff(np.concatenate((np.array([0, ]),
breaks,
np.array([len(vs_times), ]))))
largest_chunk = np.argmax(chunk_sizes)
if largest_chunk == 0:
return vs_times[:np.min(breaks+1)]
elif largest_chunk == len(breaks):
return vs_times[np.max(breaks+1):]
else:
return vs_times[breaks[largest_chunk-1]:breaks[largest_chunk]]
else:
return vs_times
[docs]def separate_vsyncs_and_photodiode_times(vs_times,
pd_times,
photodiode_cycle=60):
vs_times = np.array(vs_times)
pd_times = np.array(pd_times)
breaks = np.where(np.diff(vs_times) > (1/photodiode_cycle)*100)[0]
shift = 2.0
break_times = [-shift]
break_times.extend(vs_times[breaks].tolist())
break_times.extend([np.inf])
vs_times_out = []
pd_times_out = []
for indx, b in enumerate(break_times[:-1]):
pd_in_range = np.where((pd_times > break_times[indx] + shift) *
(pd_times <= break_times[indx+1] + shift))[0]
vs_in_range = np.where((vs_times > break_times[indx]) *
(vs_times <= break_times[indx+1]))[0]
vs_times_out.append(vs_times[vs_in_range])
pd_times_out.append(pd_times[pd_in_range])
return vs_times_out, pd_times_out
[docs]def trim_border_pulses(pd_times, vs_times, frame_interval=1/60, num_frames=5):
pd_times = np.array(pd_times)
return pd_times[np.logical_and(
pd_times >= vs_times[0],
pd_times <= vs_times[-1] + num_frames * frame_interval
)]
[docs]def correct_on_off_effects(pd_times):
'''
Notes
-----
This cannot (without additional info) determine whether an assymmetric
offset is odd-long or even-long.
'''
pd_diff = np.diff(pd_times)
odd_diff_mean, odd_diff_std = trimmed_stats(pd_diff[1::2])
even_diff_mean, even_diff_std = trimmed_stats(pd_diff[0::2])
half_diff = np.diff(pd_times[0::2])
full_period_mean, full_period_std = trimmed_stats(half_diff)
half_period_mean = full_period_mean / 2
odd_offset = odd_diff_mean - half_period_mean
even_offset = even_diff_mean - half_period_mean
pd_times[::2] -= odd_offset / 2
pd_times[1::2] -= even_offset / 2
return pd_times
[docs]def flag_unexpected_edges(pd_times, ndevs=10):
pd_diff = np.diff(pd_times)
diff_mean, diff_std = trimmed_stats(pd_diff)
expected_duration_mask = np.ones(pd_diff.size)
expected_duration_mask[np.logical_or(
pd_diff < diff_mean - ndevs * diff_std,
pd_diff > diff_mean + ndevs * diff_std
)] = 0
expected_duration_mask[1:] = np.logical_and(expected_duration_mask[:-1],
expected_duration_mask[1:])
expected_duration_mask = np.concatenate([expected_duration_mask,
[expected_duration_mask[-1]]])
return expected_duration_mask
[docs]def fix_unexpected_edges(pd_times, ndevs=10, cycle=60, max_frame_offset=4):
pd_times = np.array(pd_times)
expected_duration_mask = flag_unexpected_edges(pd_times, ndevs=ndevs)
diff_mean, diff_std = trimmed_stats(np.diff(pd_times))
frame_interval = diff_mean / cycle
bad_edges = np.where(expected_duration_mask == 0)[0]
bad_blocks = np.sort(np.unique(np.concatenate([
[0],
np.where(np.diff(bad_edges) > 1)[0] + 1,
[len(bad_edges)]
])))
output_edges = []
for low, high in zip(bad_blocks[:-1], bad_blocks[1:]):
current_bad_edge_indices = bad_edges[low: high-1]
current_bad_edges = pd_times[current_bad_edge_indices]
low_bound = pd_times[current_bad_edge_indices[0]]
high_bound = pd_times[current_bad_edge_indices[-1] + 1]
edges_missing = int(np.around((high_bound - low_bound) / diff_mean))
expected = np.linspace(low_bound, high_bound, edges_missing + 1)
distances = distance.cdist(current_bad_edges[:, None],
expected[:, None])
distances = np.around(distances / frame_interval).astype(int)
min_offsets = np.amin(distances, axis=0)
min_offset_indices = np.argmin(distances, axis=0)
output_edges = np.concatenate([
output_edges,
expected[min_offsets > max_frame_offset],
current_bad_edges[min_offset_indices[min_offsets <=
max_frame_offset]]
])
return np.sort(np.concatenate([output_edges,
pd_times[expected_duration_mask > 0]]))
[docs]def estimate_frame_duration(pd_times, cycle=60):
return trimmed_stats(np.diff(pd_times))[0] / cycle
[docs]def assign_to_last(index, starts, ends, frame_duration, irregularity, cycle):
ends[-1] += frame_duration * np.sign(irregularity)
return starts, ends
[docs]def allocate_by_vsync(vs_diff,
index,
starts,
ends,
frame_duration,
irregularity,
cycle):
current_vs_diff = vs_diff[index * cycle: (index + 1) * cycle]
sign = np.sign(irregularity)
if sign > 0:
vs_ind = np.argmax(current_vs_diff)
elif sign < 0:
vs_ind = np.argmin(current_vs_diff)
ends[vs_ind:] += sign * frame_duration
starts[vs_ind + 1:] += sign * frame_duration
return starts, ends
[docs]def compute_frame_times(photodiode_times,
frame_duration,
num_frames,
cycle,
irregular_interval_policy=assign_to_last):
indices = np.arange(num_frames)
starts = np.zeros(num_frames, dtype=float)
ends = np.zeros(num_frames, dtype=float)
num_intervals = len(photodiode_times) - 1
for start_index, (start_time, end_time) in \
enumerate(zip(photodiode_times[:-1], photodiode_times[1:])):
interval_duration = end_time - start_time
irregularity = \
int(np.around((interval_duration) / frame_duration)) - cycle
local_frame_duration = interval_duration / (cycle + irregularity)
durations = \
np.zeros(cycle +
(start_index == num_intervals - 1)) + local_frame_duration
current_ends = np.cumsum(durations) + start_time
current_starts = current_ends - durations
while irregularity != 0:
current_starts, current_ends = irregular_interval_policy(
start_index,
current_starts,
current_ends,
local_frame_duration,
irregularity, cycle
)
irregularity += -1 * np.sign(irregularity)
early_frame = start_index * cycle
late_frame = \
(start_index + 1) * cycle + (start_index == num_intervals - 1)
remaining = starts[early_frame: late_frame].size
starts[early_frame: late_frame] = current_starts[:remaining]
ends[early_frame: late_frame] = current_ends[:remaining]
return indices, starts, ends
#
#