import pandas as pd
from typing import Optional, List, Dict, Any, Iterable
import logging
from allensdk.brain_observatory.behavior.internal.behavior_project_base\
import BehaviorProjectBase
from allensdk.brain_observatory.behavior.behavior_data_session import (
BehaviorDataSession)
from allensdk.brain_observatory.behavior.behavior_ophys_session import (
BehaviorOphysSession)
from allensdk.internal.api.behavior_data_lims_api import BehaviorDataLimsApi
from allensdk.internal.api.behavior_ophys_api import BehaviorOphysLimsApi
from allensdk.internal.api import PostgresQueryMixin
from allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine import (
HttpEngine)
from allensdk.core.typing import SupportsStr
from allensdk.core.authentication import DbCredentials, credential_injector
from allensdk.core.auth_config import (
MTRAIN_DB_CREDENTIAL_MAP, LIMS_DB_CREDENTIAL_MAP)
class BehaviorProjectLimsApi(BehaviorProjectBase):
    def __init__(self, lims_engine, mtrain_engine, app_engine):
        """ Downloads visual behavior data from the Allen Institute's
        internal Laboratory Information Management System (LIMS). Only
        functional if connected to the Allen Institute Network. Used to load
        data into BehaviorProjectCache.

        Typically want to construct an instance of this class by calling
        `BehaviorProjectLimsApi.default()`.

        Set log level to debug to see SQL queries dumped by
        "BehaviorProjectLimsApi" logger.

        Note -- Currently the app engine is unused because we aren't yet
        supporting the download of stimulus templates for visual behavior
        data. This feature will be added at a later date.

        Parameters
        ----------
        lims_engine :
            used for making queries against the LIMS postgres database. Must
            implement:
                select : takes a postgres query as a string. Returns a pandas
                    dataframe of results
                fetchall : takes a postgres query as a string. If there is
                    exactly one column in the response, return the values as a
                    list.
        mtrain_engine :
            used for making queries against the mtrain postgres database. Must
            implement:
                select : takes a postgres query as a string. Returns a pandas
                    dataframe of results
                fetchall : takes a postgres query as a string. If there is
                    exactly one column in the response, return the values as a
                    list.
        app_engine :
            used for making queries against the lims web application. Must
            implement:
                stream : takes a url as a string. Returns an iterable yielding
                the response body as bytes.
        """
        self.lims_engine = lims_engine
        self.mtrain_engine = mtrain_engine
        self.app_engine = app_engine
        self.logger = logging.getLogger("BehaviorProjectLimsApi")

    @classmethod
    def default(
        cls,
        lims_credentials: Optional[DbCredentials] = None,
        mtrain_credentials: Optional[DbCredentials] = None,
        app_kwargs: Optional[Dict[str, Any]] = None) -> \
            "BehaviorProjectLimsApi":
        """Construct a BehaviorProjectLimsApi instance with default
        postgres and app engines.

        Parameters
        ----------
        lims_credentials: Optional[DbCredentials]
            Credentials to pass to the postgres connector to the lims database.
            If left unspecified, will check environment variables for the
            appropriate values.
        mtrain_credentials: Optional[DbCredentials]
            Credentials to pass to the postgres connector to the mtrain
            database. If left unspecified, will check environment variables
            for the appropriate values.
        app_kwargs: Dict
            Dict of arguments to pass to the app engine. Currently unused.

        Returns
        -------
        BehaviorProjectLimsApi
        """
        # Caller-supplied app kwargs override the defaults key by key.
        _app_kwargs = {"scheme": "http", "host": "lims2"}
        if app_kwargs:
            _app_kwargs.update(app_kwargs)

        if lims_credentials:
            lims_engine = PostgresQueryMixin(
                dbname=lims_credentials.dbname,
                user=lims_credentials.user,
                host=lims_credentials.host,
                password=lims_credentials.password,
                port=lims_credentials.port)
        else:
            # Currying is equivalent to decorator syntactic sugar
            lims_engine = (credential_injector(LIMS_DB_CREDENTIAL_MAP)
                           (PostgresQueryMixin)())

        if mtrain_credentials:
            # The mtrain engine must be built from the mtrain credentials,
            # not the LIMS ones: the two engines target different databases.
            mtrain_engine = PostgresQueryMixin(
                dbname=mtrain_credentials.dbname,
                user=mtrain_credentials.user,
                host=mtrain_credentials.host,
                password=mtrain_credentials.password,
                port=mtrain_credentials.port)
        else:
            # Currying is equivalent to decorator syntactic sugar
            mtrain_engine = (
                credential_injector(MTRAIN_DB_CREDENTIAL_MAP)
                (PostgresQueryMixin)())

        app_engine = HttpEngine(**_app_kwargs)
        return cls(lims_engine, mtrain_engine, app_engine)

    @staticmethod
    def _build_in_list_selector_query(
            col,
            valid_list: Optional[Iterable[SupportsStr]] = None,
            operator: str = "WHERE") -> str:
        """
        Filter for rows where the value of a column is contained in a list.
        If no list is specified in `valid_list`, return an empty string.

        NOTE: if string ids are used, then the strings in `valid_list` must
        be enclosed in single quotes, or else the query will throw a column
        does not exist error. E.g. ["'mystringid1'", "'mystringid2'"...]

        :param col: name of column to compare if in a list
        :type col: str
        :param valid_list: iterable of values that can be mapped to str
            (e.g. string, int, float).
        :type valid_list: list
        :param operator: SQL operator to start the clause. Default="WHERE".
            Valid inputs: "AND", "OR", "WHERE" (not case-sensitive).
        :type operator: str
        :returns: the SQL clause, or "" when `valid_list` is empty or None
        :rtype: str
        """
        if not valid_list:
            return ""
        # De-duplicate and sort the stringified values so the generated
        # clause is deterministic for a given set of inputs.
        values = ",".join(sorted(set(map(str, valid_list))))
        return f"{operator} {col} IN ({values})"

    @staticmethod
    def _build_experiment_from_session_query() -> str:
        """Aggregate sql sub-query to get all ophys_experiment_ids associated
        with a single ophys_session_id."""
        query = """
            -- -- begin getting all ophys_experiment_ids -- --
            SELECT
                (ARRAY_AGG(DISTINCT(oe.id))) AS experiment_ids, os.id
            FROM ophys_sessions os
            RIGHT JOIN ophys_experiments oe ON oe.ophys_session_id = os.id
            GROUP BY os.id
            -- -- end getting all ophys_experiment_ids -- --
        """
        return query

    @staticmethod
    def _build_line_from_donor_query(line="driver") -> str:
        """Sub-query to get a line from a donor.

        `line` is interpolated directly into the SQL text (as a column-name
        prefix and as a quoted literal), so it should only ever be one of
        the fixed values below, never external input.

        :param line: 'driver' or 'reporter'
        :rtype: str
        """
        query = f"""
            -- -- begin getting {line} line from donors -- --
            SELECT ARRAY_AGG (g.name) AS {line}_line, d.id AS donor_id
            FROM donors d
            LEFT JOIN donors_genotypes dg ON dg.donor_id=d.id
            LEFT JOIN genotypes g ON g.id=dg.genotype_id
            LEFT JOIN genotype_types gt ON gt.id=g.genotype_type_id
            WHERE gt.name='{line}'
            GROUP BY d.id
            -- -- end getting {line} line from donors -- --
        """
        return query

    def _get_behavior_summary_table(self,
                                    session_sub_query: str) -> pd.DataFrame:
        """Build and execute query to retrieve summary data for all data,
        or a subset of session_ids (via the session_sub_query).
        Should pass an empty string to `session_sub_query` if want to get
        all data in the database.

        :param session_sub_query: additional filtering logic to get a
            subset of sessions.
        :type session_sub_query: str
        :rtype: pd.DataFrame
        """
        reporter_query = self._build_line_from_donor_query("reporter")
        driver_query = self._build_line_from_donor_query("driver")
        query = f"""
            SELECT
                bs.id AS behavior_session_id,
                bs.ophys_session_id,
                bs.behavior_training_id,
                equipment.name as equipment_name,
                bs.date_of_acquisition,
                d.id as donor_id,
                d.full_genotype,
                reporter.reporter_line,
                driver.driver_line,
                g.name AS sex,
                DATE_PART('day', bs.date_of_acquisition - d.date_of_birth)
                    AS age_in_days,
                bs.foraging_id
            FROM behavior_sessions bs
            JOIN donors d on bs.donor_id = d.id
            JOIN genders g on g.id = d.gender_id
            JOIN (
                {reporter_query}
            ) reporter on reporter.donor_id = d.id
            JOIN (
                {driver_query}
            ) driver on driver.donor_id = d.id
            JOIN equipment ON equipment.id = bs.equipment_id
            {session_sub_query}
        """
        self.logger.debug(f"get_behavior_session_table query: \n{query}")
        return self.lims_engine.select(query)

    def _get_foraging_ids_from_behavior_session(
            self, behavior_session_ids: List[int]) -> List[str]:
        """Look up the non-null foraging ids recorded in LIMS for the given
        behavior sessions.

        :param behavior_session_ids: behavior session ids to look up. If
            empty, no id filter is applied and every non-null foraging id
            is returned.
        :type behavior_session_ids: list of int
        :returns: foraging ids, as returned by `lims_engine.fetchall`
        """
        # "AND" because the query already has a WHERE clause.
        behav_ids = self._build_in_list_selector_query("id",
                                                       behavior_session_ids,
                                                       operator="AND")
        forag_ids_query = f"""
            SELECT foraging_id
            FROM behavior_sessions
            WHERE foraging_id IS NOT NULL
            {behav_ids};
        """
        self.logger.debug("get_foraging_ids_from_behavior_session query: \n"
                          f"{forag_ids_query}")
        foraging_ids = self.lims_engine.fetchall(forag_ids_query)

        self.logger.debug(f"Retrieved {len(foraging_ids)} foraging ids for"
                          f" behavior stage query. Ids = {foraging_ids}")
        return foraging_ids

    def _get_behavior_stage_table(
            self,
            behavior_session_ids: Optional[List[int]] = None
            ) -> pd.DataFrame:
        """Query mtrain for the stage name (returned as `session_type`) of
        each behavior session, keyed by `foraging_id`.

        :param behavior_session_ids: optional list of LIMS behavior session
            ids used to restrict the mtrain query; if omitted the full
            table is fetched.
        :type behavior_session_ids: list of int
        :rtype: pd.DataFrame
        """
        # Select fewer rows if possible via behavior_session_id
        if behavior_session_ids:
            foraging_ids = self._get_foraging_ids_from_behavior_session(
                behavior_session_ids)
            # Foraging ids are compared as strings, so they must be wrapped
            # in single quotes for the IN clause.
            foraging_ids = [f"'{fid}'" for fid in foraging_ids]
        # Otherwise just get the full table from mtrain
        else:
            foraging_ids = None

        # NOTE: if the LIMS lookup returned no foraging ids, the selector is
        # the empty string and the full mtrain table is fetched.
        foraging_ids_query = self._build_in_list_selector_query(
            "bs.id", foraging_ids)

        query = f"""
            SELECT
                stages.name as session_type,
                bs.id AS foraging_id
            FROM behavior_sessions bs
            JOIN stages ON stages.id = bs.state_id
            {foraging_ids_query};
        """
        self.logger.debug(f"_get_behavior_stage_table query: \n {query}")
        return self.mtrain_engine.select(query)

    def get_session_data(self, ophys_session_id: int) -> BehaviorOphysSession:
        """Returns a BehaviorOphysSession object that contains methods
        to analyze a single behavior+ophys session.

        NOTE(review): the id is passed unchanged to `BehaviorOphysLimsApi`;
        confirm which kind of id that API expects.

        :param ophys_session_id: id used to construct the
            `BehaviorOphysLimsApi` backing the returned session
        :type ophys_session_id: int
        :rtype: BehaviorOphysSession
        """
        return BehaviorOphysSession(BehaviorOphysLimsApi(ophys_session_id))

    def _get_experiment_table(
            self,
            ophys_experiment_ids: Optional[List[int]] = None) -> pd.DataFrame:
        """
        Helper function for easier testing.
        Return a pd.Dataframe table with all ophys_experiment_ids and relevant
        metadata.

        Return columns:
            ophys_experiment_id, ophys_session_id, behavior_session_id,
            container_id, project_code, container_workflow_state,
            experiment_workflow_state, session_name, session_type,
            equipment_name, date_of_acquisition, isi_experiment_id,
            specimen_id, sex, age_in_days, full_genotype, reporter_line,
            driver_line, imaging_depth, targeted_structure, published_at

        :param ophys_experiment_ids: optional list of ophys_experiment_ids
            to include
        :rtype: pd.DataFrame
        """
        if not ophys_experiment_ids:
            self.logger.warning("Getting all ophys experiments."
                                " This might take a while.")
        experiment_query = self._build_in_list_selector_query(
            "oe.id", ophys_experiment_ids)
        reporter_query = self._build_line_from_donor_query(line="reporter")
        driver_query = self._build_line_from_donor_query(line="driver")
        query = f"""
            SELECT
                oe.id as ophys_experiment_id,
                os.id as ophys_session_id,
                bs.id as behavior_session_id,
                oec.visual_behavior_experiment_container_id as container_id,
                pr.code as project_code,
                vbc.workflow_state as container_workflow_state,
                oe.workflow_state as experiment_workflow_state,
                os.name as session_name,
                os.stimulus_name as session_type,
                equipment.name as equipment_name,
                os.date_of_acquisition,
                os.isi_experiment_id,
                os.specimen_id,
                g.name as sex,
                DATE_PART('day', os.date_of_acquisition - d.date_of_birth)
                    AS age_in_days,
                d.full_genotype,
                reporter.reporter_line,
                driver.driver_line,
                id.depth as imaging_depth,
                st.acronym as targeted_structure,
                vbc.published_at
            FROM ophys_experiments_visual_behavior_experiment_containers oec
            JOIN visual_behavior_experiment_containers vbc
                ON oec.visual_behavior_experiment_container_id = vbc.id
            JOIN ophys_experiments oe ON oe.id = oec.ophys_experiment_id
            JOIN ophys_sessions os ON os.id = oe.ophys_session_id
            JOIN behavior_sessions bs ON os.id = bs.ophys_session_id
            JOIN projects pr ON pr.id = os.project_id
            JOIN donors d ON d.id = bs.donor_id
            JOIN genders g ON g.id = d.gender_id
            JOIN (
                {reporter_query}
            ) reporter on reporter.donor_id = d.id
            JOIN (
                {driver_query}
            ) driver on driver.donor_id = d.id
            LEFT JOIN imaging_depths id ON id.id = oe.imaging_depth_id
            JOIN structures st ON st.id = oe.targeted_structure_id
            JOIN equipment ON equipment.id = os.equipment_id
            {experiment_query};
        """
        self.logger.debug(f"get_experiment_table query: \n{query}")
        return self.lims_engine.select(query)

    def _get_session_table(
            self,
            ophys_session_ids: Optional[List[int]] = None) -> pd.DataFrame:
        """Helper function for easier testing.
        Return a pd.Dataframe table with all ophys_session_ids and relevant
        metadata.

        Return columns: ophys_session_id, behavior_session_id,
                        ophys_experiment_id, project_code, session_name,
                        session_type, equipment_name, date_of_acquisition,
                        specimen_id, full_genotype, sex, age_in_days,
                        reporter_line, driver_line

        :param ophys_session_ids: optional list of ophys_session_ids to include
        :rtype: pd.DataFrame
        """
        if not ophys_session_ids:
            self.logger.warning("Getting all ophys sessions."
                                " This might take a while.")
        session_query = self._build_in_list_selector_query("os.id",
                                                           ophys_session_ids)
        experiment_ids_query = self._build_experiment_from_session_query()
        reporter_query = self._build_line_from_donor_query(line="reporter")
        driver_query = self._build_line_from_donor_query(line="driver")
        query = f"""
            SELECT
                os.id as ophys_session_id,
                bs.id as behavior_session_id,
                experiment_ids as ophys_experiment_id,
                pr.code as project_code,
                os.name as session_name,
                os.stimulus_name as session_type,
                equipment.name as equipment_name,
                os.date_of_acquisition,
                os.specimen_id,
                g.name as sex,
                DATE_PART('day', os.date_of_acquisition - d.date_of_birth)
                    AS age_in_days,
                d.full_genotype,
                reporter.reporter_line,
                driver.driver_line
            FROM ophys_sessions os
            JOIN behavior_sessions bs ON os.id = bs.ophys_session_id
            JOIN projects pr ON pr.id = os.project_id
            JOIN donors d ON d.id = bs.donor_id
            JOIN genders g ON g.id = d.gender_id
            JOIN (
                {experiment_ids_query}
            ) exp_ids ON os.id = exp_ids.id
            JOIN (
                {reporter_query}
            ) reporter on reporter.donor_id = d.id
            JOIN (
                {driver_query}
            ) driver on driver.donor_id = d.id
            JOIN equipment ON equipment.id = os.equipment_id
            {session_query};
        """
        self.logger.debug(f"get_session_table query: \n{query}")
        return self.lims_engine.select(query)

    def get_session_table(
            self,
            ophys_session_ids: Optional[List[int]] = None) -> pd.DataFrame:
        """Return a pd.Dataframe table with all ophys_session_ids and relevant
        metadata, indexed by ophys_session_id.

        Return columns: behavior_session_id,
                        ophys_experiment_id, project_code, session_name,
                        session_type, equipment_name, date_of_acquisition,
                        specimen_id, full_genotype, sex, age_in_days,
                        reporter_line, driver_line

        :param ophys_session_ids: optional list of ophys_session_ids to include
        :rtype: pd.DataFrame
        """
        # There is one ophys_session_id from 2018 that has multiple behavior
        # ids, causing duplicates -- drop all dupes for now; # TODO
        # (keep=False removes every row of a duplicated session id, not
        # just the extras.)
        table = (self._get_session_table(ophys_session_ids)
                 .drop_duplicates(subset=["ophys_session_id"], keep=False)
                 .set_index("ophys_session_id"))
        return table

    def get_behavior_only_session_data(
            self, behavior_session_id: int) -> BehaviorDataSession:
        """Returns a BehaviorDataSession object that contains methods to
        analyze a single behavior session.

        :param behavior_session_id: id that corresponds to a behavior session
        :type behavior_session_id: int
        :rtype: BehaviorDataSession
        """
        return BehaviorDataSession(BehaviorDataLimsApi(behavior_session_id))

    def get_experiment_table(
            self,
            ophys_experiment_ids: Optional[List[int]] = None) -> pd.DataFrame:
        """Return a pd.Dataframe table with all ophys_experiment_ids and
        relevant metadata, indexed by ophys_experiment_id. This is the most
        specific and most informative level to examine the data.

        Return columns:
            ophys_session_id, behavior_session_id,
            container_id, project_code, container_workflow_state,
            experiment_workflow_state, session_name, session_type,
            equipment_name, date_of_acquisition, isi_experiment_id,
            specimen_id, sex, age_in_days, full_genotype, reporter_line,
            driver_line, imaging_depth, targeted_structure, published_at

        :param ophys_experiment_ids: optional list of ophys_experiment_ids
            to include
        :rtype: pd.DataFrame
        """
        # Forward the filter; without it the requested subset is ignored
        # and the whole table is always fetched.
        return (self._get_experiment_table(ophys_experiment_ids)
                .set_index("ophys_experiment_id"))

    def get_behavior_only_session_table(
            self,
            behavior_session_ids: Optional[List[int]] = None) -> pd.DataFrame:
        """Returns a pd.DataFrame table with all behavior session_ids to the
        user with additional metadata, indexed by behavior_session_id.

        The LIMS summary table is left-merged with the mtrain stage table
        on `foraging_id`, which adds the `session_type` column.

        :param behavior_session_ids: optional list of behavior_session_ids
            to include
        :rtype: pd.DataFrame
        """
        self.logger.warning("Getting behavior-only session data. "
                            "This might take a while...")
        session_query = self._build_in_list_selector_query(
            "bs.id", behavior_session_ids)
        summary_tbl = self._get_behavior_summary_table(session_query)
        stimulus_names = self._get_behavior_stage_table(behavior_session_ids)
        # Left merge keeps sessions that have no matching mtrain stage.
        return (summary_tbl.merge(stimulus_names,
                                  on=["foraging_id"], how="left")
                .set_index("behavior_session_id"))

    def get_natural_movie_template(self, number: int) -> Iterable[bytes]:
        """Download a template for the natural movie stimulus. This is the
        actual movie that was shown during the recording session.

        Not yet implemented for visual behavior data.

        :param number: identifier for this movie (note that this is an int,
            so to get the template for natural_movie_three should pass 3)
        :type number: int
        :returns: An iterable yielding an npy file as bytes
        :raises NotImplementedError: always
        """
        raise NotImplementedError()

    def get_natural_scene_template(self, number: int) -> Iterable[bytes]:
        """Download a template for the natural scene stimulus. This is the
        actual image that was shown during the recording session.

        Not yet implemented for visual behavior data.

        :param number: identifier for this scene
        :type number: int
        :returns: iterable yielding a tiff file as bytes
        :raises NotImplementedError: always
        """
        raise NotImplementedError()