Source code for allensdk.internal.model.biophysical.check_fi_shift

import numpy as np
from collections import Counter
from allensdk.ephys.feature_extractor import EphysFeatureExtractor
import allensdk.internal.model.biophysical.ephys_utils as ephys_utils

[docs]def calculate_fi_curves(data_set, sweeps): sweep_type = "C1LSCOARSE" _, sweep_numbers, statuses = ephys_utils.get_sweeps_of_type(sweep_type, sweeps) features = EphysFeatureExtractor() coarse_fi_curve = [] sweep_status = dict(zip(sweep_numbers, statuses)) for s in sweep_numbers: if sweep_status[s] in [ 'auto_failed', 'manual_failed' ]: continue v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, s) if np.all(v[-100:] == 0): continue stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t) features.process_instance(s, v, i, t, stim_start, stim_dur, "") coarse_fi_curve.append((stim_amp, features.feature_list[-1].mean["n_spikes"] / stim_dur)) sweep_type = "C2SQRHELNG" core2_fi_curve = [] core2_half_fi_curve = [] _, sweep_numbers, statuses = ephys_utils.get_sweeps_of_type(sweep_type, sweeps) sweep_status = dict(zip(sweep_numbers, statuses)) core2_amps = {} amp_list = [] for s in sweep_numbers: if sweep_status[s] in [ 'auto_failed', 'manual_failed' ]: continue v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, s) if np.all(v[-100:] == 0): continue stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t) if stim_start is np.nan: sweep_status[s] = "manual_failed" continue core2_amps[s] = stim_amp amp_list.append(stim_amp) core2_amp_counter = Counter(amp_list) common_amps = core2_amp_counter.most_common(3) features = EphysFeatureExtractor() for amp, count in common_amps: for k in core2_amps: if core2_amps[k] == amp and sweep_status[k][-6:] == "passed": v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, k) stim_start, stim_dur, stim_amp, start_idx, end_idx = ephys_utils.get_step_stim_characteristics(i, t) features.process_instance(s, v, i, t, stim_start, stim_dur, "") core2_fi_curve.append((amp, features.feature_list[-1].mean["n_spikes"] / stim_dur)) first_half_spike_count = len([spk for spk in features.feature_list[-1].mean["spikes"] if spk["t"] < stim_start + stim_dur / 2.0]) core2_half_fi_curve.append((amp, first_half_spike_count / (stim_dur / 2.0))) return { "coarse": coarse_fi_curve, "core2": core2_fi_curve, "core2_half": core2_half_fi_curve }
[docs]def estimate_fi_shift(data_set, sweeps): curve_data = calculate_fi_curves(data_set, sweeps) # Linear fit to original fI curve coarse_fi_sorted = sorted(curve_data["coarse"], key=lambda d: d[0]) x = np.array([d[0] for d in coarse_fi_sorted], dtype=np.float64) y = np.array([d[1] for d in coarse_fi_sorted], dtype=np.float64) if len(np.flatnonzero(y)) == 0: # original curve is all zero, so can't figure out shift return np.nan, 0 last_zero_index = np.flatnonzero(y)[0] - 1 A = np.vstack([x[last_zero_index:], np.ones(len(x[last_zero_index:]))]).T m, c = np.linalg.lstsq(A, y[last_zero_index:])[0] # Relative error of later traces to best-fit line if len(curve_data["core2_half"]) < 1: return np.nan, 0 # FIX TO RECTIFY PREDICTED FI CURVE x_shift = [amp - (freq - c) / m for amp, freq in curve_data["core2_half"]] return np.mean(x_shift), len(x_shift)