Source code for allensdk.internal.pipeline_modules.run_observatory_analysis

#!/usr/bin/python
# Copyright 2016 Allen Institute for Brain Science
# This file is part of Allen SDK.
#
# Allen SDK is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Allen SDK is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Allen SDK.  If not, see <http://www.gnu.org/licenses/>.

import json, sys, traceback, logging
from allensdk.brain_observatory.session_analysis import run_session_analysis
import allensdk.brain_observatory.stimulus_info as si
import allensdk.core.json_utilities as json_util
from allensdk.internal.core.lims_pipeline_module import PipelineModule, run_module, SHARED_PYTHON
import allensdk.internal.core.lims_utilities as lu
from six import iteritems
import os

def get_experiment_nwb_file(experiment_id):
    """ Return the path to the 'NWBOphys' well known file for an ophys experiment in LIMS. """
    res = lu.query("""
        select *
        from well_known_files wkf
        join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id
        where attachable_id = %d
        and wkft.name = 'NWBOphys'
        """ % experiment_id)

    return os.path.join(res[0]['storage_directory'], res[0]['filename'])
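# Illustrative usage sketch (the experiment ID below is hypothetical and a live
# LIMS connection is required); the returned value joins the record's
# storage_directory and filename into an absolute path:
#
#   nwb_path = get_experiment_nwb_file(12345)
#   # -> "<storage_directory>/<filename>" for the experiment's 'NWBOphys' file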
def get_experiment_session(experiment_id):
    """ Return the stimulus (session) name for an ophys experiment in LIMS. """
    return lu.query("""
        select stimulus_name
        from ophys_sessions os
        join ophys_experiments oe on oe.ophys_session_id = os.id
        where oe.id = %d
        """ % experiment_id)[0]['stimulus_name']
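# Illustrative usage sketch (hypothetical experiment ID). The returned stimulus
# name is what main() later checks against si.SESSION_STIMULUS_MAP:
#
#   session_name = get_experiment_session(12345)
#   # -> a session name such as "three_session_A" (assumed example value)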
def debug(experiment_ids, local=False,
          OUTPUT_DIR="/data/informatics/CAM/analysis/",
          SDK_PATH="/data/informatics/CAM/analysis/allensdk/",
          walltime="10:00:00",
          python=SHARED_PYTHON,
          queue='braintv'):
    """ Build the per-experiment input dictionary and submit (or locally run) this module. """
    input_data = {}
    for eid in experiment_ids:
        exp_dir = os.path.join(OUTPUT_DIR, str(eid))
        input_data[eid] = dict(nwb_file=get_experiment_nwb_file(eid),
                               output_file=os.path.join(exp_dir, "%d_analysis.h5" % eid),
                               session_name=get_experiment_session(eid))

    run_module(os.path.abspath(__file__),
               input_data,
               exp_dir,
               python=python,
               sdk_path=SDK_PATH,
               pbs=dict(vmem=32,
                        job_name="bobanalysis_%d" % eid,
                        walltime=walltime,
                        queue=queue),
               local=local)
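# Illustrative launch sketch (experiment IDs are hypothetical; paths default to
# the shared /data/informatics locations above). With local=True, run_module
# executes the job on the current host instead of submitting it to the PBS
# queue named by the `queue` argument:
#
#   debug([12345, 67890], local=True)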
def main():
    mod = PipelineModule()
    jin = mod.input_data()

    results = {}
    for ident, experiment in iteritems(jin):
        nwb_file = experiment['nwb_file']
        output_file = experiment['output_file']

        if experiment["session_name"] not in si.SESSION_STIMULUS_MAP.keys():
            raise Exception("Could not run analysis for unknown session: %s" % experiment["session_name"])

        logging.info("Running %s analysis", experiment["session_name"])
        logging.info("NWB file %s", nwb_file)
        logging.info("Output file %s", output_file)

        results[ident] = run_session_analysis(nwb_file, output_file,
                                              save_flag=True, plot_flag=False)

    logging.info("Generating output")
    jout = {}
    for session_name, data in results.items():
        # results for this session
        res = {}

        # metric fields
        names = {}
        roi_id = None

        for metric, values in data['cell'].items():
            if metric == "roi_id":
                roi_id = values
            else:
                # convert dict to array
                vals = []
                for i in range(len(values)):
                    vals.append(values[i])  # panda syntax

                names[metric] = vals

        # make an output record for each roi_id
        if roi_id is not None:
            for i in range(len(roi_id)):
                name = roi_id[i]

                roi = {}
                for field, values in names.items():
                    roi[field] = values[i]

                res[name] = roi

        jout[session_name] = {'cell': res, 'experiment': data['experiment']}

    logging.info("Saving output")
    mod.write_output_data(jout)
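# For reference, main() expects input JSON of the shape that debug() builds,
# keyed by experiment ID (all values below are assumed examples, not real data):
#
#   {
#       "12345": {
#           "nwb_file": "/path/to/12345.nwb",
#           "output_file": "/path/to/12345_analysis.h5",
#           "session_name": "three_session_A"
#       }
#   }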
if __name__ == "__main__": main()
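# The output JSON written by mod.write_output_data() is keyed by the same
# per-experiment identifier, with one per-ROI record of metric values plus the
# experiment-level results returned by run_session_analysis. A sketch of the
# shape (field names other than 'cell' and 'experiment' are assumed examples):
#
#   {
#       "12345": {
#           "cell": { "<roi_id>": { "<metric>": <value>, ... }, ... },
#           "experiment": { ... }
#       }
#   }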