Source code for allensdk.internal.pipeline_modules.run_observatory_analysis
#!/usr/bin/python
# Copyright 2016 Allen Institute for Brain Science
# This file is part of Allen SDK.
#
# Allen SDK is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Allen SDK is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Allen SDK. If not, see <http://www.gnu.org/licenses/>.
import json, sys, traceback, logging
from allensdk.brain_observatory.session_analysis import run_session_analysis
import allensdk.brain_observatory.stimulus_info as si
import allensdk.core.json_utilities as json_util
from allensdk.internal.core.lims_pipeline_module import PipelineModule, run_module, SHARED_PYTHON
import allensdk.internal.core.lims_utilities as lu
from six import iteritems
import os
import logging
[docs]def get_experiment_nwb_file(experiment_id):
res = lu.query("""
select * from well_known_files wkf
join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id
where attachable_id = %d
and wkft.name = 'NWBOphys'
""" % experiment_id)
return os.path.join(res[0]['storage_directory'], res[0]['filename'])
[docs]def get_experiment_session(experiment_id):
return lu.query("""
select stimulus_name from ophys_sessions os
join ophys_experiments oe on oe.ophys_session_id = os.id
where oe.id = %d
""" % experiment_id)[0]['stimulus_name']
[docs]def debug(experiment_ids, local=False,
OUTPUT_DIR = "/data/informatics/CAM/analysis/",
SDK_PATH = "/data/informatics/CAM/analysis/allensdk/",
walltime="10:00:00",
python=SHARED_PYTHON,
queue='braintv'):
input_data = {}
for eid in experiment_ids:
exp_dir = os.path.join(OUTPUT_DIR, str(eid))
input_data[eid] = dict(nwb_file=get_experiment_nwb_file(eid),
output_file=os.path.join(exp_dir, "%d_analysis.h5" % eid),
session_name=get_experiment_session(eid))
run_module(os.path.abspath(__file__),
input_data,
exp_dir,
python=python,
sdk_path=SDK_PATH,
pbs=dict(vmem=32,
job_name="bobanalysis_%d"% eid,
walltime=walltime,
queue=queue),
local=local)
[docs]def main():
mod = PipelineModule()
jin = mod.input_data()
results = {}
for ident, experiment in iteritems(jin):
nwb_file = experiment['nwb_file']
output_file = experiment['output_file']
if experiment["session_name"] not in si.SESSION_STIMULUS_MAP.keys():
raise Exception("Could not run analysis for unknown session: %s" % experiment["session_name"])
logging.info("Running %s analysis", experiment["session_name"])
logging.info("NWB file %s", nwb_file)
logging.info("Output file %s", output_file)
results[ident] = run_session_analysis(nwb_file, output_file,
save_flag=True, plot_flag=False)
logging.info("Generating output")
jout = {}
for session_name, data in results.items():
# results for this session
res = {}
# metric fields
names = {}
roi_id = None
for metric, values in data['cell'].items():
if metric == "roi_id":
roi_id = values
else:
# convert dict to array
vals = []
for i in range(len(values)):
vals.append(values[i]) # panda syntax
names[metric] = vals
# make an output record for each roi_id
if roi_id is not None:
for i in range(len(roi_id)):
name = roi_id[i]
roi = {}
for field, values in names.items():
roi[field] = values[i]
res[name] = roi
jout[session_name] = {
'cell': res,
'experiment': data['experiment']
}
logging.info("Saving output")
mod.write_output_data(jout)
if __name__ == "__main__": main()