Source code for allensdk.internal.ephys.core_feature_extract

import sys, os, shutil
import logging
from collections import defaultdict
import numpy as np
import json
from six import iteritems

from allensdk.config.manifest import Manifest
from allensdk.core.json_utilities import json_handler

from allensdk.core.nwb_data_set import NwbDataSet
from allensdk.ephys.extract_cell_features import extract_cell_features, extract_sweep_features
from allensdk.ephys.ephys_features import FeatureError
from allensdk.ephys.ephys_extractor import reset_long_squares_start
import allensdk.internal.ephys.plot_qc_figures as plot_qc_figures


TEST_PULSE_DURATION_SEC = 0.4

LONG_SQUARE_COARSE = 'C1LSCOARSE'
LONG_SQUARE_FINE = 'C1LSFINEST'
SHORT_SQUARE = 'C1SSFINEST'
RAMP = 'C1RP25PR1S'
PASSED_SWEEP_STATES = [ 'manual_passed', 'auto_passed' ]
ICLAMP_UNITS = [ 'Amps', 'pA' ]

[docs]def filter_sweeps(sweeps, types=None, passed_only=True, iclamp_only=True): if passed_only: sweeps = [ s for s in sweeps if s.get('workflow_state', None) in PASSED_SWEEP_STATES ] if iclamp_only: sweeps = [ s for s in sweeps if s['stimulus_units'] in ICLAMP_UNITS ] if types: sweeps = [ s for s in sweeps for t in types if s['ephys_stimulus']['description'].startswith(t) ] return sorted(sweeps, key=lambda x: x['sweep_number'])
[docs]def filtered_sweep_numbers(sweeps, types=None, passed_only=True, iclamp_only=True): return [ s['sweep_number'] for s in filter_sweeps(sweeps, types, passed_only, iclamp_only) ]
[docs]def find_stim_start(stim, idx0=0): """ Find the index of the first nonzero positive or negative jump in an array. Parameters ---------- stim: np.ndarray Array to be searched idx0: int Start searching with this index (default: 0). Returns ------- int """ di = np.diff(stim) idxs = np.flatnonzero(di) idxs = idxs[idxs >= idx0] if len(idxs) == 0: return -1 return idxs[0]+1
[docs]def find_sweep_stim_start(data_set, sweep_number): sweep = data_set.get_sweep(sweep_number) sr = sweep['sampling_rate'] stim_start = find_stim_start(sweep['stimulus'], TEST_PULSE_DURATION_SEC * sr) / sr logging.info("Long square stims start at time %f", stim_start) return stim_start
[docs]def find_coarse_long_square_amp_delta(sweeps, decimals=0): """ Find the delta between amplitudes of coarse long square sweeps. Includes failed sweeps. """ sweeps = filter_sweeps(sweeps, types=[ LONG_SQUARE_COARSE ], passed_only = False) amps = sorted([s['stimulus_amplitude'] for s in sweeps]) amps_diff = np.round(np.diff(amps), decimals=decimals) amps_diff = amps_diff[amps_diff > 0] # repeats are okay deltas = sorted(np.unique(amps_diff)) # unique nonzero deltas if len(deltas) == 0: return 0 delta = deltas[0] if len(deltas) != 1: logging.warning("Found multiple coarse long square amplitude step differences: %s. Using: %f" % (str(deltas), delta)) return delta
[docs]def update_output_sweep_features(cell_features, sweep_features, sweep_index): # add peak deflection for subthreshold long squares for sweep_number, sweep in iteritems(sweep_index): pd = sweep_features.get(sweep_number,{}).get('peak_deflect', None) if pd is not None: sweep['peak_deflection'] = pd[0] # update num_spikes for sweep_num in sweep_features: num_spikes = len(sweep_features[sweep_num]['spikes']) if num_spikes == 0: num_spikes = None sweep_index[sweep_num]['num_spikes'] = num_spikes
[docs]def nan_get(obj, key): """ Return a value from a dictionary. If it does not exist, return None. If it is NaN, return None """ v = obj.get(key, None) if v is None: return None else: return None if np.isnan(v) else v
[docs]def generate_output_cell_features(cell_features, sweep_features, sweep_index): ephys_features = {} # find hero and rheo sweeps in sweep table rheo_sweep_num = cell_features["long_squares"]["rheobase_sweep"]["id"] rheo_sweep_id = sweep_index.get(rheo_sweep_num, {}).get('id', None) if rheo_sweep_id is None: raise Exception("Could not find id of rheobase sweep number %d." % rheo_sweep_num) hero_sweep = cell_features["long_squares"]["hero_sweep"] if hero_sweep is None: raise Exception("Could not find hero sweep") hero_sweep_num = hero_sweep["id"] hero_sweep_id = sweep_index.get(hero_sweep_num, {}).get('id', None) if hero_sweep_id is None: raise Exception("Could not find id of hero sweep number %d." % hero_sweep_num) # create a table of values # this is a dictionary of ephys_features base = cell_features["long_squares"] ephys_features["rheobase_sweep_id"] = rheo_sweep_id ephys_features["rheobase_sweep_num"] = rheo_sweep_num ephys_features["thumbnail_sweep_id"] = hero_sweep_id ephys_features["thumbnail_sweep_num"] = hero_sweep_num ephys_features["vrest"] = nan_get(base, "v_baseline") ephys_features["ri"] = nan_get(base, "input_resistance") # change the base to hero sweep base = cell_features["long_squares"]["hero_sweep"] ephys_features["adaptation"] = nan_get(base, "adapt") ephys_features["latency"] = nan_get(base, "latency") # convert to ms mean_isi = nan_get(base, "mean_isi") ephys_features["avg_isi"] = (mean_isi * 1e3) if mean_isi is not None else None # now grab the rheo spike base = cell_features["long_squares"]["rheobase_sweep"]["spikes"][0] ephys_features["upstroke_downstroke_ratio_long_square"] = nan_get(base, "upstroke_downstroke_ratio") ephys_features["peak_v_long_square"] = nan_get(base, "peak_v") ephys_features["peak_t_long_square"] = nan_get(base, "peak_t") ephys_features["trough_v_long_square"] = nan_get(base, "trough_v") ephys_features["trough_t_long_square"] = nan_get(base, "trough_t") ephys_features["fast_trough_v_long_square"] = nan_get(base, "fast_trough_v") ephys_features["fast_trough_t_long_square"] = nan_get(base, "fast_trough_t") ephys_features["slow_trough_v_long_square"] = nan_get(base, "slow_trough_v") ephys_features["slow_trough_t_long_square"] = nan_get(base, "slow_trough_t") ephys_features["threshold_v_long_square"] = nan_get(base, "threshold_v") ephys_features["threshold_i_long_square"] = nan_get(base, "threshold_i") ephys_features["threshold_t_long_square"] = nan_get(base, "threshold_t") ephys_features["peak_v_long_square"] = nan_get(base, "peak_v") ephys_features["peak_t_long_square"] = nan_get(base, "peak_t") base = cell_features["long_squares"] ephys_features["sag"] = nan_get(base, "sag") # convert to ms tau = nan_get(base, "tau") ephys_features["tau"] = (tau * 1e3) if tau is not None else None ephys_features["vm_for_sag"] = nan_get(base, "vm_for_sag") ephys_features["has_burst"] = None#base.get("has_burst", None) ephys_features["has_pause"] = None#base.get("has_pause", None) ephys_features["has_delay"] = None#base.get("has_delay", None) ephys_features["f_i_curve_slope"] = nan_get(base, "fi_fit_slope") # change the base to ramp base = cell_features["ramps"]["mean_spike_0"] # mean feature of first spike for all of these ephys_features["upstroke_downstroke_ratio_ramp"] = nan_get(base, "upstroke_downstroke_ratio") ephys_features["peak_v_ramp"] = nan_get(base, "peak_v") ephys_features["peak_t_ramp"] = nan_get(base, "peak_t") ephys_features["trough_v_ramp"] = nan_get(base, "trough_v") ephys_features["trough_t_ramp"] = nan_get(base, "trough_t") ephys_features["fast_trough_v_ramp"] = nan_get(base, "fast_trough_v") ephys_features["fast_trough_t_ramp"] = nan_get(base, "fast_trough_t") ephys_features["slow_trough_v_ramp"] = nan_get(base, "slow_trough_v") ephys_features["slow_trough_t_ramp"] = nan_get(base, "slow_trough_t") ephys_features["threshold_v_ramp"] = nan_get(base, "threshold_v") ephys_features["threshold_i_ramp"] = nan_get(base, "threshold_i") ephys_features["threshold_t_ramp"] = nan_get(base, "threshold_t") # change the base to short_square base = cell_features["short_squares"]["mean_spike_0"] # mean feature of first spike for all of these ephys_features["upstroke_downstroke_ratio_short_square"] = nan_get(base, "upstroke_downstroke_ratio") ephys_features["peak_v_short_square"] = nan_get(base, "peak_v") ephys_features["peak_t_short_square"] = nan_get(base, "peak_t") ephys_features["trough_v_short_square"] = nan_get(base, "trough_v") ephys_features["trough_t_short_square"] = nan_get(base, "trough_t") ephys_features["fast_trough_v_short_square"] = nan_get(base, "fast_trough_v") ephys_features["fast_trough_t_short_square"] = nan_get(base, "fast_trough_t") ephys_features["slow_trough_v_short_square"] = nan_get(base, "slow_trough_v") ephys_features["slow_trough_t_short_square"] = nan_get(base, "slow_trough_t") ephys_features["threshold_v_short_square"] = nan_get(base, "threshold_v") #ephys_features["threshold_i_short_square"] = nan_get(base, "threshold_i") ephys_features["threshold_t_short_square"] = nan_get(base, "threshold_t") ephys_features["threshold_i_short_square"] = nan_get(cell_features["short_squares"], "stimulus_amplitude") return ephys_features
[docs]def extract_data(data, nwb_file): ########################################################## #### alings with ephys_sweep_qc_tool extract_features #### cell_specimen = data['specimens'][0] sweep_list = cell_specimen['ephys_sweeps'] sweep_index = { s['sweep_number']:s for s in sweep_list } data_set = NwbDataSet(nwb_file) # extract sweep-level features logging.debug("Computing sweep features") iclamp_sweep_list = filter_sweeps(sweep_list, iclamp_only=True, passed_only=False) iclamp_sweeps = defaultdict(list) for s in iclamp_sweep_list: try: stimulus_type_name = s['ephys_stimulus']['ephys_stimulus_type']['name'] except KeyError as e: raise Exception("Sweep %d has no ephys stimulus record in features JSON file: %s" % (s['sweep_number'], json.dumps(s, indent=3, default=json_handler))) if stimulus_type_name == "Unknown": raise Exception(("Sweep %d (%s) has 'Unknown' stimulus type." + "Please update the EpysStimuli and EphysRawStimulusNames associations in LIMS.") % (s['sweep_number'], s['ephys_stimulus']['description'])) iclamp_sweeps[stimulus_type_name].append(s['sweep_number']) passed_iclamp_sweep_list = filter_sweeps(sweep_list, iclamp_only=True, passed_only=True) num_passed_sweeps = len(passed_iclamp_sweep_list) logging.info("%d of %d sweeps passed QC", num_passed_sweeps, len(iclamp_sweep_list)) if num_passed_sweeps == 0: raise FeatureError("There are no QC-passed sweeps available to analyze") # compute sweep features logging.info("Computing sweep features") sweep_features = extract_sweep_features(data_set, iclamp_sweeps) cell_specimen['sweep_ephys_features'] = sweep_features # extract cell-level features logging.info("Computing cell features") long_square_sweep_numbers = filtered_sweep_numbers(sweep_list, [ LONG_SQUARE_COARSE, LONG_SQUARE_FINE ]) short_square_sweep_numbers = filtered_sweep_numbers(sweep_list, [ SHORT_SQUARE ]) ramp_sweep_numbers = filtered_sweep_numbers(sweep_list, [ RAMP ]) logging.debug("long square sweeps: %s", str(long_square_sweep_numbers)) logging.debug("short square sweeps: %s", str(short_square_sweep_numbers)) logging.debug("ramp sweeps: %s", str(ramp_sweep_numbers)) # PBS-262 -- have variable subthreshold minimum for human cells subthresh_min_amp = None # None means default (mouse) behavior long_square_amp_delta = find_coarse_long_square_amp_delta(sweep_list) if long_square_amp_delta != 20.0: subthresh_min_amp = -200 logging.info("Long squares using %fpA step size. Using subthreshold minimum amplitude of %s.", long_square_amp_delta, str(subthresh_min_amp) if subthresh_min_amp is not None else "[default]") stim_start = find_sweep_stim_start(data_set, long_square_sweep_numbers[0]) if stim_start > 0: logging.info("resetting long square start time to: %f", stim_start) reset_long_squares_start(stim_start) cell_features = extract_cell_features(data_set, ramp_sweep_numbers, short_square_sweep_numbers, long_square_sweep_numbers, subthresh_min_amp) # shuffle peak deflection for the subthreshold long squares for s in cell_features["long_squares"]["subthreshold_sweeps"]: sweep_features[s['id']]['peak_deflect'] = s['peak_deflect'] cell_specimen['cell_ephys_features'] = cell_features update_output_sweep_features(cell_features, sweep_features, sweep_index) ephys_features = generate_output_cell_features(cell_features, sweep_features, sweep_index) try: out_ephys_features = cell_specimen.get('ephys_features',[])[0] out_ephys_features.update(ephys_features) except IndexError: cell_specimen['ephys_features'] = [ ephys_features ] #### breaks with ephys_sweep_qc_tool extract_features #### ########################################################## return sweep_list, sweep_features
[docs]def save_qc_figures(qc_fig_dir, nwb_file, output_data, plot_cell_figures): if os.path.exists(qc_fig_dir): logging.warning("removing existing qc figures directory: %s", qc_fig_dir) shutil.rmtree(qc_fig_dir) Manifest.safe_mkdir(qc_fig_dir) logging.debug("saving qc plot figures") plot_qc_figures.make_sweep_page(nwb_file, output_data, qc_fig_dir) plot_qc_figures.make_cell_page(nwb_file, output_data, qc_fig_dir, plot_cell_figures)