Source code for allensdk.internal.pipeline_modules.run_ophys_session_decomposition

import logging
from allensdk.internal.core.lims_pipeline_module import (PipelineModule,
                                                         run_module)
import allensdk.core.json_utilities as ju
import allensdk.internal.core.lims_utilities as lu
from allensdk.internal.brain_observatory import ophys_session_decomposition as osd
from multiprocessing import Pool
import os

# Layout assumed when fabricating metadata for a debug run: frames for each
# (plane, channel) pair interleaved through the raw file, 512x256 frames of
# 2-byte big-endian samples across 6 imaging planes.
DEBUG_CHANNELS = ["data", "piezo"]
DEBUG_WIDTH = 512
DEBUG_HEIGHT = 256
DEBUG_ITEMSIZE = 2
DEBUG_N_PLANES = 6


def create_fake_metadata(exp_dir, raw_path, channels=None, width=DEBUG_WIDTH,
                         height=DEBUG_HEIGHT, itemsize=DEBUG_ITEMSIZE,
                         n_planes=DEBUG_N_PLANES):
    """Fabricate per-plane decomposition metadata for a raw file, for
    testing."""
    metadata = []
    size = os.stat(raw_path).st_size
    if channels is None:
        channels = DEBUG_CHANNELS
    # Integer division keeps frame counts integral under Python 3.
    n_frames = size // (itemsize * width * height)
    frames_per_plane = n_frames // n_planes // len(channels)
    for plane in range(n_planes):
        experiment_id = plane
        outfile = os.path.join(exp_dir, "plane_{}.h5".format(plane))
        frame_meta = []
        for i, channel in enumerate(channels):
            byte_offset = width * height * itemsize * \
                (plane * len(channels) + i)
            strides = [width * height * itemsize * n_planes * len(channels),
                       width * itemsize,
                       itemsize]
            frame_meta.append({"byte_offset": byte_offset,
                               "channel": i + 1,
                               "channel_description": channel,
                               "frame_description": "plane_{}".format(plane),
                               "dtype": ">u{}".format(itemsize),
                               "position_offset": [None, 0, 0],
                               "shape": [frames_per_plane, height, width],
                               "strides": strides})
        metadata.append({"output_file": outfile,
                         "experiment_id": experiment_id,
                         "frame_metadata": frame_meta})
    return metadata
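
# Worked example (a sketch, not called by the pipeline): with the default
# DEBUG_* layout each frame occupies 512 * 256 * 2 = 262144 bytes, so for
# plane 0 the second channel ("piezo") starts at byte offset 262144, and
# every frame entry gets strides [3145728, 1024, 2] (frame-to-frame, row,
# sample). The paths below are hypothetical.
#
#     meta = create_fake_metadata("/tmp/exp_dir", "/tmp/raw.bin")
#     meta[0]["frame_metadata"][1]["byte_offset"]   # -> 262144
#     meta[0]["frame_metadata"][1]["strides"]       # -> [3145728, 1024, 2]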


def debug(experiment_id, local=False, raw_path=None):
    """Queue (or locally run) decomposition for one session, using
    fabricated metadata when a raw file path is supplied."""
    OUTPUT_DIRECTORY = "/data/informatics/CAM/ophys_decomp"
    SDK_PATH = "/data/informatics/CAM/ophys_decomp/allensdk"
    SCRIPT = ("/data/informatics/CAM/ophys_decomp/allensdk/allensdk/"
              "internal/pipeline_modules/run_ophys_session_decomposition.py")
    exp_dir = os.path.join(OUTPUT_DIRECTORY, str(experiment_id))
    if raw_path is not None:
        conversion_definitions = create_fake_metadata(exp_dir, raw_path)
        input_data = {"raw_filename": raw_path,
                      "frame_metadata": conversion_definitions}
    else:
        raise NotImplementedError("No real examples exist yet")
    run_module(SCRIPT,
               input_data,
               exp_dir,
               sdk_path=SDK_PATH,
               pbs=dict(vmem=160,
                        job_name="ophys_decomp_%d" % experiment_id,
                        walltime="36:00:00"),
               local=local)
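
# Example (hypothetical ID and path): run the decomposition in-process
# instead of submitting a PBS job.
#
#     debug(12345, local=True, raw_path="/tmp/fake_session.raw")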


def convert_frame(conversion_definition):
    """Export one plane's ophys and auxiliary data to HDF5."""
    raw_filename = conversion_definition["input_file"]
    ophys_hdf5_filename = conversion_definition["data_output_file"]
    auxiliary_hdf5_filename = conversion_definition["auxiliary_output_file"]
    experiment_id = conversion_definition["experiment_id"]
    frame_metadata = conversion_definition["frame_metadata"]
    osd.export_frame_to_hdf5(raw_filename, ophys_hdf5_filename,
                             auxiliary_hdf5_filename, frame_metadata)
    return experiment_id, ophys_hdf5_filename, auxiliary_hdf5_filename
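
# Each conversion definition handed to convert_frame is expected to carry
# at least the keys read above (paths hypothetical):
#
#     {"input_file": "/path/raw.bin",              # added by parse_input
#      "data_output_file": "/path/plane_0.h5",
#      "auxiliary_output_file": "/path/plane_0_aux.h5",
#      "experiment_id": 0,
#      "frame_metadata": [...]}                    # per-channel frame specs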


def parse_input(data):
    '''Load all input data from the input json.'''
    conversion_definitions = data["frame_metadata"]
    for item in conversion_definitions:
        item["input_file"] = data["raw_filename"]
    return conversion_definitions
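
# The input json is therefore expected to look like (filename hypothetical):
#
#     {"raw_filename": "/path/raw.bin",
#      "frame_metadata": [{...}, {...}]}   # one definition per output plane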


def main():
    mod = PipelineModule("Decompose ophys session into individual planes.")
    mod.parser.add_argument("-t", "--threads", type=int, default=4)

    input_data = mod.input_data()
    conversion_definitions = parse_input(input_data)

    # Convert planes in parallel when more than one thread is requested.
    if mod.args.threads > 1:
        pool = Pool(processes=mod.args.threads)
        output = pool.map(convert_frame, conversion_definitions)
    else:
        output = []
        for definition in conversion_definitions:
            output.append(convert_frame(definition))

    output_data = {}
    for eid, ophys_file, auxiliary_file in output:
        output_data[eid] = {"ophys_data": ophys_file,
                            "auxiliary_data": auxiliary_file}
    mod.write_output_data(output_data)
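
# Usage sketch (positional argument names assumed from PipelineModule's
# usual input/output json conventions):
#
#     python run_ophys_session_decomposition.py input.json output.json -t 8
#
# Pool.map preserves input order, so results pair up with the definitions.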


if __name__ == "__main__":
    main()