Source code for allensdk.internal.model.data_access
from allensdk.core.nwb_data_set import NwbDataSet
from scipy import signal
import numpy as np
[docs]def load_sweep(file_name, sweep_number, desired_dt=None, cut=0, bessel=False):
'''load a data sweep and do specified data processing.
Inputs:
file_name: string
name of .nwb data file
sweep_number:
number specifying the sweep to be loaded
desired_dt:
the size of the time step the data should be subsampled to
cut:
indicie of which to start reporting data (i.e. cut off data before this indicie)
bessel: dictionary
contains parameters 'N' and 'Wn' to implement standard python bessel filtering
Returns:
dictionary containing
voltage: array
current: array
dt: time step of the returned data
start_idx: the index at which the first stimulus starts (excluding the test pulse)
'''
ds = NwbDataSet(file_name)
data = ds.get_sweep(sweep_number)
data["dt"] = 1.0 / data["sampling_rate"]
if cut > 0:
data["response"] = data["response"][cut:]
data["stimulus"] = data["stimulus"][cut:]
if bessel:
sample_freq = 1. / data["dt"]
filt_coeff = (bessel["freq"]) / (sample_freq / 2.) # filter fraction of Nyquist frequency
b, a = signal.bessel(bessel["N"], filt_coeff, "low")
data['response'] = signal.filtfilt(b, a, data['response'], axis=0)
if desired_dt is not None:
if data["dt"] != desired_dt:
data["response"] = subsample_data(data["response"], "mean", data["dt"], desired_dt)
data["stimulus"] = subsample_data(data["stimulus"], "mean", data["dt"], desired_dt)
data["start_idx"] = int(data["index_range"][0] / (desired_dt / data["dt"]))
data["dt"] = desired_dt
if "start_idx" not in data:
data["start_idx"] = data["index_range"][0]
return {
"voltage": data["response"],
"current": data["stimulus"],
"dt": data["dt"],
"start_idx": data["start_idx"]
}
[docs]def load_sweeps(file_name, sweep_numbers, dt=None, cut=0, bessel=False):
'''load sweeps and do specified data processing.
Inputs:
file_name: string
name of .nwb data file
sweep_numbers:
sweep numbers to be loaded
desired_dt:
the size of the time step the data should be subsampled to
cut:
indicie of which to start reporting data (i.e. cut off data before this indicie)
bessel: dictionary
contains parameters 'N' and 'Wn' to implement standard python bessel filtering
Returns:
dictionary containing
voltage: list of voltage trace arrays
current: list of current trace arrays
dt: list of time step corresponding to each array of the returned data
start_idx: list of the indicies at which the first stimulus starts (excluding
the test pulse) in each returned sweep
'''
data = [ load_sweep(file_name, sweep_number, dt, cut, bessel) for sweep_number in sweep_numbers ]
return {
'voltage': [ d['voltage'] for d in data ],
'current': [ d['current'] for d in data ],
'dt': [ d['dt'] for d in data ],
'start_idx': [ d['start_idx'] for d in data ],
}
[docs]def subsample_data(data, method, present_time_step, desired_time_step):
if present_time_step > desired_time_step:
raise Exception("you desired time step is smaller than your present time step")
# number of elements to average over
n = int(desired_time_step / present_time_step)
data_subsampled = None
if method == "mean":
# if n does not divide evenly into the length of the array, crop off the end
end = n * int(len(data) / n)
return np.mean(data[:end].reshape(-1,n), 1)
raise Exception("unknown subsample method: %s" % (method))