import re
import json
import ast
import pandas as pd
import numpy as np
from .rma_engine import RmaEngine, AsyncRmaEngine
from .ecephys_project_api import EcephysProjectApi
from .utilities import rma_macros, build_and_execute
from allensdk.brain_observatory.ecephys.ecephys_project_api.warehouse_patches import replace_bad_structure_assignments
[docs]class EcephysProjectWarehouseApi(EcephysProjectApi):
movie_re = re.compile(r".*natural_movie_(?P<num>\d+).npy")
scene_re = re.compile(r".*/(?P<num>\d+).tiff")
def __init__(self, rma_engine=None):
if rma_engine is None:
rma_engine = RmaEngine(scheme="http", host="api.brain-map.org")
self.rma_engine = rma_engine
[docs] def get_session_data(self, session_id, **kwargs):
well_known_files = build_and_execute(
(
"criteria=model::WellKnownFile"
",rma::criteria,well_known_file_type[name$eq'EcephysNwb']"
"[attachable_type$eq'EcephysSession']"
r"[attachable_id$eq{{session_id}}]"
),
engine=self.rma_engine.get_rma_tabular, session_id=session_id
)
if well_known_files.shape[0] != 1:
raise ValueError(f"expected exactly 1 nwb file for session {session_id}, found: {well_known_files}")
download_link = well_known_files.iloc[0]["download_link"]
return self.rma_engine.stream(download_link)
[docs] def get_natural_movie_template(self, number):
well_known_files = self.stimulus_templates[self.stimulus_templates["movie_number"] == number]
if well_known_files.shape[0] != 1:
raise ValueError(f"expected exactly one natural movie template with number {number}, found {well_known_files}")
download_link = well_known_files.iloc[0]["download_link"]
return self.rma_engine.stream(download_link)
[docs] def get_natural_scene_template(self, number):
well_known_files = self.stimulus_templates[self.stimulus_templates["scene_number"] == number]
if well_known_files.shape[0] != 1:
raise ValueError(f"expected exactly one natural scene template with number {number}, found {well_known_files}")
download_link = well_known_files.iloc[0]["download_link"]
return self.rma_engine.stream(download_link)
@property
def stimulus_templates(self):
if not hasattr(self, "_stimulus_templates_list"):
self._stimulus_templates_list = self._list_stimulus_templates()
return self._stimulus_templates_list
def _list_stimulus_templates(self, ecephys_product_id=714914585):
well_known_files = build_and_execute(
(
"criteria=model::WellKnownFile"
",rma::criteria,well_known_file_type[name$eq'Stimulus']"
"[attachable_type$eq'Product']"
r"[attachable_id$eq{{ecephys_product_id}}]"
),
engine=self.rma_engine.get_rma_tabular,
ecephys_product_id=ecephys_product_id
)
scene_number = []
movie_number = []
for _, row in well_known_files.iterrows():
scene_match = self.scene_re.match(row["path"])
movie_match = self.movie_re.match(row["path"])
if scene_match is not None:
scene_number.append(int(scene_match["num"]))
movie_number.append(None)
elif movie_match is not None:
movie_number.append(int(movie_match["num"]))
scene_number.append(None)
well_known_files["scene_number"] = scene_number
well_known_files["movie_number"] = movie_number
return well_known_files
[docs] def get_probe_lfp_data(self, probe_id):
well_known_files = build_and_execute(
(
"criteria=model::WellKnownFile"
",rma::criteria,well_known_file_type[name$eq'EcephysLfpNwb']"
"[attachable_type$eq'EcephysProbe']"
r"[attachable_id$eq{{probe_id}}]"
),
engine=self.rma_engine.get_rma_tabular, probe_id=probe_id
)
if well_known_files.shape[0] != 1:
raise ValueError(f"expected exactly 1 LFP NWB file for probe {probe_id}, found: {well_known_files}")
download_link = well_known_files.loc[0, "download_link"]
return self.rma_engine.stream(download_link)
[docs] def get_sessions(self, session_ids=None, has_eye_tracking=None, stimulus_names=None):
response = build_and_execute(
(
"{% import 'rma_macros' as rm %}"
"{% import 'macros' as m %}"
"criteria=model::EcephysSession"
r"{{rm.optional_contains('id',session_ids)}}"
r"{%if has_eye_tracking is not none%}[fail_eye_tracking$eq{{m.str(not has_eye_tracking).lower()}}]{%endif%}"
r"{{rm.optional_contains('stimulus_name',stimulus_names,True)}}"
",rma::include,specimen(donor(age))"
",well_known_files(well_known_file_type)"
),
base=rma_macros(),
engine=self.rma_engine.get_rma_tabular,
session_ids=session_ids,
has_eye_tracking=has_eye_tracking,
stimulus_names=stimulus_names
)
response.set_index("id", inplace=True)
age_in_days = []
sex = []
genotype = []
has_nwb = []
for idx, row in response.iterrows():
age_in_days.append(row["specimen"]["donor"]["age"]["days"])
sex.append(row["specimen"]["donor"]["sex"])
gt = row["specimen"]["donor"]["full_genotype"]
if gt is None:
gt = "wt"
genotype.append(gt)
current_has_nwb = False
for wkf in row["well_known_files"]:
if wkf["well_known_file_type"]["name"] == "EcephysNwb":
current_has_nwb = True
has_nwb.append(current_has_nwb)
response["age_in_days"] = age_in_days
response["sex"] = sex
response["genotype"] = genotype
response["has_nwb"] = has_nwb
response.drop(columns=["specimen", "fail_eye_tracking", "well_known_files"], inplace=True)
response.rename(columns={"stimulus_name": "session_type"}, inplace=True)
return response
[docs] def get_probes(self, probe_ids=None, session_ids=None):
response = build_and_execute(
(
"{% import 'rma_macros' as rm %}"
"{% import 'macros' as m %}"
"criteria=model::EcephysProbe"
r"{{rm.optional_contains('id',probe_ids)}}"
r"{{rm.optional_contains('ecephys_session_id',session_ids)}}"
),
base=rma_macros(),
engine=self.rma_engine.get_rma_tabular,
session_ids=session_ids,
probe_ids=probe_ids
)
response.set_index("id", inplace=True)
# Clarify name for external users
response.rename(columns={"use_lfp_data": "has_lfp_data"}, inplace=True)
return response
[docs] def get_channels(self, channel_ids=None, probe_ids=None):
response = build_and_execute(
(
"{% import 'rma_macros' as rm %}"
"{% import 'macros' as m %}"
"criteria=model::EcephysChannel"
r"{{rm.optional_contains('id',channel_ids)}}"
r"{{rm.optional_contains('ecephys_probe_id',probe_ids)}}"
",rma::include,structure"
",rma::options[tabular$eq'"
"ecephys_channels.id"
",ecephys_probe_id"
",local_index"
",probe_horizontal_position"
",probe_vertical_position"
",anterior_posterior_ccf_coordinate"
",dorsal_ventral_ccf_coordinate"
",left_right_ccf_coordinate"
",structures.id as ecephys_structure_id"
",structures.acronym as ecephys_structure_acronym"
"']"
),
base=rma_macros(),
engine=self.rma_engine.get_rma_tabular,
probe_ids=probe_ids,
channel_ids=channel_ids
)
response.set_index("id", inplace=True)
replace_bad_structure_assignments(response, inplace=True)
return response
[docs] def get_units(self, unit_ids=None, channel_ids=None, probe_ids=None, session_ids=None, *a, **k):
response = build_and_execute(
(
"{% import 'macros' as m %}"
"criteria=model::EcephysUnit"
r"{% if unit_ids is not none %},rma::criteria[id$in{{m.comma_sep(unit_ids)}}]{% endif %}"
r"{% if channel_ids is not none %},rma::criteria[ecephys_channel_id$in{{m.comma_sep(channel_ids)}}]{% endif %}"
r"{% if probe_ids is not none %},rma::criteria,ecephys_channel(ecephys_probe[id$in{{m.comma_sep(probe_ids)}}]){% endif %}"
r"{% if session_ids is not none %},rma::criteria,ecephys_channel(ecephys_probe(ecephys_session[id$in{{m.comma_sep(session_ids)}}])){% endif %}"
),
base=rma_macros(), engine=self.rma_engine.get_rma_tabular,
session_ids=session_ids,
probe_ids=probe_ids,
channel_ids=channel_ids,
unit_ids=unit_ids
)
response.set_index("id", inplace=True)
return response
[docs] def get_unit_analysis_metrics(self, unit_ids=None, ecephys_session_ids=None, session_types=None):
""" Download analysis metrics - precalculated descriptions of unitwise responses to visual stimulation.
Parameters
----------
unit_ids : array-like of int, optional
Unique identifiers for ecephys units. If supplied, only download metrics for these units.
ecephys_session_ids : array-like of int, optional
Unique identifiers for ecephys sessions. If supplied, only download metrics for units collected during
these sessions.
session_types : array-like of str, optional
Names of session types. e.g. "brain_observatory_1.1" or "functional_connectivity". If supplied, only download
metrics for units collected during sessions of these types
Returns
-------
pd.DataFrame :
A table of analysis metrics, indexed by unit_id.
"""
response = build_and_execute(
(
"{% import 'macros' as m %}"
"criteria=model::EcephysUnitMetricBundle"
r"{% if unit_ids is not none %},rma::criteria[ecephys_unit_id$in{{m.comma_sep(unit_ids)}}]{% endif %}"
r"{% if session_ids is not none %},rma::criteria,ecephys_unit(ecephys_channel(ecephys_probe(ecephys_session[id$in{{m.comma_sep(session_ids)}}]))){% endif %}"
r"{% if session_types is not none %},rma::criteria,ecephys_unit(ecephys_channel(ecephys_probe(ecephys_session[stimulus_name$in{{m.comma_sep(session_types, True)}}]))){% endif %}"
),
base=rma_macros(),
engine=self.rma_engine.get_rma_list,
session_ids=ecephys_session_ids,
unit_ids=unit_ids,
session_types=session_types
)
output = []
for item in response:
data = json.loads(item.pop("data"))
item.update(data)
output.append(item)
output = pd.DataFrame(output)
output.set_index("ecephys_unit_id", inplace=True)
output.drop(columns="id", inplace=True)
for colname in output.columns:
try:
output[colname] = output.apply(lambda row: ast.literal_eval(str(row[colname])), axis=1)
except ValueError:
pass
# TODO: remove this
# on_screen_rf and p_value_rf were correctly calculated, but switched with one another. This snippet unswitches them.
columns = set(output.columns.values.tolist())
if "p_value_rf" in columns and "on_screen_rf" in columns:
pv_is_bool = np.issubdtype(output["p_value_rf"].values[0], bool)
on_screen_is_float = np.issubdtype(output["on_screen_rf"].values[0].dtype, np.floating)
# this is not a good test, but it avoids the case where we fix these in the data for a future release, but
# reintroduce the bug by forgetting to update the code.
if pv_is_bool and on_screen_is_float:
p_value_rf = output["p_value_rf"].copy()
output["p_value_rf"] = output["on_screen_rf"].copy()
output["on_screen_rf"] = p_value_rf
return output
[docs] @classmethod
def default(cls, asynchronous=True, **rma_kwargs):
_rma_kwargs = {"scheme": "http", "host": "api.brain-map.org"}
_rma_kwargs.update(rma_kwargs)
engine_cls = AsyncRmaEngine if asynchronous else RmaEngine
return cls(engine_cls(**_rma_kwargs))