import os.path
import numpy as np
import json, logging
from allensdk.core.nwb_data_set import NwbDataSet
import allensdk.core.json_utilities as ju
from allensdk.model.biophys_sim.config import Config
from allensdk.internal.model.biophysical import ephys_utils
from allensdk.internal.model.biophysical.deap_utils import Utils
[docs]class Report:
_log = logging.getLogger('allensdk.model.biophysical.make_deap_fit_json')
def __init__(self,
top_level_description,
fit_type):
self.utils = None
self.top_level_description = top_level_description
self.description = None
self.manifest = None
self.specimen_id = str(self.top_level_description.data['runs'][0]['specimen_id'])
self.fit_type = fit_type
self.target_path = self.top_level_description.manifest.get_path('target_path')
self.target = ju.read(self.target_path)
self.seeds = [1234, 1001, 4321, 1024, 2048]
self.org_selections = [0, 100, 500, 1000] # Picks thek best, 100th best, etc. organisms as examples
self.trace_colors = ["#1b9e77", "#d95f02", "#7570b3", "#e7298a"]
self.config_path = self.top_level_description.manifest.get_path('fit_config_json',
self.fit_type)
self.fit_config = Config().load(self.config_path)
fit_style_path = self.fit_config.data["biophys"][0]["model_file"][-1]
self.fit_style_info = ju.read(fit_style_path)
self.used_features = self.fit_style_info["features"]
self.all_params = self.fit_style_info["channels"] + self.fit_style_info["addl_params"]
nwb_path = self.top_level_description.manifest.get_path('stimulus_path')
self.data_set = NwbDataSet(nwb_path)
self.neuronal_model_data = ju.read(self.top_level_description.manifest.get_path('neuronal_model_data'))
self.specimen_data = self.neuronal_model_data['specimen']
self.all_sweeps = self.specimen_data['ephys_sweeps']
[docs] def best_fit_value(self):
return self.all_hof_fit_errors[self.sorted_indexes[self.org_selections[0]]]
[docs] def generate_fit_file(self):
self.gather_from_seeds()
self.setup_model()
self.check_org_selections_for_noise_block()
self.make_fit_json_file()
[docs] def make_fit_json_file(self):
json_data = {}
# passive
json_data["passive"] = [{}]
json_data["passive"][0]["ra"] = self.target["passive"][0]["ra"]
json_data["passive"][0]["e_pas"] = self.target["passive"][0]["e_pas"]
json_data["passive"][0]["cm"] = []
for k in self.target["passive"][0]["cm"]:
json_data["passive"][0]["cm"].append({"section": k, "cm": self.target["passive"][0]["cm"][k]})
# fitting
json_data["fitting"] = [{}]
json_data["fitting"][0]["sweeps"] = self.target["fitting"][0]["sweeps"]
json_data["fitting"][0]["junction_potential"] = self.target["fitting"][0]["junction_potential"]
# conditions
json_data["conditions"] = self.fit_style_info["conditions"]
json_data["conditions"][0]["v_init"] = self.target["passive"][0]["e_pas"]
# genome
json_data["genome"] = []
genome_vals = self.all_hof_fits[self.sorted_indexes[self.org_selections[0]], :]
for i, p in enumerate(self.all_params):
if len(p["mechanism"]) > 0:
param_name = p["parameter"] + "_" + p["mechanism"]
else:
param_name = p["parameter"]
json_data["genome"].append({"value": genome_vals[i],
"section": p["section"],
"name": param_name,
"mechanism": p["mechanism"]
})
# write out file
with open(self.top_level_description.manifest.get_path('output_fit_file',
self.specimen_id,
self.fit_type), "w") as f:
json.dump(json_data, f, indent=2)
[docs] def setup_model(self):
morphology_path = os.path.realpath(self.top_level_description.manifest.get_path('MORPHOLOGY'))
cwd = os.path.realpath(os.curdir)
self.utils = Utils(self.fit_config)
h = self.utils.h
self.utils.generate_morphology(morphology_path)
self.utils.load_cell_parameters()
self.utils.insert_iclamp()
self.stim_params = self.fit_config.data["stimulus"][0]
self.utils.set_iclamp_params(self.stim_params["amplitude"],
self.stim_params["delay"],
self.stim_params["duration"])
h.tstop = self.stim_params["delay"] * 2.0 + self.stim_params["duration"]
h.cvode_active(1)
h.cvode.atolscale("cai", 1e-4)
h.cvode.maxstep(10)
[docs] def gather_from_seeds(self):
first_created = False
for s in self.seeds:
final_hof_fit_path = \
self.top_level_description.manifest.get_path('final_hof_fit',
self.fit_type,
s)
final_hof_path = \
self.top_level_description.manifest.get_path('final_hof',
self.fit_type,
s)
if not os.path.exists(final_hof_fit_path):
Report._log.warn("Could not find output file %s for seed %d" % (final_hof_fit_path, s))
continue
hof_fit_errors = np.loadtxt(final_hof_fit_path)
hof_fits = np.loadtxt(final_hof_path)
if not first_created:
all_hof_fit_errors = hof_fit_errors.copy()
all_hof_fits = hof_fits.copy()
first_created = True
else:
all_hof_fit_errors = np.hstack([all_hof_fit_errors, hof_fit_errors])
all_hof_fits = np.vstack([all_hof_fits, hof_fits])
self.all_hof_fits = all_hof_fits
self.all_hof_fit_errors = all_hof_fit_errors
self.sorted_indexes = np.argsort(self.all_hof_fit_errors)
[docs] def check_org_selections_for_noise_block(self):
h = self.utils.h
v_vec, i_vec, t_vec = self.utils.record_values()
depol_block_threshold = -50.0 # mV
block_min_duration = 50.0 # ms
h.cvode_active(0)
noise_i_stim = []
for sweep_type in ["C1NSSEED_1", "C1NSSEED_2"]:
sweeps, sweep_numbers, statuses = ephys_utils.get_sweeps_of_type(sweep_type, self.all_sweeps)
_, expt_i, expt_t = ephys_utils.get_sweep_v_i_t_from_set(self.data_set, sweep_numbers[0])
noise_i_stim.append(expt_i)
dt = (expt_t[1] - expt_t[0]) * 1e3
h.dt = dt
h.tstop = expt_t[-1] * 1e3
self.utils.stim.dur = 1e12
for ii, org_ind in enumerate(self.sorted_indexes):
Report._log.debug("Testing org %s%s" % (ii, org_ind))
self.utils.set_actual_parameters(self.all_hof_fits[org_ind, :])
depol_okay = True
use_ii = -1
for expt_i in noise_i_stim:
Report._log.debug("Running some noise")
i_stim_vec = h.Vector(expt_i * 1e-3)
i_stim_vec.play(self.utils.stim._ref_amp, dt)
h.finitialize()
h.run()
i_stim_vec.play_remove()
v = v_vec.as_numpy()
t = t_vec.as_numpy()
i = i_vec.as_numpy()
stim_start_idx = 0
stim_end_idx = len(t) - 1
bool_v = np.array(v > depol_block_threshold, dtype=int)
up_indexes = np.flatnonzero(np.diff(bool_v) == 1)
down_indexes = np.flatnonzero(np.diff(bool_v) == -1)
if len(up_indexes) > len(down_indexes):
down_indexes = np.append(down_indexes, [stim_end_idx])
if len(up_indexes) != 0:
max_depol_duration = np.max([t[down_indexes[k]] - t[up_idx] for k, up_idx in enumerate(up_indexes)])
if max_depol_duration > block_min_duration:
Report._log.debug("Encountered depolarization block")
depol_okay = False
break
if depol_okay:
Report._log.debug("Did not detect depolarization block on noise traces")
use_ii = ii
break
h.cvode_active(1)
self.utils.set_iclamp_params(self.stim_params["amplitude"], self.stim_params["delay"],
self.stim_params["duration"])
self.utils.h.tstop = self.stim_params["delay"] * 2.0 + self.stim_params["duration"]
if use_ii == -1:
Report._log.debug("Could not find an organism without depolarization block on noise.")
else:
self.org_selections = [o + use_ii for o in self.org_selections]