import numpy as np
from collections import Counter
from allensdk.ephys.feature_extractor import EphysFeatureExtractor
import allensdk.internal.model.biophysical.ephys_utils as ephys_utils
[docs]def calculate_fi_curves(data_set, sweeps):
sweep_type = "C1LSCOARSE"
_, sweep_numbers, statuses = ephys_utils.get_sweeps_of_type(sweep_type, sweeps)
features = EphysFeatureExtractor()
coarse_fi_curve = []
sweep_status = dict(zip(sweep_numbers, statuses))
for s in sweep_numbers:
if sweep_status[s] in [ 'auto_failed', 'manual_failed' ]:
continue
v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, s)
if np.all(v[-100:] == 0):
continue
stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t)
features.process_instance(s, v, i, t, stim_start, stim_dur, "")
coarse_fi_curve.append((stim_amp, features.feature_list[-1].mean["n_spikes"] / stim_dur))
sweep_type = "C2SQRHELNG"
core2_fi_curve = []
core2_half_fi_curve = []
_, sweep_numbers, statuses = ephys_utils.get_sweeps_of_type(sweep_type, sweeps)
sweep_status = dict(zip(sweep_numbers, statuses))
core2_amps = {}
amp_list = []
for s in sweep_numbers:
if sweep_status[s] in [ 'auto_failed', 'manual_failed' ]:
continue
v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, s)
if np.all(v[-100:] == 0):
continue
stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t)
if stim_start is np.nan:
sweep_status[s] = "manual_failed"
continue
core2_amps[s] = stim_amp
amp_list.append(stim_amp)
core2_amp_counter = Counter(amp_list)
common_amps = core2_amp_counter.most_common(3)
features = EphysFeatureExtractor()
for amp, count in common_amps:
for k in core2_amps:
if core2_amps[k] == amp and sweep_status[k][-6:] == "passed":
v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, k)
stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t)
features.process_instance(s, v, i, t, stim_start, stim_dur, "")
core2_fi_curve.append((amp, features.feature_list[-1].mean["n_spikes"] / stim_dur))
first_half_spike_count = len([spk for spk in features.feature_list[-1].mean["spikes"] if spk["t"] < stim_start + stim_dur / 2.0])
core2_half_fi_curve.append((amp, first_half_spike_count / (stim_dur / 2.0)))
return { "coarse": coarse_fi_curve, "core2": core2_fi_curve, "core2_half": core2_half_fi_curve }
[docs]def estimate_fi_shift(data_set, sweeps):
curve_data = calculate_fi_curves(data_set, sweeps)
# Linear fit to original fI curve
coarse_fi_sorted = sorted(curve_data["coarse"], key=lambda d: d[0])
x = np.array([d[0] for d in coarse_fi_sorted], dtype=np.float64)
y = np.array([d[1] for d in coarse_fi_sorted], dtype=np.float64)
if len(np.flatnonzero(y)) == 0: # original curve is all zero, so can't figure out shift
return np.nan, 0
last_zero_index = np.flatnonzero(y)[0] - 1
A = np.vstack([x[last_zero_index:], np.ones(len(x[last_zero_index:]))]).T
m, c = np.linalg.lstsq(A, y[last_zero_index:])[0]
# Relative error of later traces to best-fit line
if len(curve_data["core2_half"]) < 1:
return np.nan, 0
# FIX TO RECTIFY PREDICTED FI CURVE
x_shift = [amp - (freq - c) / m for amp, freq in curve_data["core2_half"]]
return np.mean(x_shift), len(x_shift)