import logging
from typing import Optional
from pathlib import Path
import numpy as np
import h5py
import pandas as pd
import uuid
import matplotlib.image as mpimg # NOQA: E402
from allensdk.api.cache import memoize
from allensdk.internal.api.ophys_lims_api import OphysLimsApi
from allensdk.brain_observatory.behavior.sync import (
get_sync_data, get_stimulus_rebase_function, frame_time_offset)
from allensdk.brain_observatory.sync_dataset import Dataset
from allensdk.brain_observatory import sync_utilities
from allensdk.internal.brain_observatory.time_sync import OphysTimeAligner
from allensdk.brain_observatory.behavior.stimulus_processing import get_stimulus_presentations, get_stimulus_templates, get_stimulus_metadata
from allensdk.brain_observatory.behavior.metadata_processing import get_task_parameters
from allensdk.brain_observatory.behavior.running_processing import get_running_df
from allensdk.brain_observatory.behavior.rewards_processing import get_rewards
from allensdk.brain_observatory.behavior.trials_processing import get_trials
from allensdk.brain_observatory.behavior.eye_tracking_processing import load_eye_tracking_hdf, process_eye_tracking_data
from allensdk.brain_observatory.running_speed import RunningSpeed
from allensdk.brain_observatory.behavior.image_api import ImageApi
from allensdk.internal.api import PostgresQueryMixin
from allensdk.brain_observatory.behavior.behavior_ophys_api import BehaviorOphysApiBase
from allensdk.brain_observatory.behavior.trials_processing import get_extended_trials
from allensdk.internal.core.lims_utilities import safe_system_path
from allensdk.core.auth_config import LIMS_DB_CREDENTIAL_MAP
from allensdk.core.authentication import credential_injector, DbCredentials
[docs]class BehaviorOphysLimsApi(OphysLimsApi, BehaviorOphysApiBase):
def __init__(self, ophys_experiment_id: int,
lims_credentials: Optional[DbCredentials] = None):
super().__init__(ophys_experiment_id, lims_credentials)
[docs] @memoize
def get_sync_data(self):
sync_path = self.get_sync_file()
return get_sync_data(sync_path)
[docs] @memoize
def get_stimulus_timestamps(self):
sync_path = self.get_sync_file()
timestamps, _, _ = (OphysTimeAligner(sync_file=sync_path)
.corrected_stim_timestamps)
return timestamps
[docs] @memoize
def get_ophys_timestamps(self):
ophys_timestamps = self.get_sync_data()['ophys_frames']
dff_traces = self.get_raw_dff_data()
number_of_cells, number_of_dff_frames = dff_traces.shape
num_of_timestamps = len(ophys_timestamps)
if number_of_dff_frames < num_of_timestamps:
ophys_timestamps = ophys_timestamps[:number_of_dff_frames]
elif number_of_dff_frames == num_of_timestamps:
pass
else:
raise RuntimeError('dff_frames is longer than timestamps')
return ophys_timestamps
[docs] @memoize
def get_experiment_container_id(self):
query = '''
SELECT visual_behavior_experiment_container_id
FROM ophys_experiments_visual_behavior_experiment_containers
WHERE ophys_experiment_id= {};
'''.format(self.get_ophys_experiment_id())
return self.lims_db.fetchone(query, strict=False)
[docs] @memoize
def get_behavior_stimulus_file(self):
query = '''
SELECT stim.storage_directory || stim.filename AS stim_file
FROM ophys_experiments oe
JOIN ophys_sessions os ON oe.ophys_session_id = os.id
JOIN behavior_sessions bs ON bs.ophys_session_id=os.id
LEFT JOIN well_known_files stim ON stim.attachable_id=bs.id AND stim.attachable_type = 'BehaviorSession' AND stim.well_known_file_type_id IN (SELECT id FROM well_known_file_types WHERE name = 'StimulusPickle')
WHERE oe.id= {};
'''.format(self.get_ophys_experiment_id())
return safe_system_path(self.lims_db.fetchone(query, strict=True))
[docs] def get_behavior_session_uuid(self):
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
return data['session_uuid']
[docs] @memoize
def get_stimulus_frame_rate(self):
stimulus_timestamps = self.get_stimulus_timestamps()
return np.round(1 / np.mean(np.diff(stimulus_timestamps)), 0)
[docs] @memoize
def get_ophys_frame_rate(self):
ophys_timestamps = self.get_ophys_timestamps()
return np.round(1 / np.mean(np.diff(ophys_timestamps)), 0)
[docs] @memoize
def get_dff_traces(self):
dff_traces = self.get_raw_dff_data()
cell_roi_id_list = self.get_cell_roi_ids()
df = pd.DataFrame({'dff': [x for x in dff_traces]}, index=pd.Index(cell_roi_id_list, name='cell_roi_id'))
cell_specimen_table = self.get_cell_specimen_table()
df = cell_specimen_table[['cell_roi_id']].join(df, on='cell_roi_id')
return df
[docs] @memoize
def get_running_data_df(self):
stimulus_timestamps = self.get_stimulus_timestamps()
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
return get_running_df(data, stimulus_timestamps)
[docs] @memoize
def get_running_speed(self):
running_data_df = self.get_running_data_df()
assert running_data_df.index.name == 'timestamps'
return RunningSpeed(timestamps=running_data_df.index.values,
values=running_data_df.speed.values)
[docs] @memoize
def get_stimulus_presentations(self):
stimulus_timestamps = self.get_stimulus_timestamps()
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
stimulus_presentations_df_pre = get_stimulus_presentations(data, stimulus_timestamps)
stimulus_metadata_df = get_stimulus_metadata(data)
idx_name = stimulus_presentations_df_pre.index.name
stimulus_index_df = stimulus_presentations_df_pre.reset_index().merge(stimulus_metadata_df.reset_index(), on=['image_name']).set_index(idx_name)
stimulus_index_df.sort_index(inplace=True)
stimulus_index_df = stimulus_index_df[['image_set', 'image_index', 'start_time']].rename(columns={'start_time': 'timestamps'})
stimulus_index_df.set_index('timestamps', inplace=True, drop=True)
stimulus_presentations_df = stimulus_presentations_df_pre.merge(stimulus_index_df, left_on='start_time', right_index=True, how='left')
assert len(stimulus_presentations_df_pre) == len(stimulus_presentations_df)
return stimulus_presentations_df[sorted(stimulus_presentations_df.columns)]
[docs] @memoize
def get_stimulus_templates(self):
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
return get_stimulus_templates(data)
[docs] @memoize
def get_sync_licks(self):
lick_times = self.get_sync_data()['lick_times']
return pd.DataFrame({'time': lick_times})
[docs] @memoize
def get_licks(self):
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
rebase_function = self.get_stimulus_rebase_function()
# Get licks from pickle file (need to add an offset to align with
# the trial_log time stream)
lick_frames = (data["items"]["behavior"]["lick_sensors"][0]
["lick_events"])
vsyncs = data["items"]["behavior"]["intervalsms"]
vsync_times_raw = np.hstack((0, vsyncs)).cumsum() / 1000.0 # cumulative time
vsync_offset = frame_time_offset(data)
vsync_times = vsync_times_raw + vsync_offset
lick_times = [vsync_times[frame] for frame in lick_frames]
# Align pickle data with sync time stream
return pd.DataFrame({"time": list(map(rebase_function, lick_times))})
[docs] @memoize
def get_rewards(self):
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
rebase_function = self.get_stimulus_rebase_function()
return get_rewards(data, rebase_function)
[docs] @memoize
def get_task_parameters(self):
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
return get_task_parameters(data)
[docs] @memoize
def get_trials(self):
licks = self.get_licks()
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
rewards = self.get_rewards()
stimulus_presentations = self.get_stimulus_presentations()
rebase_function = self.get_stimulus_rebase_function()
trial_df = get_trials(data, licks, rewards, stimulus_presentations, rebase_function)
return trial_df
[docs] @memoize
def get_corrected_fluorescence_traces(self):
demix_file = self.get_demix_file()
g = h5py.File(demix_file)
corrected_fluorescence_trace_array = np.asarray(g['data'])
g.close()
cell_roi_id_list = self.get_cell_roi_ids()
ophys_timestamps = self.get_ophys_timestamps()
assert corrected_fluorescence_trace_array.shape[1], ophys_timestamps.shape[0]
df = pd.DataFrame({'corrected_fluorescence': list(corrected_fluorescence_trace_array)}, index=pd.Index(cell_roi_id_list, name='cell_roi_id'))
cell_specimen_table = self.get_cell_specimen_table()
df = cell_specimen_table[['cell_roi_id']].join(df, on='cell_roi_id')
return df
[docs] @memoize
def get_average_projection(self, image_api=None):
if image_api is None:
image_api = ImageApi
avgint_a1X_file = self.get_average_intensity_projection_image_file()
pixel_size = self.get_surface_2p_pixel_size_um()
average_image = mpimg.imread(avgint_a1X_file)
return ImageApi.serialize(average_image, [pixel_size / 1000., pixel_size / 1000.], 'mm')
[docs] @memoize
def get_motion_correction(self):
motion_correction_filepath = self.get_rigid_motion_transform_file()
motion_correction = pd.read_csv(motion_correction_filepath)
return motion_correction[['x', 'y']]
[docs] @memoize
def get_nwb_filepath(self):
query = '''
SELECT wkf.storage_directory || wkf.filename AS nwb_file
FROM ophys_experiments oe
LEFT JOIN well_known_files wkf ON wkf.attachable_id=oe.id AND wkf.well_known_file_type_id IN (SELECT id FROM well_known_file_types WHERE name = 'BehaviorOphysNwb')
WHERE oe.id = {};
'''.format(self.get_ophys_experiment_id())
return safe_system_path(self.lims_db.fetchone(query, strict=True))
[docs] def get_stimulus_rebase_function(self):
stimulus_timestamps_no_monitor_delay = self.get_sync_data()['stimulus_times_no_delay']
behavior_stimulus_file = self.get_behavior_stimulus_file()
data = pd.read_pickle(behavior_stimulus_file)
stimulus_rebase_function = get_stimulus_rebase_function(data, stimulus_timestamps_no_monitor_delay)
return stimulus_rebase_function
[docs] def get_extended_trials(self):
filename = self.get_behavior_stimulus_file()
data = pd.read_pickle(filename)
return get_extended_trials(data)
[docs] @memoize
def get_eye_tracking_filepath(self):
query = '''SELECT wkf.storage_directory || wkf.filename AS eye_tracking_file
FROM ophys_experiments oe
LEFT JOIN well_known_files wkf ON wkf.attachable_id=oe.ophys_session_id
AND wkf.attachable_type = 'OphysSession'
AND wkf.well_known_file_type_id=(SELECT id FROM well_known_file_types WHERE name = 'EyeTracking Ellipses')
WHERE oe.id={};
'''.format(self.get_ophys_experiment_id())
return safe_system_path(self.lims_db.fetchone(query, strict=True))
[docs] def get_eye_tracking(self,
z_threshold: float = 3.0,
dilation_frames: int = 2):
logger = logging.getLogger("BehaviorOphysLimsApi")
logger.info(f"Getting eye_tracking_data with "
f"'z_threshold={z_threshold}', "
f"'dilation_frames={dilation_frames}'")
filepath = Path(self.get_eye_tracking_filepath())
sync_path = Path(self.get_sync_file())
eye_tracking_data = load_eye_tracking_hdf(filepath)
frame_times = sync_utilities.get_synchronized_frame_times(
session_sync_file=sync_path,
sync_line_label_keys=Dataset.EYE_TRACKING_KEYS)
eye_tracking_data = process_eye_tracking_data(eye_tracking_data,
frame_times,
z_threshold,
dilation_frames)
return eye_tracking_data
[docs] @staticmethod
def get_ophys_experiment_df():
api = (credential_injector(LIMS_DB_CREDENTIAL_MAP)
(PostgresQueryMixin)())
query = '''
SELECT
oec.visual_behavior_experiment_container_id as container_id,
oec.ophys_experiment_id,
oe.workflow_state,
d.full_genotype as full_genotype,
id.depth as imaging_depth,
st.acronym as targeted_structure,
os.name as session_name,
equipment.name as equipment_name
FROM ophys_experiments_visual_behavior_experiment_containers oec
LEFT JOIN ophys_experiments oe ON oe.id = oec.ophys_experiment_id
LEFT JOIN ophys_sessions os ON oe.ophys_session_id = os.id
LEFT JOIN specimens sp ON sp.id=os.specimen_id
LEFT JOIN donors d ON d.id=sp.donor_id
LEFT JOIN imaging_depths id ON id.id=oe.imaging_depth_id
LEFT JOIN structures st ON st.id=oe.targeted_structure_id
LEFT JOIN equipment ON equipment.id=os.equipment_id
'''
return pd.read_sql(query, api.get_connection())
[docs] @staticmethod
def get_containers_df(only_passed=True):
api = (credential_injector(LIMS_DB_CREDENTIAL_MAP)
(PostgresQueryMixin)())
if only_passed is True:
query = '''
SELECT *
FROM visual_behavior_experiment_containers vbc
WHERE workflow_state IN ('container_qc','publish');
'''
else:
query = '''
SELECT *
FROM visual_behavior_experiment_containers vbc
'''
return pd.read_sql(query, api.get_connection()).rename(columns={'id': 'container_id'})[['container_id', 'specimen_id', 'workflow_state']]
[docs] @classmethod
def get_api_list_by_container_id(cls, container_id):
df = cls.get_ophys_experiment_df()
oeid_list = df[df['container_id'] == container_id]['ophys_experiment_id'].values
return [cls(oeid) for oeid in oeid_list]
if __name__ == "__main__":
print(BehaviorOphysLimsApi.get_ophys_experiment_df())
# print(BehaviorOphysLimsApi.get_containers_df(only_passed=False))
# print(BehaviorOphysLimsApi.get_api_by_container(838105949))
# ophys_experiment_id = df['ophys_experiment_id'].iloc[0]
# print(ophys_experiment_id)
# BehaviorOphysLimsApi
# print(L)
# for c in sorted(L.columns):
# print(c)
# for x in [791352433, 814796698, 814796612, 814796558, 814797528]:
# print(x in L)