Source code for allensdk.internal.model.data_access

from allensdk.core.nwb_data_set import NwbDataSet
from scipy import signal
import numpy as np

def load_sweep(file_name, sweep_number, desired_dt=None, cut=0, bessel=False):
    '''Load a single sweep from an NWB file and apply optional processing.

    Processing happens in a fixed order: crop the start of both traces
    (cut), low-pass Bessel filter the response trace only (bessel), then
    mean-subsample both traces to desired_dt.

    Inputs:
        file_name: string
            name of .nwb data file
        sweep_number:
            number specifying the sweep to be loaded
        desired_dt:
            the size of the time step the data should be subsampled to;
            None (default) or a value equal to the native time step leaves
            the data at its native sampling
        cut:
            index at which to start reporting data (i.e. data before this
            index is cut off); values <= 0 disable cropping
        bessel: dictionary or False
            when truthy, must contain 'N' (filter order) and 'freq' (cutoff
            frequency, expressed in the same units as the sweep's sampling
            rate) used to build a low-pass scipy.signal.bessel filter

    Returns:
        dictionary containing
            voltage: array
            current: array
            dt: time step of the returned data
            start_idx: the index at which the first stimulus starts
                (excluding the test pulse)
    '''
    sweep = NwbDataSet(file_name).get_sweep(sweep_number)
    sweep["dt"] = 1.0 / sweep["sampling_rate"]

    # NOTE(review): start_idx below is derived from index_range and is not
    # shifted by `cut` -- confirm whether callers rely on that.
    if cut > 0:
        for trace in ("response", "stimulus"):
            sweep[trace] = sweep[trace][cut:]

    if bessel:
        sample_rate = 1. / sweep["dt"]
        nyquist = sample_rate / 2.
        # scipy expects the cutoff as a fraction of the Nyquist frequency
        cutoff_fraction = (bessel["freq"]) / nyquist
        numer, denom = signal.bessel(bessel["N"], cutoff_fraction, "low")
        # forward-backward filtering; only the response trace is filtered
        sweep['response'] = signal.filtfilt(numer, denom, sweep['response'], axis=0)

    needs_subsampling = desired_dt is not None and sweep["dt"] != desired_dt
    if needs_subsampling:
        native_dt = sweep["dt"]
        for trace in ("response", "stimulus"):
            sweep[trace] = subsample_data(sweep[trace], "mean", native_dt, desired_dt)
        # rescale the stimulus start index to the coarser time base
        sweep["start_idx"] = int(sweep["index_range"][0] / (desired_dt / native_dt))
        sweep["dt"] = desired_dt

    # no subsampling happened (and the sweep did not already carry one)
    if "start_idx" not in sweep:
        sweep["start_idx"] = sweep["index_range"][0]

    output_sources = (
        ("voltage", "response"),
        ("current", "stimulus"),
        ("dt", "dt"),
        ("start_idx", "start_idx"),
    )
    return {out_key: sweep[src_key] for out_key, src_key in output_sources}
def load_sweeps(file_name, sweep_numbers, dt=None, cut=0, bessel=False):
    '''Load several sweeps with load_sweep and gather the results per field.

    Inputs:
        file_name: string
            name of .nwb data file
        sweep_numbers:
            iterable of sweep numbers to be loaded
        dt:
            the size of the time step the data should be subsampled to
            (forwarded to load_sweep as its desired_dt argument)
        cut:
            index at which to start reporting data (i.e. data before this
            index is cut off)
        bessel: dictionary or False
            Bessel filter settings, forwarded unchanged to load_sweep

    Returns:
        dictionary containing
            voltage: list of voltage trace arrays
            current: list of current trace arrays
            dt: list of time steps, one per returned sweep
            start_idx: list of the indices at which the first stimulus
                starts (excluding the test pulse) in each returned sweep
    '''
    gathered = {'voltage': [], 'current': [], 'dt': [], 'start_idx': []}

    for sweep_number in sweep_numbers:
        sweep = load_sweep(file_name, sweep_number, dt, cut, bessel)
        for field, values in gathered.items():
            values.append(sweep[field])

    return gathered
def subsample_data(data, method, present_time_step, desired_time_step):
    '''Subsample a trace to a coarser time step by combining adjacent samples.

    Inputs:
        data: array-like
            samples recorded at present_time_step; it is flattened into
            consecutive bins of n samples, where n is the ratio of the two
            time steps
        method: string
            how each bin is reduced; only "mean" is supported
        present_time_step:
            time step of the incoming data
        desired_time_step:
            time step of the returned data; must not be smaller than
            present_time_step

    Returns:
        array of bin means. If n does not divide evenly into the length of
        the data, the trailing partial bin is dropped.

    Raises:
        ValueError (an Exception subclass) if desired_time_step is smaller
        than present_time_step or if method is unknown.
    '''
    if present_time_step > desired_time_step:
        raise ValueError("you desired time step is smaller than your present time step")

    # Number of elements to average over.  A bare int() truncation turns
    # float round-off into an off-by-one (0.3 / 0.1 == 2.9999999999999996
    # would give 2), so snap to the nearest integer when the ratio is within
    # round-off of it and only truncate genuinely fractional ratios.
    ratio = desired_time_step / present_time_step
    nearest = int(round(ratio))
    if np.isclose(ratio, nearest, rtol=1e-9, atol=0.0):
        n = nearest
    else:
        n = int(ratio)

    if method == "mean":
        samples = np.asarray(data)
        # if n does not divide evenly into the length of the array, crop off the end
        end = n * (len(samples) // n)
        return np.mean(samples[:end].reshape(-1, n), 1)

    raise ValueError("unknown subsample method: %s" % (method))