Source code for allensdk.brain_observatory.argschema_utilities

import argparse
import os
import pathlib

import marshmallow
from argschema import ArgSchemaParser
from argschema.schemas import DefaultSchema
from marshmallow import RAISE, ValidationError


[docs]class InputFile(marshmallow.fields.String): """A marshmallow String field subclass which deserializes json str fields that represent a desired input path to pathlib.Path. Also performs read access checking. """ def _deserialize(self, value, attr, obj, **kwargs) -> pathlib.Path: return pathlib.Path(value) def _serialize(self, value, attr, obj, **kwargs) -> str: return str(value) def _validate(self, value: pathlib.Path): check_read_access(str(value))
[docs]class OutputFile(marshmallow.fields.String): """A marshmallow String field subclass which deserializes json str fields that represent a desired output file path to a pathlib.Path. Also performs write access checking. """ def _deserialize(self, value, attr, obj, **kwargs) -> pathlib.Path: return pathlib.Path(value) def _serialize(self, value, attr, obj, **kwargs) -> str: return str(value) def _validate(self, value: pathlib.Path): check_write_access_overwrite(str(value))
[docs]def write_or_print_outputs(data, parser): data.update({'input_parameters': parser.args}) if 'output_json' in parser.args: parser.output(data, indent=2) else: print(parser.get_output_json(data))
[docs]def check_write_access_dir(dirpath): if os.path.exists(dirpath): test_filepath = pathlib.Path(dirpath, 'test_file.txt') try: with test_filepath.open() as _: pass os.remove(test_filepath) return True except PermissionError: raise ValidationError( f'don\'t have permissions to write in directory {dirpath}') else: try: pathlib.Path(dirpath).mkdir(parents=True) pathlib.Path(dirpath).rmdir() return True except PermissionError: raise ValidationError( f'Can\'t build path to requested location {dirpath}') raise RuntimeError('Unhandled case; this should not happen')
[docs]def check_write_access(filepath, allow_exists=False): try: fd = os.open(filepath, os.O_CREAT | os.O_EXCL) os.close(fd) os.remove(filepath) return True except FileExistsError: if not allow_exists: raise ValidationError(f'file at {filepath} already exists') else: return True except (FileNotFoundError, PermissionError): base_dir = os.path.dirname(filepath) return check_write_access_dir(base_dir) except Exception as e: raise e raise RuntimeError('Unhandled case; this should not happen')
[docs]def check_write_access_overwrite(path): return check_write_access(path, allow_exists=True)
[docs]def check_read_access(path): try: f = open(path, mode='r') f.close() return True except Exception as err: raise ValidationError( f'file at #{path} not readable (#{type(err)}: {err}')
[docs]class RaisingSchema(DefaultSchema):
[docs] class Meta: unknown = RAISE
[docs]class ArgSchemaParserPlus(ArgSchemaParser): # pragma: no cover def __init__(self, *args, **kwargs): parser = argparse.ArgumentParser() [known_args, extra_args] = parser.parse_known_args() self.args = known_args super(ArgSchemaParserPlus, self).__init__(args=extra_args, **kwargs)
[docs]def optional_lims_inputs(argv, input_schema, output_schema, lims_input_getter): remaining_args = argv[1:] input_data = {} if "--get_inputs_from_lims" in argv: lims_parser = argparse.ArgumentParser(add_help=False) lims_parser.add_argument("--host", type=str, default="http://lims2") lims_parser.add_argument("--job_queue", type=str, default=None) lims_parser.add_argument("--strategy", type=str, default=None) lims_parser.add_argument("--ecephys_session_id", type=int, default=None) lims_parser.add_argument("--output_root", type=str, default=None) lims_args, remaining_args = lims_parser.parse_known_args( remaining_args) remaining_args = [ item for item in remaining_args if item != "--get_inputs_from_lims" ] input_data = lims_input_getter(**lims_args.__dict__) try: parser = ArgSchemaParser( args=remaining_args, input_data=input_data, schema_type=input_schema, output_schema_type=output_schema, ) except ValidationError: print(input_data) raise return parser