Source code for allensdk.internal.core.lims_pipeline_module
import logging
import argparse
import subprocess
import os
import errno
import allensdk.core.json_utilities as ju
from allensdk.config.manifest import Manifest
SHARED_PYTHON = "/shared/utils.x86_64/python-2.7/bin/python"
SHARED_SDK = "/shared/bioapps/infoapps/lims2_modules/lib/allensdk"
RUN_PYTHON = "/shared/bioapps/infoapps/lims2_modules/lib/python/run_python.sh"
[docs]class PipelineModule( object ):
def __init__(self, description="", parser=None):
if parser is None:
self.parser = default_argument_parser(description)
else:
self.parser = parser
self._args = None
@property
def args(self):
if self._args is None:
self._args = self.parser.parse_args()
logging.basicConfig(level=self.args.log_level, format="%(asctime)s:%(levelname)s:%(message)s")
return self._args
[docs] def write_output_data(self, data):
try:
ju.write(self.args.output_json, data)
except Exception as e:
logging.error("could not write output json: %s", self.args.output_json)
raise e
[docs]def default_argument_parser(description=""):
parser = argparse.ArgumentParser(description)
parser.add_argument('input_json')
parser.add_argument('output_json')
parser.add_argument('--log-level', default=logging.DEBUG)
return parser
[docs]def run_module(module, input_data, storage_directory,
optional_args=None,
python=SHARED_PYTHON,
sdk_path=SHARED_SDK,
local=False,
pbs=None):
PBS_TEMPLATE="""
export PYTHONPATH=%(sdk_path)s:$PYTHONPATH
PYTHON=%(python)s
SCRIPT="%(module)s"
$PYTHON $SCRIPT %(optional_args)s %(input_json)s %(output_json)s
"""
if optional_args is None:
optional_args = []
input_json = os.path.join(storage_directory, "input.json")
output_json = os.path.join(storage_directory, "output.json")
pbs_file = os.path.join(storage_directory, "run.pbs")
Manifest.safe_mkdir(storage_directory)
pbs_headers = [ ('-j oe'),
('-o %s' % os.path.join(storage_directory, "run.log")) ]
pbs = pbs if pbs is not None else {}
queue = pbs.get('queue', 'braintv')
pbs_headers.append('-q %s' % queue)
walltime = pbs.get('walltime', '3:00:00')
pbs_headers.append('-l walltime=%s' % walltime)
vmem = pbs.get('vmem', 16)
pbs_headers.append('-l vmem=%dgb' % vmem)
if 'job_name' in pbs:
pbs_headers.append('-N %s' % pbs['job_name'])
if 'ncpus' in pbs:
pbs_headers.append('-l ncpus=%d' % pbs['ncpus'])
pbs_headers = [ '#PBS %s' % s for s in pbs_headers ]
with open(pbs_file,"w") as f:
f.write('\n'.join(pbs_headers) + PBS_TEMPLATE % {
"python": python,
"sdk_path": sdk_path,
"module": module,
"input_json": input_json,
"output_json": output_json,
"optional_args": " ".join(optional_args)
})
ju.write(input_json, input_data)
if local:
subprocess.call(['sh', pbs_file])
else:
subprocess.call(['qsub', pbs_file])