import sys, os, shutil
import logging
from collections import defaultdict
import numpy as np
import json
from six import iteritems
from allensdk.config.manifest import Manifest
from allensdk.core.json_utilities import json_handler
from allensdk.core.nwb_data_set import NwbDataSet
from allensdk.ephys.extract_cell_features import extract_cell_features, extract_sweep_features
from allensdk.ephys.ephys_features import FeatureError
from allensdk.ephys.ephys_extractor import reset_long_squares_start
import allensdk.internal.ephys.plot_qc_figures as plot_qc_figures
TEST_PULSE_DURATION_SEC = 0.4
LONG_SQUARE_COARSE = 'C1LSCOARSE'
LONG_SQUARE_FINE = 'C1LSFINEST'
SHORT_SQUARE = 'C1SSFINEST'
RAMP = 'C1RP25PR1S'
PASSED_SWEEP_STATES = [ 'manual_passed', 'auto_passed' ]
ICLAMP_UNITS = [ 'Amps', 'pA' ]
[docs]def filter_sweeps(sweeps, types=None, passed_only=True, iclamp_only=True):
if passed_only:
sweeps = [ s for s in sweeps if s.get('workflow_state', None) in PASSED_SWEEP_STATES ]
if iclamp_only:
sweeps = [ s for s in sweeps if s['stimulus_units'] in ICLAMP_UNITS ]
if types:
sweeps = [ s for s in sweeps for t in types
if s['ephys_stimulus']['description'].startswith(t) ]
return sorted(sweeps, key=lambda x: x['sweep_number'])
[docs]def filtered_sweep_numbers(sweeps, types=None, passed_only=True, iclamp_only=True):
return [ s['sweep_number'] for s in filter_sweeps(sweeps, types, passed_only, iclamp_only) ]
[docs]def find_stim_start(stim, idx0=0):
"""
Find the index of the first nonzero positive or negative jump in an array.
Parameters
----------
stim: np.ndarray
Array to be searched
idx0: int
Start searching with this index (default: 0).
Returns
-------
int
"""
di = np.diff(stim)
idxs = np.flatnonzero(di)
idxs = idxs[idxs >= idx0]
if len(idxs) == 0:
return -1
return idxs[0]+1
[docs]def find_sweep_stim_start(data_set, sweep_number):
sweep = data_set.get_sweep(sweep_number)
sr = sweep['sampling_rate']
stim_start = find_stim_start(sweep['stimulus'], TEST_PULSE_DURATION_SEC * sr) / sr
logging.info("Long square stims start at time %f", stim_start)
return stim_start
[docs]def find_coarse_long_square_amp_delta(sweeps, decimals=0):
""" Find the delta between amplitudes of coarse long square sweeps. Includes failed sweeps. """
sweeps = filter_sweeps(sweeps, types=[ LONG_SQUARE_COARSE ], passed_only = False)
amps = sorted([s['stimulus_amplitude'] for s in sweeps])
amps_diff = np.round(np.diff(amps), decimals=decimals)
amps_diff = amps_diff[amps_diff > 0] # repeats are okay
deltas = sorted(np.unique(amps_diff)) # unique nonzero deltas
if len(deltas) == 0:
return 0
delta = deltas[0]
if len(deltas) != 1:
logging.warning("Found multiple coarse long square amplitude step differences: %s. Using: %f" % (str(deltas), delta))
return delta
[docs]def update_output_sweep_features(cell_features, sweep_features, sweep_index):
# add peak deflection for subthreshold long squares
for sweep_number, sweep in iteritems(sweep_index):
pd = sweep_features.get(sweep_number,{}).get('peak_deflect', None)
if pd is not None:
sweep['peak_deflection'] = pd[0]
# update num_spikes
for sweep_num in sweep_features:
num_spikes = len(sweep_features[sweep_num]['spikes'])
if num_spikes == 0:
num_spikes = None
sweep_index[sweep_num]['num_spikes'] = num_spikes
[docs]def nan_get(obj, key):
""" Return a value from a dictionary. If it does not exist, return None. If it is NaN, return None """
v = obj.get(key, None)
if v is None:
return None
else:
return None if np.isnan(v) else v
[docs]def generate_output_cell_features(cell_features, sweep_features, sweep_index):
ephys_features = {}
# find hero and rheo sweeps in sweep table
rheo_sweep_num = cell_features["long_squares"]["rheobase_sweep"]["id"]
rheo_sweep_id = sweep_index.get(rheo_sweep_num, {}).get('id', None)
if rheo_sweep_id is None:
raise Exception("Could not find id of rheobase sweep number %d." % rheo_sweep_num)
hero_sweep = cell_features["long_squares"]["hero_sweep"]
if hero_sweep is None:
raise Exception("Could not find hero sweep")
hero_sweep_num = hero_sweep["id"]
hero_sweep_id = sweep_index.get(hero_sweep_num, {}).get('id', None)
if hero_sweep_id is None:
raise Exception("Could not find id of hero sweep number %d." % hero_sweep_num)
# create a table of values
# this is a dictionary of ephys_features
base = cell_features["long_squares"]
ephys_features["rheobase_sweep_id"] = rheo_sweep_id
ephys_features["rheobase_sweep_num"] = rheo_sweep_num
ephys_features["thumbnail_sweep_id"] = hero_sweep_id
ephys_features["thumbnail_sweep_num"] = hero_sweep_num
ephys_features["vrest"] = nan_get(base, "v_baseline")
ephys_features["ri"] = nan_get(base, "input_resistance")
# change the base to hero sweep
base = cell_features["long_squares"]["hero_sweep"]
ephys_features["adaptation"] = nan_get(base, "adapt")
ephys_features["latency"] = nan_get(base, "latency")
# convert to ms
mean_isi = nan_get(base, "mean_isi")
ephys_features["avg_isi"] = (mean_isi * 1e3) if mean_isi is not None else None
# now grab the rheo spike
base = cell_features["long_squares"]["rheobase_sweep"]["spikes"][0]
ephys_features["upstroke_downstroke_ratio_long_square"] = nan_get(base, "upstroke_downstroke_ratio")
ephys_features["peak_v_long_square"] = nan_get(base, "peak_v")
ephys_features["peak_t_long_square"] = nan_get(base, "peak_t")
ephys_features["trough_v_long_square"] = nan_get(base, "trough_v")
ephys_features["trough_t_long_square"] = nan_get(base, "trough_t")
ephys_features["fast_trough_v_long_square"] = nan_get(base, "fast_trough_v")
ephys_features["fast_trough_t_long_square"] = nan_get(base, "fast_trough_t")
ephys_features["slow_trough_v_long_square"] = nan_get(base, "slow_trough_v")
ephys_features["slow_trough_t_long_square"] = nan_get(base, "slow_trough_t")
ephys_features["threshold_v_long_square"] = nan_get(base, "threshold_v")
ephys_features["threshold_i_long_square"] = nan_get(base, "threshold_i")
ephys_features["threshold_t_long_square"] = nan_get(base, "threshold_t")
ephys_features["peak_v_long_square"] = nan_get(base, "peak_v")
ephys_features["peak_t_long_square"] = nan_get(base, "peak_t")
base = cell_features["long_squares"]
ephys_features["sag"] = nan_get(base, "sag")
# convert to ms
tau = nan_get(base, "tau")
ephys_features["tau"] = (tau * 1e3) if tau is not None else None
ephys_features["vm_for_sag"] = nan_get(base, "vm_for_sag")
ephys_features["has_burst"] = None#base.get("has_burst", None)
ephys_features["has_pause"] = None#base.get("has_pause", None)
ephys_features["has_delay"] = None#base.get("has_delay", None)
ephys_features["f_i_curve_slope"] = nan_get(base, "fi_fit_slope")
# change the base to ramp
base = cell_features["ramps"]["mean_spike_0"] # mean feature of first spike for all of these
ephys_features["upstroke_downstroke_ratio_ramp"] = nan_get(base, "upstroke_downstroke_ratio")
ephys_features["peak_v_ramp"] = nan_get(base, "peak_v")
ephys_features["peak_t_ramp"] = nan_get(base, "peak_t")
ephys_features["trough_v_ramp"] = nan_get(base, "trough_v")
ephys_features["trough_t_ramp"] = nan_get(base, "trough_t")
ephys_features["fast_trough_v_ramp"] = nan_get(base, "fast_trough_v")
ephys_features["fast_trough_t_ramp"] = nan_get(base, "fast_trough_t")
ephys_features["slow_trough_v_ramp"] = nan_get(base, "slow_trough_v")
ephys_features["slow_trough_t_ramp"] = nan_get(base, "slow_trough_t")
ephys_features["threshold_v_ramp"] = nan_get(base, "threshold_v")
ephys_features["threshold_i_ramp"] = nan_get(base, "threshold_i")
ephys_features["threshold_t_ramp"] = nan_get(base, "threshold_t")
# change the base to short_square
base = cell_features["short_squares"]["mean_spike_0"] # mean feature of first spike for all of these
ephys_features["upstroke_downstroke_ratio_short_square"] = nan_get(base, "upstroke_downstroke_ratio")
ephys_features["peak_v_short_square"] = nan_get(base, "peak_v")
ephys_features["peak_t_short_square"] = nan_get(base, "peak_t")
ephys_features["trough_v_short_square"] = nan_get(base, "trough_v")
ephys_features["trough_t_short_square"] = nan_get(base, "trough_t")
ephys_features["fast_trough_v_short_square"] = nan_get(base, "fast_trough_v")
ephys_features["fast_trough_t_short_square"] = nan_get(base, "fast_trough_t")
ephys_features["slow_trough_v_short_square"] = nan_get(base, "slow_trough_v")
ephys_features["slow_trough_t_short_square"] = nan_get(base, "slow_trough_t")
ephys_features["threshold_v_short_square"] = nan_get(base, "threshold_v")
#ephys_features["threshold_i_short_square"] = nan_get(base, "threshold_i")
ephys_features["threshold_t_short_square"] = nan_get(base, "threshold_t")
ephys_features["threshold_i_short_square"] = nan_get(cell_features["short_squares"], "stimulus_amplitude")
return ephys_features