Source code for allensdk.internal.core.lims_pipeline_module

import logging
import argparse
import subprocess
import os
import errno

import allensdk.core.json_utilities as ju
from allensdk.config.manifest import Manifest

SHARED_PYTHON = "/shared/utils.x86_64/python-2.7/bin/python"
SHARED_SDK = "/shared/bioapps/infoapps/lims2_modules/lib/allensdk"
RUN_PYTHON = "/shared/bioapps/infoapps/lims2_modules/lib/python/run_python.sh"

[docs]class PipelineModule( object ): def __init__(self, description="", parser=None): if parser is None: self.parser = default_argument_parser(description) else: self.parser = parser self._args = None @property def args(self): if self._args is None: self._args = self.parser.parse_args() logging.basicConfig(level=self.args.log_level, format="%(asctime)s:%(levelname)s:%(message)s") return self._args
[docs] def input_data(self): try: return ju.read(self.args.input_json) except Exception as e: logging.error("could not read input json: %s", self.args.input_json) raise e
[docs] def write_output_data(self, data): try: ju.write(self.args.output_json, data) except Exception as e: logging.error("could not write output json: %s", self.args.output_json) raise e
[docs]def default_argument_parser(description=""): parser = argparse.ArgumentParser(description) parser.add_argument('input_json') parser.add_argument('output_json') parser.add_argument('--log-level', default=logging.DEBUG) return parser
[docs]def run_module(module, input_data, storage_directory, optional_args=None, python=SHARED_PYTHON, sdk_path=SHARED_SDK, local=False, pbs=None): PBS_TEMPLATE=""" export PYTHONPATH=%(sdk_path)s:$PYTHONPATH PYTHON=%(python)s SCRIPT="%(module)s" $PYTHON $SCRIPT %(optional_args)s %(input_json)s %(output_json)s """ if optional_args is None: optional_args = [] input_json = os.path.join(storage_directory, "input.json") output_json = os.path.join(storage_directory, "output.json") pbs_file = os.path.join(storage_directory, "run.pbs") Manifest.safe_mkdir(storage_directory) pbs_headers = [ ('-j oe'), ('-o %s' % os.path.join(storage_directory, "run.log")) ] pbs = pbs if pbs is not None else {} queue = pbs.get('queue', 'braintv') pbs_headers.append('-q %s' % queue) walltime = pbs.get('walltime', '3:00:00') pbs_headers.append('-l walltime=%s' % walltime) vmem = pbs.get('vmem', 16) pbs_headers.append('-l vmem=%dgb' % vmem) if 'job_name' in pbs: pbs_headers.append('-N %s' % pbs['job_name']) if 'ncpus' in pbs: pbs_headers.append('-l ncpus=%d' % pbs['ncpus']) pbs_headers = [ '#PBS %s' % s for s in pbs_headers ] with open(pbs_file,"w") as f: f.write('\n'.join(pbs_headers) + PBS_TEMPLATE % { "python": python, "sdk_path": sdk_path, "module": module, "input_json": input_json, "output_json": output_json, "optional_args": " ".join(optional_args) }) ju.write(input_json, input_data) if local: subprocess.call(['sh', pbs_file]) else: subprocess.call(['qsub', pbs_file])