Source code for allensdk.brain_observatory.vbn_2022.metadata_writer.schemas
import argschema
from argschema.schemas import DefaultSchema
import pathlib
from marshmallow import post_load
from marshmallow.validate import OneOf

from allensdk.brain_observatory.vbn_2022.utils.schemas import (
    ProbeToSkip)


class VBN2022MetadataWriterInputSchema(argschema.ArgSchema):

    ecephys_session_id_list = argschema.fields.List(
        argschema.fields.Int,
        required=True,
        description=("List of ecephys_sessions.id values "
                     "of sessions to be released"))

    failed_ecephys_session_id_list = argschema.fields.List(
        argschema.fields.Int,
        required=False,
        default=None,
        allow_none=True,
        description=("List of ecephys_sessions.id values "
                     "associated with this release that failed. "
                     "These are required to "
                     "self-consistently construct the history of "
                     "each mouse passing through the apparatus."))

    probes_to_skip = argschema.fields.List(
        argschema.fields.Nested(ProbeToSkip),
        required=False,
        default=None,
        allow_none=True,
        description=("List of probes to skip"))

    output_dir = argschema.fields.OutputDir(
        required=True,
        description=("Directory where outputs will be written"))

    clobber = argschema.fields.Boolean(
        default=False,
        description=("If False, throw an error if output files "
                     "already exist"))

    ecephys_nwb_dir = argschema.fields.InputDir(
        required=True,
        allow_none=False,
        description=("The directory where ecephys_nwb sessions are "
                     "to be found"))

    ecephys_nwb_prefix = argschema.fields.Str(
        required=False,
        default='ecephys_session',
        description=(
            "Ecephys session NWB files will be looked for "
            "in the form "
            "{ecephys_nwb_dir}/{ecephys_nwb_prefix}_{ecephys_session_id}.nwb"))
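    # For example (hypothetical values): with ecephys_nwb_dir='/data/nwb'
    # and the default prefix, session 12345 would be expected at
    # /data/nwb/ecephys_session_12345.nwb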

    supplemental_data = argschema.fields.List(
        argschema.fields.Dict,
        default=None,
        allow_none=True,
        description=(
            "List of dicts defining any supplemental columns "
            "that need to be added to the ecephys_sessions.csv "
            "table. Each dict should represent a row in a dataframe "
            "that will get merged on ecephys_session_id with "
            "the ecephys_sessions table (each row must therefore contain "
            "ecephys_session_id)"))

    on_missing_file = argschema.fields.Str(
        default='error',
        required=False,
        validate=OneOf(('error', 'warn', 'skip')),
        description=("What to do if an input datafile is missing. "
                     "If 'error', raise an exception. "
                     "If 'warn', assign a dummy ID and issue a warning. "
                     "If 'skip', do not list in metadata and issue a "
                     "warning (note: any sessions skipped this way will "
                     "still show up in aggregate metadata; there just "
                     "will be no line for those sessions in tables that "
                     "list data files for release, like sessions.csv)."))

    @post_load
    def validate_paths(self, data, **kwargs):
        fname_lookup = {'units_path': 'units.csv',
                        'channels_path': 'channels.csv',
                        'probes_path': 'probes.csv',
                        'ecephys_sessions_path': 'ecephys_sessions.csv',
                        'behavior_sessions_path': 'behavior_sessions.csv'}

        out_dir = pathlib.Path(data['output_dir'])
        msg = ""
        for fname_k in fname_lookup:
            # Derive each output CSV path from output_dir; collect any
            # pre-existing files so they can all be reported in one error.
            full_path = out_dir / fname_lookup[fname_k]
            if full_path.exists() and not data['clobber']:
                msg += f"{full_path.resolve().absolute()}\n"
            data[fname_k] = str(full_path.resolve().absolute())

        if len(msg) > 0:
            raise RuntimeError(
                "The following files already exist\n"
                f"{msg}"
                "Run with clobber=True if you want to overwrite")
        return data
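
# A minimal usage sketch (paths and session IDs are hypothetical); the
# schema is expected to be driven through argschema.ArgSchemaParser:
#
#     parser = argschema.ArgSchemaParser(
#         input_data={
#             "ecephys_session_id_list": [12345],
#             "output_dir": "/tmp/vbn_release",
#             "ecephys_nwb_dir": "/tmp/nwb_files"},
#         schema_type=VBN2022MetadataWriterInputSchema,
#         args=[])
#
# parser.args then also carries the units_path/channels_path/... entries
# derived by validate_paths.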


class PipelineMetadataSchema(DefaultSchema):

    name = argschema.fields.Str(
        required=True,
        allow_none=False,
        description=(
            "Name of the pipeline component (e.g. 'AllenSDK')"))

    version = argschema.fields.Str(
        required=True,
        allow_none=False,
        description=(
            "Semantic version of the pipeline component"))

    comment = argschema.fields.Str(
        required=False,
        default="",
        description=(
            "Optional comment about this piece of software"))


class DataReleaseToolsInputSchema(argschema.ArgSchema):
    """
    This schema will be used as the output schema for
    data_release.metadata_writer modules. It is actually
    a subset of the input schema for the
    informatics_data_release_tools (the output of the metadata
    writers is meant to be the input of the data_release_tool)
    """

    metadata_files = argschema.fields.List(
        argschema.fields.InputFile,
        description=(
            "Paths to the metadata .csv files "
            "written by these modules"))

    data_pipeline_metadata = argschema.fields.Nested(
        PipelineMetadataSchema,
        many=True,
        description=(
            "Metadata about the pipeline used "
            "to create this data release"))

    project_name = argschema.fields.Str(
        required=True,
        allow_none=False,
        description=(
            "The project name to be passed along "
            "to the data_release_tool when uploading "
            "this dataset"))