Source code for allensdk.brain_observatory.ecephys.ecephys_project_api.ecephys_project_warehouse_api

import re
import json
import ast

import pandas as pd
import numpy as np

from .rma_engine import RmaEngine, AsyncRmaEngine
from .ecephys_project_api import EcephysProjectApi
from .utilities import rma_macros, build_and_execute


[docs]class EcephysProjectWarehouseApi(EcephysProjectApi): movie_re = re.compile(r".*natural_movie_(?P<num>\d+).npy") scene_re = re.compile(r".*/(?P<num>\d+).tiff") def __init__(self, rma_engine=None): if rma_engine is None: rma_engine = RmaEngine(scheme="http", host="api.brain-map.org") self.rma_engine = rma_engine
[docs] def get_session_data(self, session_id, **kwargs): well_known_files = build_and_execute( ( "criteria=model::WellKnownFile" ",rma::criteria,well_known_file_type[name$eq'EcephysNwb']" "[attachable_type$eq'EcephysSession']" r"[attachable_id$eq{{session_id}}]" ), engine=self.rma_engine.get_rma_tabular, session_id=session_id ) if well_known_files.shape[0] != 1: raise ValueError(f"expected exactly 1 nwb file for session {session_id}, found: {well_known_files}") download_link = well_known_files.iloc[0]["download_link"] return self.rma_engine.stream(download_link)
[docs] def get_natural_movie_template(self, number): well_known_files = self.stimulus_templates[self.stimulus_templates["movie_number"] == number] if well_known_files.shape[0] != 1: raise ValueError(f"expected exactly one natural movie template with number {number}, found {well_known_files}") download_link = well_known_files.iloc[0]["download_link"] return self.rma_engine.stream(download_link)
[docs] def get_natural_scene_template(self, number): well_known_files = self.stimulus_templates[self.stimulus_templates["scene_number"] == number] if well_known_files.shape[0] != 1: raise ValueError(f"expected exactly one natural scene template with number {number}, found {well_known_files}") download_link = well_known_files.iloc[0]["download_link"] return self.rma_engine.stream(download_link)
@property def stimulus_templates(self): if not hasattr(self, "_stimulus_templates_list"): self._stimulus_templates_list = self._list_stimulus_templates() return self._stimulus_templates_list def _list_stimulus_templates(self, ecephys_product_id=714914585): well_known_files = build_and_execute( ( "criteria=model::WellKnownFile" ",rma::criteria,well_known_file_type[name$eq'Stimulus']" "[attachable_type$eq'Product']" r"[attachable_id$eq{{ecephys_product_id}}]" ), engine=self.rma_engine.get_rma_tabular, ecephys_product_id=ecephys_product_id ) scene_number = [] movie_number = [] for _, row in well_known_files.iterrows(): scene_match = self.scene_re.match(row["path"]) movie_match = self.movie_re.match(row["path"]) if scene_match is not None: scene_number.append(int(scene_match["num"])) movie_number.append(None) elif movie_match is not None: movie_number.append(int(movie_match["num"])) scene_number.append(None) well_known_files["scene_number"] = scene_number well_known_files["movie_number"] = movie_number return well_known_files
[docs] def get_probe_lfp_data(self, probe_id): well_known_files = build_and_execute( ( "criteria=model::WellKnownFile" ",rma::criteria,well_known_file_type[name$eq'EcephysLfpNwb']" "[attachable_type$eq'EcephysProbe']" r"[attachable_id$eq{{probe_id}}]" ), engine=self.rma_engine.get_rma_tabular, probe_id=probe_id ) if well_known_files.shape[0] != 1: raise ValueError(f"expected exactly 1 LFP NWB file for probe {probe_id}, found: {well_known_files}") download_link = well_known_files.loc[0, "download_link"] return self.rma_engine.stream(download_link)
[docs] def get_sessions(self, session_ids=None, has_eye_tracking=None, stimulus_names=None): response = build_and_execute( ( "{% import 'rma_macros' as rm %}" "{% import 'macros' as m %}" "criteria=model::EcephysSession" r"{{rm.optional_contains('id',session_ids)}}" r"{%if has_eye_tracking is not none%}[fail_eye_tracking$eq{{m.str(not has_eye_tracking).lower()}}]{%endif%}" r"{{rm.optional_contains('stimulus_name',stimulus_names,True)}}" ",rma::include,specimen(donor(age))" ",well_known_files(well_known_file_type)" ), base=rma_macros(), engine=self.rma_engine.get_rma_tabular, session_ids=session_ids, has_eye_tracking=has_eye_tracking, stimulus_names=stimulus_names ) response.set_index("id", inplace=True) age_in_days = [] sex = [] genotype = [] has_nwb = [] for idx, row in response.iterrows(): age_in_days.append(row["specimen"]["donor"]["age"]["days"]) sex.append(row["specimen"]["donor"]["sex"]) gt = row["specimen"]["donor"]["full_genotype"] if gt is None: gt = "wt" genotype.append(gt) current_has_nwb = False for wkf in row["well_known_files"]: if wkf["well_known_file_type"]["name"] == "EcephysNwb": current_has_nwb = True has_nwb.append(current_has_nwb) response["age_in_days"] = age_in_days response["sex"] = sex response["genotype"] = genotype response["has_nwb"] = has_nwb response.drop(columns=["specimen", "fail_eye_tracking", "well_known_files"], inplace=True) response.rename(columns={"stimulus_name": "session_type"}, inplace=True) return response
[docs] def get_probes(self, probe_ids=None, session_ids=None): response = build_and_execute( ( "{% import 'rma_macros' as rm %}" "{% import 'macros' as m %}" "criteria=model::EcephysProbe" r"{{rm.optional_contains('id',probe_ids)}}" r"{{rm.optional_contains('ecephys_session_id',session_ids)}}" ), base=rma_macros(), engine=self.rma_engine.get_rma_tabular, session_ids=session_ids, probe_ids=probe_ids ) response.set_index("id", inplace=True) # Clarify name for external users response.rename(columns={"use_lfp_data": "has_lfp_data"}, inplace=True) return response
[docs] def get_channels(self, channel_ids=None, probe_ids=None): response = build_and_execute( ( "{% import 'rma_macros' as rm %}" "{% import 'macros' as m %}" "criteria=model::EcephysChannel" r"{{rm.optional_contains('id',channel_ids)}}" r"{{rm.optional_contains('ecephys_probe_id',probe_ids)}}" ",rma::include,structure" ",rma::options[tabular$eq'" "ecephys_channels.id" ",ecephys_probe_id" ",local_index" ",probe_horizontal_position" ",probe_vertical_position" ",anterior_posterior_ccf_coordinate" ",dorsal_ventral_ccf_coordinate" ",left_right_ccf_coordinate" ",structures.id as ecephys_structure_id" ",structures.acronym as ecephys_structure_acronym" "']" ), base=rma_macros(), engine=self.rma_engine.get_rma_tabular, probe_ids=probe_ids, channel_ids=channel_ids ) response.set_index("id", inplace=True) return response
[docs] def get_units(self, unit_ids=None, channel_ids=None, probe_ids=None, session_ids=None, *a, **k): response = build_and_execute( ( "{% import 'macros' as m %}" "criteria=model::EcephysUnit" r"{% if unit_ids is not none %},rma::criteria[id$in{{m.comma_sep(unit_ids)}}]{% endif %}" r"{% if channel_ids is not none %},rma::criteria[ecephys_channel_id$in{{m.comma_sep(channel_ids)}}]{% endif %}" r"{% if probe_ids is not none %},rma::criteria,ecephys_channel(ecephys_probe[id$in{{m.comma_sep(probe_ids)}}]){% endif %}" r"{% if session_ids is not none %},rma::criteria,ecephys_channel(ecephys_probe(ecephys_session[id$in{{m.comma_sep(session_ids)}}])){% endif %}" ), base=rma_macros(), engine=self.rma_engine.get_rma_tabular, session_ids=session_ids, probe_ids=probe_ids, channel_ids=channel_ids, unit_ids=unit_ids ) response.set_index("id", inplace=True) return response
[docs] def get_unit_analysis_metrics(self, unit_ids=None, ecephys_session_ids=None, session_types=None): """ Download analysis metrics - precalculated descriptions of unitwise responses to visual stimulation. Parameters ---------- unit_ids : array-like of int, optional Unique identifiers for ecephys units. If supplied, only download metrics for these units. ecephys_session_ids : array-like of int, optional Unique identifiers for ecephys sessions. If supplied, only download metrics for units collected during these sessions. session_types : array-like of str, optional Names of session types. e.g. "brain_observatory_1.1" or "functional_connectivity". If supplied, only download metrics for units collected during sessions of these types Returns ------- pd.DataFrame : A table of analysis metrics, indexed by unit_id. """ response = build_and_execute( ( "{% import 'macros' as m %}" "criteria=model::EcephysUnitMetricBundle" r"{% if unit_ids is not none %},rma::criteria[ecephys_unit_id$in{{m.comma_sep(unit_ids)}}]{% endif %}" r"{% if session_ids is not none %},rma::criteria,ecephys_unit(ecephys_channel(ecephys_probe(ecephys_session[id$in{{m.comma_sep(session_ids)}}]))){% endif %}" r"{% if session_types is not none %},rma::criteria,ecephys_unit(ecephys_channel(ecephys_probe(ecephys_session[stimulus_name$in{{m.comma_sep(session_types, True)}}]))){% endif %}" ), base=rma_macros(), engine=self.rma_engine.get_rma_list, session_ids=ecephys_session_ids, unit_ids=unit_ids, session_types=session_types ) output = [] for item in response: data = json.loads(item.pop("data")) item.update(data) output.append(item) output = pd.DataFrame(output) output.set_index("ecephys_unit_id", inplace=True) output.drop(columns="id", inplace=True) for colname in output.columns: try: output[colname] = output.apply(lambda row: ast.literal_eval(str(row[colname])), axis=1) except ValueError: pass # TODO: remove this # on_screen_rf and p_value_rf were correctly calculated, # but switched with one another. This snippet unswitches them. columns = set(output.columns.values.tolist()) if "p_value_rf" in columns and "on_screen_rf" in columns: pv_is_bool = np.issubdtype(output["p_value_rf"].values[0], np.bool) on_screen_is_float = np.issubdtype(output["on_screen_rf"].values[0].dtype, np.floating) # this is not a good test, but it avoids the case where we fix # these in the data for a future release, but # reintroduce the bug by forgetting to update the code. if pv_is_bool and on_screen_is_float: p_value_rf = output["p_value_rf"].copy() output["p_value_rf"] = output["on_screen_rf"].copy() output["on_screen_rf"] = p_value_rf return output
[docs] @classmethod def default(cls, asynchronous=False, **rma_kwargs): _rma_kwargs = {"scheme": "http", "host": "api.brain-map.org"} _rma_kwargs.update(rma_kwargs) engine_cls = AsyncRmaEngine if asynchronous else RmaEngine return cls(engine_cls(**_rma_kwargs))