Source code for allensdk.internal.model.biophysical.passive_fitting.preprocess
import allensdk.internal.model.biophysical.ephys_utils as ephys_utils
import logging
import numpy as np
import pandas as pd
_passive_fit_log = logging.getLogger(
'allensdk.model.biophysical.passive_fitting.preprocess')
[docs]def get_passive_fit_data(cap_check_sweeps, data_set):
bridge_balances = [s['bridge_balance_mohm'] for s in cap_check_sweeps]
bridge_avg = np.array(bridge_balances).mean()
_passive_fit_log.debug("bridge avg {:.2f}".format(bridge_avg))
initialized = False
for idx, s in enumerate(cap_check_sweeps):
v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set,
s['sweep_number'])
if v is None:
continue
up_idxs, down_idxs = get_cap_check_indices(i)
down_idx_interval = down_idxs[1] - down_idxs[0]
skip_count = 0
for j in range(len(up_idxs)):
if j == 0:
avg_up = v[(up_idxs[j] - 400):down_idxs[j + 1]]
avg_down = v[(down_idxs[j] - 400):up_idxs[j]]
elif j == len(up_idxs) - 1:
avg_up = avg_up + v[(up_idxs[j] - 400):-2]
avg_down = avg_down + v[(down_idxs[j] - 400):up_idxs[j]]
else:
avg_up = avg_up + v[(up_idxs[j] - 400):down_idxs[j + 1]]
avg_down = avg_down + v[(down_idxs[j] - 400):up_idxs[j]]
avg_up /= len(up_idxs) - skip_count
avg_down /= len(up_idxs) - skip_count
if not initialized:
grand_up = avg_up - avg_up[0:400].mean()
grand_down = avg_down - avg_down[0:400].mean()
initialized = True
else:
grand_up = grand_up + (avg_up - avg_up[0:400].mean())
grand_down = grand_down + (avg_down - avg_down[0:400].mean())
grand_up /= len(cap_check_sweeps)
grand_down /= len(cap_check_sweeps)
t = 0.005 * np.arange(len(grand_up)) # in ms, assumes 200kHz sampling rate]
grand_up_data = np.column_stack((t, grand_up))
grand_down_data = np.column_stack((t, grand_down))
grand_diff = (grand_up + grand_down) / grand_up
avg_grand_diff = pd.rolling_mean(pd.Series(grand_diff, index=t), 100)
threshold = 0.2
start_index = np.flatnonzero(t >= 4.0)[0]
escape_indexes = np.flatnonzero(np.abs(avg_grand_diff.values[start_index:]) > threshold) + start_index
if len(escape_indexes) < 1:
escape_index = len(t) - 1
else:
escape_index = escape_indexes[0]
escape_t = t[escape_index]
return {
'grand_up': grand_up_data,
'grand_down': grand_down_data,
'escape_t': escape_t,
'bridge_avg': bridge_avg
}
[docs]def get_cap_check_indices(i):
# Assumes that there is a test pulse followed by the stimulus pulses (downward first)
di = np.diff(i)
up_idx = np.flatnonzero(di > 0)
down_idx = np.flatnonzero(di < 0)
return up_idx[2::2], down_idx[1::2]
if __name__ == "__main__": main()