# Source code for allensdk.internal.model.glif.find_sweeps
import json, sys, os
import logging
import argparse
from six import iteritems
from six.moves import xrange
import allensdk.core.json_utilities as ju
# Stimulus type names. get_sweeps_by_name compares these strings against each
# sweep's ['ephys_stimulus']['ephys_stimulus_type']['name'] entry.

# Short square variants (plain, two holding potentials, and triple).
SHORT_SQUARE = 'Short Square'
SHORT_SQUARE_60 = 'Short Square - Hold -60mv'
SHORT_SQUARE_80 = 'Short Square - Hold -80mv'
LONG_SQUARE = 'Long Square'
RAMP = 'Ramp'
# Both noise stimuli are required (at least twice each) by find_noise_sweeps.
NOISE1 = 'Noise 1'
NOISE2 = 'Noise 2'
SHORT_SQUARE_TRIPLE = 'Short Square - Triple'
RAMP_TO_RHEO = 'Ramp to Rheobase'
class MissingSweepException(Exception):
    '''Raised when a sweep category required by this module has too few sweeps.'''
def get_sweep_numbers(sweep_list):
    '''
    Return the 'sweep_number' entry of every sweep, preserving input order.

    Parameters
    ----------
    sweep_list: iterable of sweep dictionaries, each with a 'sweep_number' key

    Returns
    -------
    list of sweep numbers
    '''
    numbers = []
    for sweep in sweep_list:
        numbers.append(sweep['sweep_number'])
    return numbers
def get_sweeps_by_name(sweeps, sweep_type):
    '''
    Select the sweeps whose stimulus type name equals sweep_type.

    Parameters
    ----------
    sweeps: either a dictionary whose values are sweep dictionaries, or an
        iterable of sweep dictionaries.  Each sweep must provide
        sweep['ephys_stimulus']['ephys_stimulus_type']['name'].
    sweep_type: stimulus type name to match exactly

    Returns
    -------
    list of the matching sweep dictionaries
    '''
    # A dictionary contributes its values only; the keys are not inspected.
    candidates = sweeps.values() if isinstance(sweeps, dict) else sweeps

    return [
        sweep for sweep in candidates
        if sweep[u'ephys_stimulus'][u'ephys_stimulus_type'][u'name'] == sweep_type
    ]
def find_ranked_sweep(sweep_list, key, reverse=False):
    '''
    Return the sweep numbers of the sweep(s) ranked first by sweep[key].

    Parameters
    ----------
    sweep_list: list of sweep dictionaries, each with 'sweep_number' and key
    key: dictionary key whose value is used for ranking
    reverse: False selects the smallest value, True the largest

    Returns
    -------
    list of sweep numbers of every sweep tied for the first rank, in sorted
    order; an empty list when sweep_list is empty
    '''
    if not sweep_list:
        return []

    ranked = sorted(sweep_list, key=lambda sweep: sweep[key], reverse=reverse)
    best_value = ranked[0][key]

    # Equal values are adjacent after sorting, so stop at the first mismatch.
    tied_numbers = []
    for sweep in ranked:
        if sweep[key] != best_value:
            break
        tied_numbers.append(sweep['sweep_number'])

    return tied_numbers
def organize_sweeps_by_name(sweeps, name):
    '''
    Group the sweeps of one stimulus type by whether they contain spikes.

    Parameters
    ----------
    sweeps: sweep dictionaries (list or dictionary) as accepted by
        get_sweeps_by_name
    name: stimulus type name to select

    Returns
    -------
    dictionary of sweep-number lists with keys
        'all': every sweep of this type, ordered by sweep number
        'subthreshold': sweeps whose 'num_spikes' is 0, None or missing
        'suprathreshold': sweeps whose 'num_spikes' is greater than 0
        'maximum_subthreshold': subthreshold sweep(s) with the largest
            'stimulus_amplitude'
        'minimum_suprathreshold': suprathreshold sweep(s) with the smallest
            'stimulus_amplitude'
    '''
    sweep_list = sorted(get_sweeps_by_name(sweeps, name),
                        key=lambda sweep: sweep['sweep_number'])

    subthreshold_list = []
    suprathreshold_list = []
    for sweep in sweep_list:
        num_spikes = sweep.get('num_spikes')
        # Test for None before any ordering comparison: `None > 0` raises
        # TypeError on Python 3.
        if num_spikes is None or num_spikes == 0:
            subthreshold_list.append(sweep)
        elif num_spikes > 0:
            suprathreshold_list.append(sweep)

    return {
        'all': get_sweep_numbers(sweep_list),
        'subthreshold': get_sweep_numbers(subthreshold_list),
        'suprathreshold': get_sweep_numbers(suprathreshold_list),
        'maximum_subthreshold': find_ranked_sweep(subthreshold_list, 'stimulus_amplitude', reverse=True),
        'minimum_suprathreshold': find_ranked_sweep(suprathreshold_list, 'stimulus_amplitude')
        # NOTE(review): alternative ranking key left disabled by the original author:
        #'maximum_subthreshold': find_ranked_sweep(subthreshold_list, 'stimulus_absolute_amplitude', reverse=True),
        #'minimum_suprathreshold': find_ranked_sweep(suprathreshold_list, 'stimulus_absolute_amplitude')
    }
def find_long_square_sweeps(sweeps):
    '''
    Organize the 'Long Square' sweeps.

    Returns the dictionary built by organize_sweeps_by_name for LONG_SQUARE.
    '''
    return organize_sweeps_by_name(sweeps, LONG_SQUARE)
def find_ramp_to_rheo_sweeps(sweeps):
    '''
    Organize the 'Ramp to Rheobase' sweeps.

    Returns the dictionary built by organize_sweeps_by_name for RAMP_TO_RHEO.
    '''
    return organize_sweeps_by_name(sweeps, RAMP_TO_RHEO)
def find_short_square_sweeps(sweeps):
    '''
    Organize the short square sweeps.

    The result is the organize_sweeps_by_name dictionary for the plain
    'Short Square' stimulus ('all', 'subthreshold', 'suprathreshold',
    'maximum_subthreshold', 'minimum_suprathreshold') extended with
        'all_60': all 'Short Square - Hold -60mv' sweep numbers
        'all_80': all 'Short Square - Hold -80mv' sweep numbers
        'triple': all 'Short Square - Triple' sweep numbers

    Raises
    ------
    MissingSweepException if the plain short squares provide no maximum
    subthreshold sweep or no minimum suprathreshold sweep.
    '''
    out = organize_sweeps_by_name(sweeps, SHORT_SQUARE)

    # Only the complete sweep-number list is kept for the variant stimuli.
    out['all_60'] = organize_sweeps_by_name(sweeps, SHORT_SQUARE_60)['all']
    out['all_80'] = organize_sweeps_by_name(sweeps, SHORT_SQUARE_80)['all']
    out['triple'] = organize_sweeps_by_name(sweeps, SHORT_SQUARE_TRIPLE)['all']

    if not out['maximum_subthreshold']:
        raise MissingSweepException("No maximum subthreshold short square")

    if not out['minimum_suprathreshold']:
        raise MissingSweepException("No minimum suprathreshold short square")

    return out
def find_ramp_sweeps(sweeps):
    '''
    Organize the 'Ramp' sweeps.

    Find 1) all ramp sweeps
         2) all subthreshold ramps
         3) all suprathreshold ramps

    Returns the dictionary built by organize_sweeps_by_name for RAMP, which
    also carries the 'maximum_subthreshold' and 'minimum_suprathreshold'
    entries.
    '''
    return organize_sweeps_by_name(sweeps, RAMP)
def find_noise_sweeps(sweeps):
    '''
    Find 1) the noise1 sweeps
         2) the noise2 sweeps
         3) all noise sweeps

    Returns
    -------
    dictionary of sweep-number lists with keys
        'all': noise1 and noise2 sweep numbers combined and sorted
        'noise1': 'Noise 1' sweep numbers
        'noise2': 'Noise 2' sweep numbers

    Raises
    ------
    MissingSweepException if fewer than two noise1 sweeps or fewer than two
    noise2 sweeps are present.
    '''
    noise1_numbers = organize_sweeps_by_name(sweeps, NOISE1)['all']
    noise2_numbers = organize_sweeps_by_name(sweeps, NOISE2)['all']

    out = {
        'all': sorted(noise1_numbers + noise2_numbers),
        'noise1': noise1_numbers,
        'noise2': noise2_numbers
    }

    required_noise1_sweeps = 2
    required_noise2_sweeps = 2

    # noise1 is checked first, so it is reported first when both are short.
    if len(noise1_numbers) < required_noise1_sweeps:
        raise MissingSweepException("not enough noise1 sweeps (%d/%d)" % (len(noise1_numbers), required_noise1_sweeps))

    if len(noise2_numbers) < required_noise2_sweeps:
        raise MissingSweepException("not enough noise2 sweeps (%d/%d)" % (len(noise2_numbers), required_noise2_sweeps))

    return out
def find_sweeps(sweep_list):
    '''
    Run every sweep finder on a cell's sweeps and merge the results.

    Parameters
    ----------
    sweep_list: iterable of sweep dictionaries, each with a 'sweep_number'

    Returns
    -------
    (data, sweep_index) where sweep_index maps sweep number to sweep
    dictionary and data is the merged output of the finders.

    Raises
    ------
    MissingSweepException propagated from find_short_square_sweeps or
    find_noise_sweeps.

    NOTE(review): the finders share key names ('all', 'subthreshold',
    'suprathreshold', 'maximum_subthreshold', 'minimum_suprathreshold'), so
    each update overwrites the previous finder's value for those keys.  In
    the merged result 'all' comes from the noise sweeps and the other four
    from the ramp-to-rheobase sweeps.  Only 'all_60', 'all_80', 'triple',
    'noise1' and 'noise2' are unique to a single finder.  Confirm whether
    callers rely on this before changing it.
    '''
    sweep_index = {sweep['sweep_number']: sweep for sweep in sweep_list}

    # Order matters: later finders overwrite keys shared with earlier ones.
    finders = (
        find_short_square_sweeps,
        find_long_square_sweeps,
        find_ramp_sweeps,
        find_ramp_to_rheo_sweeps,
        find_noise_sweeps,
    )

    data = {}
    for finder in finders:
        data.update(finder(sweep_index))

    return data, sweep_index
def parse_arguments():
    '''
    Parse the command line (sys.argv) of the sweep finder script.

    Returns
    -------
    argparse namespace with attributes 'sweep_list_file' and 'output_file'

    If the sweep list file does not exist, the problem is logged, the help
    text is printed and the process exits with status 1.
    '''
    parser = argparse.ArgumentParser(description='find relevant sweeps from a sweep catalog')
    parser.add_argument('sweep_list_file', help='json file containing a list of sweeps for a cell')
    parser.add_argument('output_file', help='output json data config file')

    args = parser.parse_args()

    if not os.path.exists(args.sweep_list_file):
        # Report the offending path before showing usage.
        logging.error("sweep list file (%s) does not exist", args.sweep_list_file)
        parser.print_help()
        sys.exit(1)

    return args
def main():
    '''
    Script entry point: read the sweep list, find the sweeps, write the result.

    Reads the JSON sweep list named on the command line, runs find_sweeps on
    it and writes the resulting sweep dictionary to the output file.  When a
    required sweep category is missing, the error is logged and the process
    exits with status 1 without writing any output.
    '''
    args = parse_arguments()

    sweep_list = ju.read(args.sweep_list_file)

    try:
        # find_sweeps returns (data, sweep_index); only the sweep dictionary
        # is written out.
        # NOTE(review): the previous code passed the whole tuple to ju.write;
        # confirm no consumer expects the sweep index in the output file.
        data, _sweep_index = find_sweeps(sweep_list)
    except MissingSweepException as e:
        logging.error("%s", e)
        sys.exit(1)

    ju.write(args.output_file, data)


if __name__ == "__main__":
    main()