Source code for allensdk.internal.pipeline_modules.run_ophys_session_decomposition
import logging
from allensdk.internal.core.lims_pipeline_module import (PipelineModule,
run_module)
import allensdk.core.json_utilities as ju
import allensdk.internal.core.lims_utilities as lu
from allensdk.internal.brain_observatory import ophys_session_decomposition as osd
from multiprocessing import Pool
import os
DEBUG_CHANNELS = ["data", "piezo"]
DEBUG_WIDTH = 512
DEBUG_HEIGHT = 256
DEBUG_ITEMSIZE = 2
DEBUG_N_PLANES = 6
[docs]def create_fake_metadata(exp_dir, raw_path, channels=None,
width=DEBUG_WIDTH, height=DEBUG_HEIGHT,
itemsize=DEBUG_ITEMSIZE, n_planes=DEBUG_N_PLANES):
metadata = []
size = os.stat(raw_path).st_size
if channels is None:
channels = DEBUG_CHANNELS
n_frames = size/(itemsize*width*height)
frames_per_plane = n_frames/n_planes/len(channels)
for plane in range(n_planes):
experiment_id = plane
outfile = os.path.join(exp_dir, "plane_{}.h5".format(plane))
frame_meta = []
for i, channel in enumerate(channels):
byte_offset = width * height * itemsize * \
(plane * len(channels) + i)
strides = [width*height*itemsize*n_planes*len(channels),
width*itemsize,
itemsize]
frame_meta.append({"byte_offset": byte_offset,
"channel": i+1,
"channel_description": channel,
"frame_description": "plane_{}".format(plane),
"dtype": ">u{}".format(itemsize),
"position_offset": [None, 0, 0],
"shape": [frames_per_plane, height, width],
"strides": strides})
metadata.append({"output_file": outfile,
"experiment_id": experiment_id,
"frame_metadata": frame_meta})
return metadata
[docs]def debug(experiment_id, local=False, raw_path=None):
OUTPUT_DIRECTORY = "/data/informatics/CAM/ophys_decomp"
SDK_PATH = "/data/informatics/CAM/ophys_decomp/allensdk"
SCRIPT = ("/data/informatics/CAM/ophys_decomp/allensdk/allensdk/"
"internal/pipeline_modules/run_ophys_session_decomposition.py")
exp_dir = os.path.join(OUTPUT_DIRECTORY, str(experiment_id))
if raw_path is not None:
conversion_definitions = create_fake_metadata(exp_dir, raw_path)
input_data = {"raw_filename": raw_path,
"frame_metadata": conversion_definitions}
else:
raise NotImplementedError("No real examples exist yet")
run_module(SCRIPT,
input_data,
exp_dir,
sdk_path=SDK_PATH,
pbs=dict(vmem=160,
job_name="ophys_decomp_%d"% experiment_id,
walltime="36:00:00"),
local=local)
[docs]def convert_frame(conversion_definition):
raw_filename = conversion_definition["input_file"]
ophys_hdf5_filename = conversion_definition["data_output_file"]
auxiliary_hdf5_filename = conversion_definition["auxiliary_output_file"]
experiment_id = conversion_definition["experiment_id"]
frame_metadata = conversion_definition["frame_metadata"]
osd.export_frame_to_hdf5(raw_filename, ophys_hdf5_filename,
auxiliary_hdf5_filename, frame_metadata)
return experiment_id, ophys_hdf5_filename, auxiliary_hdf5_filename
[docs]def parse_input(data):
'''Load all input data from the input json.'''
conversion_definitions = data["frame_metadata"]
for item in conversion_definitions:
item["input_file"] = data["raw_filename"]
return conversion_definitions
[docs]def main():
mod = PipelineModule("Decompose ophys session into individual planes.")
mod.parser.add_argument("-t", "--threads", type=int, default=4)
input_data = mod.input_data()
conversion_definitions = parse_input(input_data)
if mod.args.threads > 1:
pool = Pool(processes=mod.args.threads)
output = pool.map(convert_frame, conversion_definitions)
else:
output= []
for definition in conversion_definitions:
output.append(convert_frame(definition))
output_data = {}
for eid, ophys_file, auxiliary_file in output:
output_data[eid] = {"ophys_data": ophys_file,
"auxiliary_data": auxiliary_file}
mod.write_output_data(output_data)
if __name__ == "__main__": main()