Source code for allensdk.brain_observatory.ecephys.ecephys_project_api.rma_engine
import sys
import logging
import time
import ast
import requests
import pandas as pd
from .http_engine import HttpEngine, AsyncHttpEngine
class RmaRequestError(Exception):
    """Error raised when an RMA response reports that the query did not succeed."""
class RmaEngine(HttpEngine):
    """Http engine that can additionally issue paged RMA queries.

    Query urls are assembled from ``self.scheme`` and ``self.host`` (not
    assigned in this class; presumably set by the parent constructor) together
    with the RMA prefix and response format configured here.
    """

    def __init__(
        self,
        scheme,
        host,
        rma_prefix: str = "api/v2/data",
        rma_format: str = "json",
        page_size: int = 5000,
        **kwargs
    ):
        """ Simple tool for making rma and streaming http requests.

        Parameters
        ----------
        scheme :
            e.g "http" or "https"
        host :
            will be used as the base for request urls
        rma_prefix :
            rma request routes will be prefixed with this string
        rma_format :
            Format of returned response. e.g. "json", "xml", "csv"
        page_size :
            how many rma records to request in one query.
        **kwargs :
            will be passed to parent

        """
        super().__init__(scheme, host, **kwargs)
        self.rma_prefix = rma_prefix
        self.rma_format = rma_format
        self.page_size = page_size

    @property
    def format_query_string(self):
        """Query endpoint name for the configured format, e.g. "query.json"."""
        return f"query.{self.rma_format}"

    def add_page_params(self, url, start, count=None):
        """ Append RMA paging options to a query url.

        Parameters
        ----------
        url :
            The query url to extend.
        start :
            Value used for the ``start_row`` option.
        count :
            Value used for the ``num_rows`` option. Defaults to
            ``self.page_size`` when None.

        Returns
        -------
        str
            ``url`` followed by an ``rma::options`` clause carrying the paging
            options and ordering by 'id'.

        """
        if count is None:
            count = self.page_size
        return f"{url},rma::options[start_row$eq{start}][num_rows$eq{count}][order$eq'id']"

    def get_rma(self, query: str):
        """ Makes a paging rma query

        Parameters
        ----------
        query :
            The RMA query parameters

        Yields
        ------
        The "msg" entry of each page's response, one page at a time.

        Raises
        ------
        RmaRequestError
            If a response's "success" entry is falsy. The response's "msg"
            entry is used as the error message.

        """
        url = f"{self.scheme}://{self.host}/{self.rma_prefix}/{self.format_query_string}?{query}"
        logging.debug(url)

        start_row = 0
        total_rows = None  # unknown until the first page has been received
        start_time = time.time()

        while total_rows is None or start_row < total_rows:
            current_url = self.add_page_params(url, start_row)
            response_json = requests.get(current_url).json()
            if not response_json["success"]:
                raise RmaRequestError(response_json["msg"])

            num_rows = response_json["num_rows"]
            start_row += num_rows
            if total_rows is None:
                total_rows = response_json["total_rows"]

            logging.debug(
                "downloaded %s of %s records (%.3f seconds)",
                start_row, total_rows, time.time() - start_time
            )
            yield response_json["msg"]

            # A successful page without rows does not advance start_row, so
            # the loop would request the very same url forever. Stop paging
            # instead of hanging.
            if num_rows <= 0 and start_row < total_rows:
                logging.warning(
                    "rma query returned an empty page after %s of %s records; "
                    "stopping pagination",
                    start_row, total_rows
                )
                break

    def get_rma_list(self, query):
        """ Run a paging rma query and collect all pages into a single list.

        Parameters
        ----------
        query :
            The RMA query parameters

        Returns
        -------
        list
            The items of every page yielded by ``get_rma``, in order.

        """
        response = []
        for chunk in self.get_rma(query):
            response.extend(chunk)
        return response

    def get_rma_tabular(self, query, try_infer_dtypes=True):
        """ Run a paging rma query and return the records as a dataframe.

        Parameters
        ----------
        query :
            The RMA query parameters
        try_infer_dtypes :
            If True, the dataframe is passed through ``infer_column_types``
            before being returned.

        Returns
        -------
        pd.DataFrame
            Built from the list returned by ``get_rma_list``.

        """
        response = pd.DataFrame(self.get_rma_list(query))

        if try_infer_dtypes:
            response = infer_column_types(response)

        return response
class AsyncRmaEngine(RmaEngine, AsyncHttpEngine):
    """Engine combining RmaEngine's rma queries with AsyncHttpEngine."""

    def __init__(self, scheme: str, host: str, **kwargs):
        """ Simple tool for making rma and asynchronous streaming http
        requests.

        Parameters
        ----------
        scheme :
            e.g "http" or "https"
        host :
            will be used as the base for request urls
        **kwargs :
            will be passed to parent

        """
        # Zero-argument super() resolves to the same next class in the MRO as
        # the explicit two-argument form.
        super().__init__(scheme, host, **kwargs)
def infer_column_types(dataframe):
    """ Return a copy of a dataframe with literal-looking columns parsed.

    RMA queries often come back with string-typed columns. Each column is run
    through ``ast.literal_eval`` value by value; a column is replaced only if
    every one of its values parses, otherwise it is left as it was. The input
    dataframe is not modified.

    Parameters
    ----------
    dataframe :
        The table whose column types should be inferred.

    Returns
    -------
    A new dataframe, after a final ``infer_objects`` pass.

    """
    result = dataframe.copy()

    for label in result.columns:
        column = result[label]
        try:
            result[label] = column.apply(ast.literal_eval)
        except (ValueError, SyntaxError):
            # At least one value is not a Python literal: keep this column.
            continue

    return result.infer_objects()