Source code for allensdk.internal.pipeline_modules.run_ophys_eye_calibration

import os
from allensdk.internal.core.lims_pipeline_module import (PipelineModule,
                                                         run_module)
from allensdk.internal.brain_observatory import (eye_calibration,
                                                 itracker_utils)
import allensdk.internal.core.lims_utilities as lu
import numpy as np
import h5py

EYE_RADIUS = 0.1682
CM_PER_PIXEL = 10.2/10000

[docs]def get_wkf(wkf_type, experiment_id): wkf = lu.query(""" select CONCAT(wkf.storage_directory, wkf.filename) as path from well_known_files wkf join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id where wkft.name LIKE '{}' and wkf.attachable_id = {} """.format(wkf_type, experiment_id))[0]["path"] return wkf
[docs]def debug(experiment_id, local=False): OUTPUT_DIRECTORY = "/data/informatics/CAM/eye_calibration" SDK_PATH = "/data/informatics/CAM/eye_calibration/allensdk" SCRIPT = ("/data/informatics/CAM/eye_calibration/allensdk/allensdk" "/internal/pipeline_modules/run_ophys_eye_calibration.py") frame_width = 640 frame_height = 480 cr_file = get_wkf("EyeTracking Corneal Reflection", experiment_id) pupil_file = get_wkf("EyeTracking Pupil", experiment_id) exp_info = lu.query(""" select * from ophys_sessions os where os.id = {} """.format(experiment_id))[0] exp_dir = os.path.join(OUTPUT_DIRECTORY, str(experiment_id)) # clear out missing values to let us get defaults for key, value in list(exp_info.items()): if value is None: del exp_info[key] input_data = { "cr_params_file": cr_file, "pupil_params_file": pupil_file, "frame_width": frame_width, "frame_height": frame_height, "output_file": os.path.join(exp_dir, "eye_tracking_to_screen_mapping.h5"), "monitor_position_x_mm": exp_info.get( "screen_center_x_mm", eye_calibration.MONITOR_POSITION_OLD[0]*10), "monitor_position_y_mm": exp_info.get( "screen_center_y_mm", eye_calibration.MONITOR_POSITION_OLD[1]*10), "monitor_position_z_mm": exp_info.get( "screen_center_z_mm", eye_calibration.MONITOR_POSITION_OLD[2]*10), "monitor_rotation_x_deg": exp_info.get("screen_rotation_x_deg", 0), "monitor_rotation_y_deg": exp_info.get("screen_rotation_y_deg", 0), "monitor_rotation_z_deg": exp_info.get("screen_rotation_z_deg", 0), "camera_position_x_mm": exp_info.get( "camera_center_x_mm", eye_calibration.CAMERA_POSITION_OLD[0]*10), "camera_position_y_mm": exp_info.get( "camera_center_y_mm", eye_calibration.CAMERA_POSITION_OLD[1]*10), "camera_position_z_mm": exp_info.get( "camera_center_z_mm", eye_calibration.CAMERA_POSITION_OLD[2]*10), "camera_rotation_x_deg": exp_info.get( "camera_rotation_x_deg", eye_calibration.CAMERA_ROTATIONS_OLD[0]*180/np.pi), "camera_rotation_y_deg": exp_info.get( "camera_rotation_y_deg", eye_calibration.CAMERA_ROTATIONS_OLD[1]*180/np.pi), "camera_rotation_z_deg": exp_info.get( "camera_rotation_z_deg", eye_calibration.CAMERA_ROTATIONS_OLD[2]*180/np.pi), "led_position_x_mm": exp_info.get( "led_center_x_mm", eye_calibration.LED_POSITION_ORIGINAL[0]*10), "led_position_y_mm": exp_info.get( "led_center_y_mm", eye_calibration.LED_POSITION_ORIGINAL[1]*10), "led_position_z_mm": exp_info.get( "led_center_z_mm", eye_calibration.LED_POSITION_ORIGINAL[2]*10) } # TEMPORARY HACKS TO DEAL WITH BAD DATA IN LIMS # TODO: REMOVE WHEN DATAFIXES DONE if input_data["monitor_position_x_mm"] == -86.2: input_data["monitor_position_x_mm"] = \ eye_calibration.MONITOR_POSITION_NEW[0]*10 input_data["monitor_position_y_mm"] = \ eye_calibration.MONITOR_POSITION_NEW[1]*10 input_data["monitor_position_z_mm"] = \ eye_calibration.MONITOR_POSITION_NEW[2]*10 run_module(SCRIPT, input_data, exp_dir, sdk_path=SDK_PATH, local=local)
[docs]def parse_input_data(data): cr_params = np.load(data["cr_params_file"]) pupil_params = np.load(data["pupil_params_file"]) frame_width = data["frame_width"] frame_height = data["frame_height"] output_file = data["output_file"] monitor_position = np.array([ float(data['monitor_position_x_mm'])/10.0, float(data['monitor_position_y_mm'])/10.0, float(data['monitor_position_z_mm'])/10.0, ]) monitor_rotations = np.array([ float(data['monitor_rotation_x_deg'])*np.pi/180, float(data['monitor_rotation_y_deg'])*np.pi/180, float(data['monitor_rotation_z_deg'])*np.pi/180, ]) camera_position = np.array([ float(data['camera_position_x_mm'])/10.0, float(data['camera_position_y_mm'])/10.0, float(data['camera_position_z_mm'])/10.0, ]) camera_rotations = np.array([ float(data['camera_rotation_x_deg'])*np.pi/180, float(data['camera_rotation_y_deg'])*np.pi/180, float(data['camera_rotation_z_deg'])*np.pi/180, ]) led_position = np.array([ float(data['led_position_x_mm'])/10.0, float(data['led_position_y_mm'])/10.0, float(data['led_position_z_mm'])/10.0, ]) calibrator = eye_calibration.EyeCalibration( monitor_position=monitor_position, monitor_rotations=monitor_rotations, led_position=led_position, camera_position=camera_position, camera_rotations=camera_rotations, eye_radius=EYE_RADIUS, cm_per_pixel=CM_PER_PIXEL) cr_params, _ = itracker_utils.post_process_cr(cr_params) pupil_params = itracker_utils.post_process_pupil(pupil_params) cr_params = itracker_utils.filter_bad_params(cr_params, frame_width, frame_height) pupil_params = itracker_utils.filter_bad_params(pupil_params, frame_width, frame_height) return calibrator, cr_params, pupil_params, output_file
[docs]def write_output(filename, position_degrees, position_cm, areas): with h5py.File(filename, "w") as f: f.create_dataset("screen_coordinates", data=position_cm) f.create_dataset("screen_coordinates_spherical", data=position_degrees) f.create_dataset("pupil_areas", data=areas)
[docs]def main(): mod = PipelineModule() data = mod.input_data() calibrator, cr_params, pupil_params, outfile = parse_input_data(data) pupil_areas = calibrator.compute_area(pupil_params) pupil_on_monitor_deg = calibrator.pupil_position_on_monitor_in_degrees( pupil_params, cr_params) pupil_on_monitor_cm = calibrator.pupil_position_on_monitor_in_cm( pupil_params, cr_params) missing_index = np.isnan(pupil_areas) | np.isnan(pupil_on_monitor_deg.T[0]) pupil_areas[missing_index] = np.nan pupil_on_monitor_deg[missing_index,:] = np.nan pupil_on_monitor_cm[missing_index,:] = np.nan write_output(outfile, pupil_on_monitor_deg, pupil_on_monitor_cm, pupil_areas) mod.write_output_data({"screen_mapping_file": outfile})
if __name__ == "__main__": main()