import argparse
import os
import pathlib
import tempfile

import marshmallow
from argschema import ArgSchemaParser
from argschema.schemas import DefaultSchema
from marshmallow import RAISE, ValidationError
class OutputFile(marshmallow.fields.String):
    """A marshmallow String field subclass which deserializes json str fields
    that represent a desired output file path to a pathlib.Path.

    Validation additionally checks that the path can be written to (an
    already existing file is accepted).
    """

    def _deserialize(self, value, attr, obj, **kwargs) -> pathlib.Path:
        """Convert the raw input value to a ``pathlib.Path``.

        Raises
        ------
        ValidationError
            If ``value`` is of a type that ``pathlib.Path`` cannot accept.
        """
        try:
            return pathlib.Path(value)
        except TypeError as err:
            # Report unusable input as a validation problem instead of
            # letting a bare TypeError escape from schema loading.
            raise ValidationError(
                f'{value!r} is not a valid output file path') from err

    def _serialize(self, value, attr, obj, **kwargs) -> str:
        """Render the stored path as a plain string."""
        return str(value)

    def _validate(self, value: pathlib.Path):
        """Run the field's own validators, then check write access.

        Raises
        ------
        ValidationError
            If a configured validator rejects ``value`` or the location
            cannot be written to.
        """
        # Delegate first so validators supplied through ``validate=`` are not
        # silently skipped by this override.
        super()._validate(value)
        check_write_access_overwrite(str(value))
def write_or_print_outputs(data, parser):
    """Attach the parser's arguments to ``data`` and emit the result.

    ``data`` is updated in place with an ``'input_parameters'`` entry holding
    ``parser.args``. If ``'output_json'`` is among the parser's arguments the
    data is handed to ``parser.output`` (with ``indent=2``); otherwise the
    value of ``parser.get_output_json(data)`` is printed.

    Parameters
    ----------
    data : dict
        Output values; mutated in place.
    parser
        Object providing ``args``, ``output`` and ``get_output_json``.
    """
    arguments = parser.args
    data.update({'input_parameters': arguments})

    if 'output_json' not in arguments:
        # No output file requested: fall back to printing.
        print(parser.get_output_json(data))
        return

    parser.output(data, indent=2)
def check_write_access_dir(dirpath):
    """Check that files could be written inside ``dirpath``.

    If ``dirpath`` exists, a uniquely named scratch file is created in it and
    removed again. If it does not exist, the directory (including any missing
    parents) is created and then removed again, to show that the path could
    be built.

    Parameters
    ----------
    dirpath : str or os.PathLike
        Directory to probe. An empty string is treated as the current
        working directory.

    Returns
    -------
    bool
        True when the probe succeeds.

    Raises
    ------
    ValidationError
        If a PermissionError occurs while writing in, or building, the
        directory.
    """
    # pathlib normalises '' to '.', so a bare filename's (empty) dirname is
    # probed as the current directory.
    target = pathlib.Path(dirpath)

    if os.path.exists(target):
        try:
            # Actually create a file: opening a fixed name for reading fails
            # with FileNotFoundError and says nothing about write access, and
            # removing a fixed name could delete a file the user owns.
            with tempfile.TemporaryFile(dir=str(target)):
                pass
        except PermissionError:
            raise ValidationError(
                f'don\'t have permissions to write in directory {dirpath}')
        return True

    # Record which ancestors are missing (deepest first) so that everything
    # this probe creates is removed again, not just the leaf directory.
    missing = [
        candidate
        for candidate in (target, *target.parents)
        if not os.path.exists(candidate)
    ]
    try:
        target.mkdir(parents=True)
        for created in missing:
            created.rmdir()
    except PermissionError:
        raise ValidationError(
            f'Can\'t build path to requested location {dirpath}')
    return True
def check_write_access(filepath, allow_exists=False):
    """Check that a file could be written at ``filepath``.

    The file is created exclusively and removed again. If creation fails
    because the parent directory is missing or not accessible, the check is
    delegated to ``check_write_access_dir`` on the parent directory.

    Parameters
    ----------
    filepath : str or os.PathLike
        Desired output location.
    allow_exists : bool, optional
        When True an already existing file is accepted; when False
        (default) it is reported as an error.

    Returns
    -------
    bool
        True when the location is considered writable.

    Raises
    ------
    ValidationError
        If the file already exists and ``allow_exists`` is False, or if the
        parent-directory check fails.
    """
    try:
        # O_EXCL makes creation fail on an existing file, so the probe never
        # touches (or later removes) a file that was already there.
        fd = os.open(filepath, os.O_CREAT | os.O_EXCL)
        os.close(fd)
        os.remove(filepath)
        return True
    except FileExistsError:
        if allow_exists:
            return True
        raise ValidationError(f'file at {filepath} already exists')
    except (FileNotFoundError, PermissionError):
        # A bare filename has an empty dirname; probe the current directory
        # rather than handing '' on as if it were a missing directory.
        base_dir = os.path.dirname(filepath) or os.curdir
        return check_write_access_dir(base_dir)
def check_write_access_overwrite(path):
    """Run ``check_write_access`` on ``path`` with ``allow_exists=True``.

    Returns whatever ``check_write_access`` returns.
    """
    outcome = check_write_access(filepath=path, allow_exists=True)
    return outcome
def check_read_access(path):
    """Check that the file at ``path`` can be opened for reading.

    Parameters
    ----------
    path : str or os.PathLike
        File to probe.

    Returns
    -------
    bool
        True when the file could be opened in read mode.

    Raises
    ------
    ValidationError
        If opening the file fails for any reason; the original exception is
        chained as the cause.
    """
    try:
        # The context manager closes the handle even if something goes wrong
        # after the open succeeds.
        with open(path, mode='r'):
            pass
    except Exception as err:
        # Any failure to open is reported as a validation problem.
        raise ValidationError(
            f'file at {path} not readable ({type(err).__name__}: {err})'
        ) from err
    return True
[docs]class RaisingSchema(DefaultSchema):
class ArgSchemaParserPlus(ArgSchemaParser):  # pragma: no cover
    """ArgSchemaParser that pre-parses the command line with a bare
    ``argparse.ArgumentParser``.

    Arguments the bare parser recognises are stored on ``self.args`` before
    the base class is initialised; everything it does not recognise is passed
    to ``ArgSchemaParser.__init__`` as ``args``. Positional ``*args`` are
    accepted but not used.
    """

    def __init__(self, *args, **kwargs):
        known, remaining = argparse.ArgumentParser().parse_known_args()
        self.args = known
        super().__init__(args=remaining, **kwargs)