from typing import Optional, Iterable, NamedTuple
import pandas as pd
from .ecephys_project_api import EcephysProjectApi, ArrayLike
from .http_engine import HttpEngine, AsyncHttpEngine
from .utilities import postgres_macros, build_and_execute
from allensdk.internal.api import PostgresQueryMixin
from allensdk.core.authentication import credential_injector, DbCredentials
from allensdk.core.auth_config import LIMS_DB_CREDENTIAL_MAP
class EcephysProjectLimsApi(EcephysProjectApi):
    """ Fetch extracellular ephys project data (tables, NWB files and
    stimulus templates) from the Allen Institute's internal LIMS.

    Table queries are rendered from SQL templates and executed through
    ``postgres_engine``; file downloads are streamed through ``app_engine``.
    """

    # Stimulus namespace used when looking up natural movie / scene templates.
    STIMULUS_TEMPLATE_NAMESPACE = "brain_observatory_1.1"

    def __init__(self, postgres_engine, app_engine):
        """ Downloads extracellular ephys data from the Allen Institute's
        internal Laboratory Information Management System (LIMS). If you are
        on our network you can use this class to get bleeding-edge data into
        an EcephysProjectCache. If not, it won't work at all

        Parameters
        ----------
        postgres_engine :
            used for making queries against the LIMS postgres database. Must
            implement:
                select : takes a postgres query as a string. Returns a pandas
                    dataframe of results
                select_one : takes a postgres query as a string. If there is
                    exactly one record in the response, returns that record as
                    a dict. Otherwise returns an empty dict.
        app_engine :
            used for making queries against the lims web application. Must
            implement:
                stream : takes a url as a string. Returns an iterable yielding
                    the response body as bytes.

        Notes
        -----
        You almost certainly want to construct this class by calling
        EcephysProjectLimsApi.default() rather than this constructor directly.

        """
        self.postgres_engine = postgres_engine
        self.app_engine = app_engine

    def _stream_well_known_file(self, wkf_id) -> Iterable[bytes]:
        """ Stream the contents of a LIMS well known file.

        Parameters
        ----------
        wkf_id :
            identifier of the well_known_files record to download

        Returns
        -------
        Whatever ``app_engine.stream`` returns for the download url (an
        iterable yielding the response body as bytes).

        """
        return self.app_engine.stream(
            f"well_known_files/download/{wkf_id}?wkf_id={wkf_id}"
        )

    def get_session_data(self, session_id: int) -> Iterable[bytes]:
        """ Download an NWB file containing detailed data for an ecephys
        session.

        Parameters
        ----------
        session_id :
            Download an NWB file for this session

        Returns
        -------
        An iterable yielding an NWB file as bytes.

        Raises
        ------
        ValueError :
            If the query does not return exactly one current NWB file for
            this session.

        """
        nwb_response = build_and_execute(
            """
            select wkf.id, wkf.filename, wkf.storage_directory, wkf.attachable_id from well_known_files wkf
            join ecephys_analysis_runs ear on (
                ear.id = wkf.attachable_id
                and wkf.attachable_type = 'EcephysAnalysisRun'
            )
            join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id
            where ear.current
            and wkft.name = 'EcephysNwb'
            and ear.ecephys_session_id = {{session_id}}
            """,
            engine=self.postgres_engine.select,
            session_id=session_id,
        )

        # Zero rows means no file to download; several rows would make the
        # choice of file ambiguous. Either way, refuse to guess.
        if nwb_response.shape[0] != 1:
            raise ValueError(
                f"expected exactly 1 current NWB file for session {session_id}, "
                f"found {nwb_response.shape[0]}: {pd.DataFrame(nwb_response)}"
            )

        nwb_id = nwb_response.loc[0, "id"]
        return self._stream_well_known_file(nwb_id)

    def get_probe_lfp_data(self, probe_id: int) -> Iterable[bytes]:
        """ Download an NWB file containing detailed data for the local field
        potential recorded from an ecephys probe.

        Parameters
        ----------
        probe_id :
            Download an NWB file for this probe's LFP

        Returns
        -------
        An iterable yielding an NWB file as bytes.

        Raises
        ------
        ValueError :
            If the query does not return exactly one current LFP NWB file
            for this probe.

        """
        nwb_response = build_and_execute(
            """
            select wkf.id from well_known_files wkf
            join ecephys_analysis_run_probes earp on (
                earp.id = wkf.attachable_id
                and wkf.attachable_type = 'EcephysAnalysisRunProbe'
            )
            join ecephys_analysis_runs ear on ear.id = earp.ecephys_analysis_run_id
            join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id
            where wkft.name ~ 'EcephysLfpNwb'
            and ear.current
            and earp.ecephys_probe_id = {{probe_id}}
            """,
            engine=self.postgres_engine.select,
            probe_id=probe_id
        )

        if nwb_response.shape[0] != 1:
            raise ValueError(
                f"expected exactly 1 current LFP NWB file for probe {probe_id}, "
                f"found {nwb_response.shape[0]}: {pd.DataFrame(nwb_response)}"
            )

        nwb_id = nwb_response.loc[0, "id"]
        return self._stream_well_known_file(nwb_id)

    def get_units(
        self,
        unit_ids: Optional[ArrayLike] = None,
        channel_ids: Optional[ArrayLike] = None,
        probe_ids: Optional[ArrayLike] = None,
        session_ids: Optional[ArrayLike] = None,
        published_at: Optional[str] = None
    ) -> pd.DataFrame:
        """ Download a table of records describing sorted ecephys units.

        Parameters
        ----------
        unit_ids :
            A collection of integer identifiers for sorted ecephys units. If
            provided, only return records describing these units.
        channel_ids :
            A collection of integer identifiers for ecephys channels. If
            provided, results will be filtered to units recorded from these
            channels.
        probe_ids :
            A collection of integer identifiers for ecephys probes. If
            provided, results will be filtered to units recorded from these
            probes.
        session_ids :
            A collection of integer identifiers for ecephys sessions. If
            provided, results will be filtered to units recorded during
            these sessions.
        published_at :
            A date (rendered as "YYYY-MM-DD"). If provided, only units
            recorded during sessions published before this date will be
            returned.

        Returns
        -------
        a pd.DataFrame whose rows are sorted ecephys units, indexed by unit
        id.

        """
        response = build_and_execute(
            """
            {%- import 'postgres_macros' as pm -%}
            {%- import 'macros' as m -%}
            select
                eu.id,
                eu.ecephys_channel_id,
                eu.quality,
                eu.snr,
                eu.firing_rate,
                eu.isi_violations,
                eu.presence_ratio,
                eu.amplitude_cutoff,
                eu.isolation_distance,
                eu.l_ratio,
                eu.d_prime,
                eu.nn_hit_rate,
                eu.nn_miss_rate,
                eu.silhouette_score,
                eu.max_drift,
                eu.cumulative_drift,
                eu.epoch_name_quality_metrics,
                eu.epoch_name_waveform_metrics,
                eu.duration,
                eu.halfwidth,
                eu.\"PT_ratio\",
                eu.repolarization_slope,
                eu.recovery_slope,
                eu.amplitude,
                eu.spread,
                eu.velocity_above,
                eu.velocity_below
            from ecephys_units eu
            join ecephys_channels ec on ec.id = eu.ecephys_channel_id
            join ecephys_probes ep on ep.id = ec.ecephys_probe_id
            join ecephys_sessions es on es.id = ep.ecephys_session_id
            where
                not es.habituation
                and ec.valid_data
                and ep.workflow_state != 'failed'
                and es.workflow_state != 'failed'
                {{pm.optional_not_null('es.published_at', published_at_not_null)}}
                {{pm.optional_le('es.published_at', published_at)}}
                {{pm.optional_contains('eu.id', unit_ids) -}}
                {{pm.optional_contains('ec.id', channel_ids) -}}
                {{pm.optional_contains('ep.id', probe_ids) -}}
                {{pm.optional_contains('es.id', session_ids) -}}
            """,
            base=postgres_macros(),
            engine=self.postgres_engine.select,
            unit_ids=unit_ids,
            channel_ids=channel_ids,
            probe_ids=probe_ids,
            session_ids=session_ids,
            **_split_published_at(published_at)._asdict()
        )
        return response.set_index("id", inplace=False)

    def get_channels(
        self,
        channel_ids: Optional[ArrayLike] = None,
        probe_ids: Optional[ArrayLike] = None,
        session_ids: Optional[ArrayLike] = None,
        published_at: Optional[str] = None
    ) -> pd.DataFrame:
        """ Download a table of ecephys channel records.

        Parameters
        ----------
        channel_ids :
            A collection of integer identifiers for ecephys channels. If
            provided, results will be filtered to these channels.
        probe_ids :
            A collection of integer identifiers for ecephys probes. If
            provided, results will be filtered to channels on these probes.
        session_ids :
            A collection of integer identifiers for ecephys sessions. If
            provided, results will be filtered to channels recorded from
            during these sessions.
        published_at :
            A date (rendered as "YYYY-MM-DD"). If provided, only channels
            recorded from during sessions published before this date will be
            returned.

        Returns
        -------
        a pd.DataFrame whose rows are ecephys channels, indexed by channel
        id.

        """
        response = build_and_execute(
            """
            {%- import 'postgres_macros' as pm -%}
            select
                ec.id,
                ec.ecephys_probe_id,
                ec.local_index,
                ec.probe_vertical_position,
                ec.probe_horizontal_position,
                ec.manual_structure_id as ecephys_structure_id,
                st.acronym as ecephys_structure_acronym,
                ec.anterior_posterior_ccf_coordinate,
                ec.dorsal_ventral_ccf_coordinate,
                ec.left_right_ccf_coordinate
            from ecephys_channels ec
            join ecephys_probes ep on ep.id = ec.ecephys_probe_id
            join ecephys_sessions es on es.id = ep.ecephys_session_id
            left join structures st on ec.manual_structure_id = st.id
            where
                not es.habituation
                and ec.valid_data
                and ep.workflow_state != 'failed'
                and es.workflow_state != 'failed'
                {{pm.optional_not_null('es.published_at', published_at_not_null)}}
                {{pm.optional_le('es.published_at', published_at)}}
                {{pm.optional_contains('ec.id', channel_ids) -}}
                {{pm.optional_contains('ep.id', probe_ids) -}}
                {{pm.optional_contains('es.id', session_ids) -}}
            """,
            base=postgres_macros(),
            engine=self.postgres_engine.select,
            channel_ids=channel_ids,
            probe_ids=probe_ids,
            session_ids=session_ids,
            **_split_published_at(published_at)._asdict()
        )
        return response.set_index("id")

    def get_probes(
        self,
        probe_ids: Optional[ArrayLike] = None,
        session_ids: Optional[ArrayLike] = None,
        published_at: Optional[str] = None
    ) -> pd.DataFrame:
        """ Download a table of ecephys probe records.

        Parameters
        ----------
        probe_ids :
            A collection of integer identifiers for ecephys probes. If
            provided, results will be filtered to these probes.
        session_ids :
            A collection of integer identifiers for ecephys sessions. If
            provided, results will be filtered to probes recorded from during
            these sessions.
        published_at :
            A date (rendered as "YYYY-MM-DD"). If provided, only probes
            recorded from during sessions published before this date will be
            returned.

        Returns
        -------
        a pd.DataFrame whose rows are ecephys probes, indexed by probe id.

        """
        response = build_and_execute(
            """
            {%- import 'postgres_macros' as pm -%}
            select
                ep.id,
                ep.ecephys_session_id,
                ep.name,
                ep.global_probe_sampling_rate as sampling_rate,
                ep.global_probe_lfp_sampling_rate as lfp_sampling_rate,
                ep.phase,
                ep.air_channel_index,
                ep.surface_channel_index,
                ep.use_lfp_data as has_lfp_data,
                ep.temporal_subsampling_factor as lfp_temporal_subsampling_factor
            from ecephys_probes ep
            join ecephys_sessions es on es.id = ep.ecephys_session_id
            where
                not es.habituation
                and ep.workflow_state != 'failed'
                and es.workflow_state != 'failed'
                {{pm.optional_not_null('es.published_at', published_at_not_null)}}
                {{pm.optional_le('es.published_at', published_at)}}
                {{pm.optional_contains('ep.id', probe_ids) -}}
                {{pm.optional_contains('es.id', session_ids) -}}
            """,
            base=postgres_macros(),
            engine=self.postgres_engine.select,
            probe_ids=probe_ids,
            session_ids=session_ids,
            **_split_published_at(published_at)._asdict()
        )
        return response.set_index("id")

    def get_sessions(
        self,
        session_ids: Optional[ArrayLike] = None,
        published_at: Optional[str] = None
    ) -> pd.DataFrame:
        """ Download a table of ecephys session records.

        Parameters
        ----------
        session_ids :
            A collection of integer identifiers for ecephys sessions. If
            provided, results will be filtered to these sessions.
        published_at :
            A date (rendered as "YYYY-MM-DD"). If provided, only sessions
            published before this date will be returned.

        Returns
        -------
        a pd.DataFrame whose rows are ecephys sessions, indexed by session
        id. Missing genotypes are reported as "wt".

        """
        response = build_and_execute(
            """
            {%- import 'postgres_macros' as pm -%}
            {%- import 'macros' as m -%}
            select
                es.id,
                es.specimen_id,
                es.stimulus_name as session_type,
                es.isi_experiment_id,
                es.date_of_acquisition,
                es.published_at,
                dn.full_genotype as genotype,
                gd.name as sex,
                ages.days as age_in_days,
                case
                    when nwb_id is not null then true
                    else false
                end as has_nwb
            from ecephys_sessions es
            join specimens sp on sp.id = es.specimen_id
            join donors dn on dn.id = sp.donor_id
            join genders gd on gd.id = dn.gender_id
            join ages on ages.id = dn.age_id
            left join (
                select ecephys_sessions.id as ecephys_session_id,
                wkf.id as nwb_id
                from ecephys_sessions
                join ecephys_analysis_runs ear on (
                    ear.ecephys_session_id = ecephys_sessions.id
                    and ear.current
                )
                join well_known_files wkf on (
                    wkf.attachable_id = ear.id
                    and wkf.attachable_type = 'EcephysAnalysisRun'
                )
                join well_known_file_types wkft on wkft.id = wkf.well_known_file_type_id
                where wkft.name = 'EcephysNwb'
            ) nwb on es.id = nwb.ecephys_session_id
            where
                not es.habituation
                and es.workflow_state != 'failed'
                {{pm.optional_contains('es.id', session_ids) -}}
                {{pm.optional_not_null('es.published_at', published_at_not_null)}}
                {{pm.optional_le('es.published_at', published_at)}}
            """,
            base=postgres_macros(),
            engine=self.postgres_engine.select,
            session_ids=session_ids,
            **_split_published_at(published_at)._asdict()
        )

        response.set_index("id", inplace=True)
        # Assign the filled column back rather than calling
        # fillna(inplace=True) on the column selection: the chained in-place
        # form does not update the frame under pandas Copy-on-Write.
        response["genotype"] = response["genotype"].fillna("wt")
        return response

    def get_unit_analysis_metrics(
        self,
        unit_ids: Optional[ArrayLike] = None,
        ecephys_session_ids: Optional[ArrayLike] = None,
        session_types: Optional[ArrayLike] = None
    ) -> pd.DataFrame:
        """ Fetch analysis metrics (stimulus set-specific characterizations of
        unit response patterns) for ecephys units. Note that the metrics
        returned depend on the stimuli that were presented during recording (
        and thus on the session_type)

        Parameters
        ----------
        unit_ids :
            integer identifiers for a set of ecephys units. If provided, the
            response will only include metrics calculated for these units
        ecephys_session_ids :
            integer identifiers for a set of ecephys sessions. If provided, the
            response will only include metrics calculated for units identified
            during these sessions
        session_types :
            string names identifying ecephys session types (e.g.
            "brain_observatory_1.1" or "functional_connectivity")

        Returns
        -------
        a pandas dataframe indexed by ecephys unit id whose columns are
        metrics.

        """
        response = build_and_execute(
            """
            {%- import 'postgres_macros' as pm -%}
            {%- import 'macros' as m -%}
            select eumb.data, eumb.ecephys_unit_id from ecephys_unit_metric_bundles eumb
            join ecephys_analysis_runs ear on eumb.ecephys_analysis_run_id = ear.id
            join ecephys_units eu on eumb.ecephys_unit_id = eu.id
            join ecephys_channels ec on eu.ecephys_channel_id = ec.id
            join ecephys_probes ep on ec.ecephys_probe_id = ep.id
            join ecephys_sessions es on es.id = ep.ecephys_session_id
            where ear.current
            {{pm.optional_contains('eumb.ecephys_unit_id', unit_ids) -}}
            {{pm.optional_contains('es.id', ecephys_session_ids) -}}
            {{pm.optional_contains('es.stimulus_name', session_types, True) -}}
            """,
            base=postgres_macros(),
            engine=self.postgres_engine.select,
            unit_ids=unit_ids,
            ecephys_session_ids=ecephys_session_ids,
            session_types=session_types
        )

        # Expand the per-row "data" payload into one column per metric.
        # NOTE(review): assumes each "data" entry is a mapping of metric name
        # to value -- confirm against the LIMS schema.
        data = pd.DataFrame(
            response.pop("data").values.tolist(), index=response.index
        )
        response = pd.merge(response, data, left_index=True, right_index=True)
        response.set_index("ecephys_unit_id", inplace=True)
        return response

    def _get_template(self, name, namespace):
        """ Identify the WellKnownFile record associated with a stimulus
        template and stream its data if present.

        Parameters
        ----------
        name :
            name of the stimulus whose template should be fetched
        namespace :
            name of the stimulus namespace the stimulus belongs to

        Returns
        -------
        An iterable yielding the template file as bytes.

        Raises
        ------
        ValueError :
            If the lookup does not yield a well_known_file_id.

        """
        # NOTE(review): name and namespace are interpolated directly into the
        # SQL text, so callers must only pass trusted values.
        try:
            well_known_file = build_and_execute(
                f"""
                select
                    st.well_known_file_id
                from stimuli st
                join stimulus_namespaces sn on sn.id = st.stimulus_namespace_id
                where
                    st.name = '{name}'
                    and sn.name = '{namespace}'
                """,
                base=postgres_macros(),
                engine=self.postgres_engine.select_one
            )
            wkf_id = well_known_file["well_known_file_id"]
        except (KeyError, IndexError) as err:
            # Chain the lookup failure so the underlying cause stays visible.
            raise ValueError(
                f"expected exactly 1 template for {name}"
            ) from err

        return self._stream_well_known_file(wkf_id)

    def get_natural_movie_template(self, number: int) -> Iterable[bytes]:
        """ Download a template for the natural movie stimulus. This is the
        actual movie that was shown during the recording session.

        Parameters
        ----------
        number :
            identifier for this movie (note that this is an integer, so to get
            the template for natural_movie_three you should pass in 3)

        Returns
        -------
        An iterable yielding an npy file as bytes

        """
        return self._get_template(
            f"natural_movie_{number}", self.STIMULUS_TEMPLATE_NAMESPACE
        )

    def get_natural_scene_template(self, number: int) -> Iterable[bytes]:
        """ Download a template for the natural scene stimulus. This is the
        actual image that was shown during the recording session.

        Parameters
        ----------
        number :
            identifier for this scene

        Returns
        -------
        An iterable yielding a tiff file as bytes.

        """
        return self._get_template(
            f"natural_scene_{int(number)}", self.STIMULUS_TEMPLATE_NAMESPACE
        )

    @classmethod
    def default(cls, lims_credentials: Optional[DbCredentials] = None,
                app_kwargs=None, asynchronous=False):
        """ Construct a "straightforward" lims api that can fetch data from
        lims2.

        Parameters
        ----------
        lims_credentials : DbCredentials
            Credentials and configuration for postgres queries against
            the LIMS database. If left unspecified will attempt to provide
            credentials from environment variables.
        app_kwargs : dict
            High-level configuration for http requests. See
            allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine.HttpEngine
            and AsyncHttpEngine for details.
        asynchronous : bool
            If true, (http) queries will be made asynchronously.

        Returns
        -------
        EcephysProjectLimsApi

        Raises
        ------
        TypeError :
            If app_kwargs contains an "asynchronous" entry; that option must
            be given through the ``asynchronous`` argument instead.

        """
        _app_kwargs = {
            "scheme": "http",
            "host": "lims2",
            "asynchronous": asynchronous,
        }
        if app_kwargs is not None:
            if "asynchronous" in app_kwargs:
                raise TypeError(
                    "please specify asynchronicity option at the api level "
                    "rather than for the http engine"
                )
            _app_kwargs.update(app_kwargs)

        app_engine_cls = (
            AsyncHttpEngine if _app_kwargs["asynchronous"] else HttpEngine
        )
        app_engine = app_engine_cls(**_app_kwargs)

        if lims_credentials is not None:
            pg_engine = PostgresQueryMixin(
                dbname=lims_credentials.dbname, user=lims_credentials.user,
                host=lims_credentials.host, password=lims_credentials.password,
                port=lims_credentials.port)
        else:
            # Currying is equivalent to decorator syntactic sugar
            pg_engine = (credential_injector(LIMS_DB_CREDENTIAL_MAP)
                         (PostgresQueryMixin)())

        return cls(pg_engine, app_engine)
class SplitPublishedAt(NamedTuple):
    """ Template arguments derived from a ``published_at`` filter value.

    Instances are expanded with ``_asdict()`` into keyword arguments for the
    LIMS query templates.
    """

    # The date wrapped in single quotes, or None when no date filter applies.
    published_at: Optional[str]
    # True when a date filter applies, otherwise None.
    published_at_not_null: Optional[bool]
def _split_published_at(published_at: Optional[str]) -> SplitPublishedAt:
    """ Build the pair of template arguments that LIMS queries filtering on
    published_at expect.

    Parameters
    ----------
    published_at :
        A date string, or None when no publication filter is wanted.

    Returns
    -------
    SplitPublishedAt
        Both fields are None when ``published_at`` is None. Otherwise
        ``published_at`` is the input wrapped in single quotes and
        ``published_at_not_null`` is True.

    """
    if published_at is None:
        return SplitPublishedAt(
            published_at=None,
            published_at_not_null=None,
        )

    quoted_date = f"'{published_at}'"
    return SplitPublishedAt(
        published_at=quoted_date,
        published_at_not_null=True,
    )