Source code for allensdk.internal.model.biophysical.passive_fitting.preprocess

import allensdk.internal.model.biophysical.ephys_utils as ephys_utils
import logging
import numpy as np
import pandas as pd

_passive_fit_log = logging.getLogger(
    'allensdk.model.biophysical.passive_fitting.preprocess')

[docs]def get_passive_fit_data(cap_check_sweeps, data_set): bridge_balances = [s['bridge_balance_mohm'] for s in cap_check_sweeps] bridge_avg = np.array(bridge_balances).mean() _passive_fit_log.debug("bridge avg {:.2f}".format(bridge_avg)) initialized = False for idx, s in enumerate(cap_check_sweeps): v, i, t = ephys_utils.get_sweep_v_i_t_from_set(data_set, s['sweep_number']) if v is None: continue up_idxs, down_idxs = get_cap_check_indices(i) down_idx_interval = down_idxs[1] - down_idxs[0] skip_count = 0 for j in range(len(up_idxs)): if j == 0: avg_up = v[(up_idxs[j] - 400):down_idxs[j + 1]] avg_down = v[(down_idxs[j] - 400):up_idxs[j]] elif j == len(up_idxs) - 1: avg_up = avg_up + v[(up_idxs[j] - 400):-2] avg_down = avg_down + v[(down_idxs[j] - 400):up_idxs[j]] else: avg_up = avg_up + v[(up_idxs[j] - 400):down_idxs[j + 1]] avg_down = avg_down + v[(down_idxs[j] - 400):up_idxs[j]] avg_up /= len(up_idxs) - skip_count avg_down /= len(up_idxs) - skip_count if not initialized: grand_up = avg_up - avg_up[0:400].mean() grand_down = avg_down - avg_down[0:400].mean() initialized = True else: grand_up = grand_up + (avg_up - avg_up[0:400].mean()) grand_down = grand_down + (avg_down - avg_down[0:400].mean()) grand_up /= len(cap_check_sweeps) grand_down /= len(cap_check_sweeps) t = 0.005 * np.arange(len(grand_up)) # in ms, assumes 200kHz sampling rate] grand_up_data = np.column_stack((t, grand_up)) grand_down_data = np.column_stack((t, grand_down)) grand_diff = (grand_up + grand_down) / grand_up avg_grand_diff = pd.rolling_mean(pd.Series(grand_diff, index=t), 100) threshold = 0.2 start_index = np.flatnonzero(t >= 4.0)[0] escape_indexes = np.flatnonzero(np.abs(avg_grand_diff.values[start_index:]) > threshold) + start_index if len(escape_indexes) < 1: escape_index = len(t) - 1 else: escape_index = escape_indexes[0] escape_t = t[escape_index] return { 'grand_up': grand_up_data, 'grand_down': grand_down_data, 'escape_t': escape_t, 'bridge_avg': bridge_avg }
[docs]def get_cap_check_indices(i): # Assumes that there is a test pulse followed by the stimulus pulses (downward first) di = np.diff(i) up_idx = np.flatnonzero(di > 0) down_idx = np.flatnonzero(di < 0) return up_idx[2::2], down_idx[1::2]
[docs]def main(): pass
if __name__ == "__main__": main()