Source code for allensdk.brain_observatory.ecephys.ecephys_project_api.rma_engine

import sys
import logging
import time
import ast

import requests
import pandas as pd

from .http_engine import HttpEngine, AsyncHttpEngine


[docs]class RmaRequestError(Exception): pass
[docs]class RmaEngine(HttpEngine): @property def format_query_string(self): return f"query.{self.rma_format}" def __init__( self, scheme, host, rma_prefix: str = "api/v2/data", rma_format: str = "json", page_size: int = 5000, **kwargs ): """ Simple tool for making rma and streaming http requests. Parameters ---------- scheme : e.g "http" or "https" host : will be used as the base for request urls rma_prefix : rma request routes will be prefixed with this string rma_format : Format of reuturned response. e.g. "json", "xml", "csv" page_size : how many rma records to request in one query. **kwargs : will be passed to parent """ super(RmaEngine, self).__init__(scheme, host, **kwargs) self.rma_prefix = rma_prefix self.rma_format = rma_format self.page_size = page_size
[docs] def add_page_params(self, url, start, count=None): if count is None: count = self.page_size return f"{url},rma::options[start_row$eq{start}][num_rows$eq{count}][order$eq'id']"
[docs] def get_rma(self, query: str): """ Makes a paging rma query Parameters ---------- query : The RMA query parameters """ url = f"{self.scheme}://{self.host}/{self.rma_prefix}/{self.format_query_string}?{query}" logging.debug(url) start_row = 0 total_rows = None start_time = time.time() while total_rows is None or start_row < total_rows: current_url = self.add_page_params(url, start_row) response_json = requests.get(current_url).json() if not response_json["success"]: raise RmaRequestError(response_json["msg"]) start_row += response_json["num_rows"] if total_rows is None: total_rows = response_json["total_rows"] logging.debug(f"downloaded {start_row} of {total_rows} records ({time.time() - start_time:.3f} seconds)") yield response_json["msg"]
[docs] def get_rma_list(self, query): response = [] for chunk in self.get_rma(query): response.extend(chunk) return response
[docs] def get_rma_tabular(self, query, try_infer_dtypes=True): response = pd.DataFrame(self.get_rma_list(query)) if try_infer_dtypes: response = infer_column_types(response) return response
[docs]class AsyncRmaEngine(RmaEngine, AsyncHttpEngine): def __init__(self, scheme: str, host: str, **kwargs): """ Simple tool for making rma and asynchronous streaming http requests. Parameters ---------- scheme : e.g "http" or "https" host : will be used as the base for request urls **kwargs : will be passed to parent """ super(AsyncRmaEngine, self).__init__(scheme, host, **kwargs)
[docs]def infer_column_types(dataframe): """ RMA queries often come back with string-typed columns. This utility tries to infer numeric types. """ dataframe = dataframe.copy() for colname in dataframe.columns: try: dataframe[colname] = dataframe[colname].apply(ast.literal_eval) except (ValueError, SyntaxError): continue dataframe = dataframe.infer_objects() return dataframe