Source code for allensdk.internal.model.glif.find_sweeps

import json, sys, os
import logging
import argparse
from six import iteritems
from six.moves import xrange
import allensdk.core.json_utilities as ju


SHORT_SQUARE = 'Short Square'
SHORT_SQUARE_60 = 'Short Square - Hold -60mv'
SHORT_SQUARE_80 = 'Short Square - Hold -80mv'
LONG_SQUARE = 'Long Square'
RAMP = 'Ramp'
NOISE1 = 'Noise 1'
NOISE2 = 'Noise 2'
SHORT_SQUARE_TRIPLE = 'Short Square - Triple'
RAMP_TO_RHEO = 'Ramp to Rheobase'


[docs]class MissingSweepException( Exception ): pass
[docs]def get_sweep_numbers(sweep_list): return [ s['sweep_number'] for s in sweep_list]
[docs]def get_sweeps_by_name(sweeps, sweep_type): if isinstance(sweeps, dict): return [ s for sn,s in iteritems(sweeps) if s[u'ephys_stimulus'][u'ephys_stimulus_type'][u'name'] == sweep_type ] else: return [ s for s in sweeps if s[u'ephys_stimulus'][u'ephys_stimulus_type'][u'name'] == sweep_type ]
[docs]def find_ranked_sweep(sweep_list, key, reverse=False): if sweep_list: sorted_sweep_list = sorted(sweep_list, key=lambda x: x[key], reverse=reverse) out_sweeps = [ sorted_sweep_list[0] ] for i in xrange(1,len(sweep_list)): if sorted_sweep_list[i][key] == out_sweeps[0][key]: out_sweeps.append(sorted_sweep_list[i]) else: break return get_sweep_numbers(out_sweeps) else: return []
[docs]def organize_sweeps_by_name(sweeps, name): sweep_list = sorted(get_sweeps_by_name(sweeps, name), key=lambda x: x['sweep_number']) subthreshold_list = [ s for s in sweep_list if s.get('num_spikes',None) in [0, None] ] suprathreshold_list = [ s for s in sweep_list if s.get('num_spikes',None) > 0 ] return { 'all': get_sweep_numbers(sweep_list), 'subthreshold': get_sweep_numbers(subthreshold_list), 'suprathreshold': get_sweep_numbers(suprathreshold_list), 'maximum_subthreshold': find_ranked_sweep(subthreshold_list, 'stimulus_amplitude', reverse=True), 'minimum_suprathreshold': find_ranked_sweep(suprathreshold_list, 'stimulus_amplitude') #'maximum_subthreshold': find_ranked_sweep(subthreshold_list, 'stimulus_absolute_amplitude', reverse=True), #'minimum_suprathreshold': find_ranked_sweep(suprathreshold_list, 'stimulus_absolute_amplitude') }
[docs]def find_long_square_sweeps(sweeps): out = organize_sweeps_by_name(sweeps, LONG_SQUARE) return out
[docs]def find_ramp_to_rheo_sweeps(sweeps): out = organize_sweeps_by_name(sweeps, RAMP_TO_RHEO) return out
[docs]def find_short_square_sweeps(sweeps): ''' Find 1) all of the subthreshold short square sweeps 2) all of the superthreshold short square sweeps 3) the subthresholds short square sweep with maximum stimulus amplitude ''' out = organize_sweeps_by_name(sweeps, SHORT_SQUARE) out60 = organize_sweeps_by_name(sweeps, SHORT_SQUARE_60) out80 = organize_sweeps_by_name(sweeps, SHORT_SQUARE_80) out_triple = organize_sweeps_by_name(sweeps, SHORT_SQUARE_TRIPLE) out['all_60'] = out60['all'] out['all_80'] = out80['all'] out['triple'] = out_triple['all'] if len(out['maximum_subthreshold']) == 0: raise MissingSweepException("No maximum subthreshold short square") if len(out['minimum_suprathreshold']) == 0: raise MissingSweepException("No minimum suprathreshold short square") return out
[docs]def find_ramp_sweeps(sweeps): ''' Find 1) all ramp sweeps 2) all subthreshold ramps 3) all superthreshold ramps ''' out = organize_sweeps_by_name(sweeps, RAMP) return out
[docs]def find_noise_sweeps(sweeps): ''' Find 1) the noise1 sweeps 2) the noise2 sweeps 4) all noise sweeps ''' noise1 = organize_sweeps_by_name(sweeps, NOISE1) noise2 = organize_sweeps_by_name(sweeps, NOISE2) all_noise_sweeps = sorted(noise1['all'] + noise2['all']) out = { 'all': all_noise_sweeps, 'noise1': noise1['all'], 'noise2': noise2['all'] } num_noise1_sweeps = len(out['noise1']) num_noise2_sweeps = len(out['noise2']) required_noise1_sweeps = 2 required_noise2_sweeps = 2 if num_noise1_sweeps < required_noise1_sweeps: raise MissingSweepException("not enough noise1 sweeps (%d/%d)" % (num_noise1_sweeps, required_noise1_sweeps)) if num_noise2_sweeps < required_noise2_sweeps: raise MissingSweepException("not enough noise2 sweeps (%d/%d)" % (num_noise2_sweeps, required_noise2_sweeps)) return out
[docs]def find_sweeps(sweep_list): sweep_index = { s['sweep_number']: s for s in sweep_list } data = {} ssq_data = find_short_square_sweeps(sweep_index) data.update(ssq_data) lsq_data = find_long_square_sweeps(sweep_index) data.update(lsq_data) ramp_data = find_ramp_sweeps(sweep_index) data.update(ramp_data) r2r_data = find_ramp_to_rheo_sweeps(sweep_index) data.update(r2r_data) noise_data = find_noise_sweeps(sweep_index) data.update(noise_data) return data, sweep_index
[docs]def parse_arguments(): parser = argparse.ArgumentParser(description='find relevant sweeps from a sweep catalog') parser.add_argument('sweep_list_file', help='json file containing a list of sweeps for a cell') parser.add_argument('output_file', help='output json data config file') args = parser.parse_args() try: if not os.path.exists(args.sweep_list_file): raise Exception("sweep list file (%s) does not exist" % args.sweep_file) except Exception as e: parser.print_help() sys.exit(1) return args
[docs]def main(): args = parse_arguments() sweep_list = ju.read(args.sweep_list_file) data = find_sweeps(sweep_list) ju.write(args.output_file, data) if len(errs > 0): for err in errs: logging.error(err) sys.exit(1)
if __name__ == "__main__": main()