Source code for allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine

import functools
import os
import asyncio
import time
import warnings
import logging
from typing import Optional, Iterable, Callable, AsyncIterator, Awaitable

import requests
import aiohttp
import nest_asyncio

DEFAULT_TIMEOUT = 10 * 60  # seconds
DEFAULT_CHUNKSIZE = 1024 * 10  # bytes

[docs]class HttpEngine: def __init__( self, scheme: str, host: str, timeout: float = DEFAULT_TIMEOUT, chunksize: int = DEFAULT_CHUNKSIZE, **kwargs ): """ Simple tool for making streaming http requests. Parameters ---------- scheme : e.g "http" or "https" host : will be used as the base for request urls timeout : requests taking longer than this (in seconds) will raise a `requests.Timeout` error. The clock on this timeout starts running when the initial request is made. chunksize : When streaming data, how many bytes ought to be requested at once. **kwargs : unused. Defined here so that parameters can fall through from subclasses """ self.scheme = scheme = host self.timeout = timeout self.chunksize = chunksize def _build_url(self, route): return f"{self.scheme}://{}/{route}"
[docs] def stream(self, route): """ Makes an http request and returns an iterator over the response. Parameters ---------- route : the http route (under this object's host) to request against. """ url = self._build_url(route) start_time = time.perf_counter() response = requests.get(url, stream=True) response_mb = None if "Content-length" in response.headers: response_mb = float(response.headers["Content-length"]) / 1024 ** 2 for ii, chunk in enumerate(response.iter_content(self.chunksize)): if ii == 0: size_message = f"{response_mb:3.3}mb" if response_mb is not None else "potentially large" logging.warning(f"downloading a {size_message} file from {url}") yield chunk elapsed = time.perf_counter() - start_time if elapsed > self.timeout: raise requests.Timeout(f"Download took {elapsed} seconds, but timeout was set to {self.timeout}")
[docs] @staticmethod def write_bytes(path: str, stream: Iterable[bytes]): write_from_stream(path, stream)
AsyncStreamCallbackType = Callable[[AsyncIterator[bytes]], Awaitable[None]]
[docs]class AsyncHttpEngine(HttpEngine): def __init__( self, scheme: str, host: str, session: Optional[aiohttp.ClientSession] = None, **kwargs ): """ Simple tool for making asynchronous streaming http requests. Parameters ---------- scheme : e.g "http" or "https" host : will be used as the base for request urls session : If provided, this preconstructed session will be used rather than a new one. Keep in mind that AsyncHttpEngine closes its session when it is garbage collected! **kwargs : Will be passed to parent. """ super(AsyncHttpEngine, self).__init__(scheme, host, **kwargs) if session: self.session = session warnings.warn( "Recieved preconstructed session, ignoring timeout parameter." ) else: self.session = aiohttp.ClientSession( timeout=aiohttp.client.ClientTimeout(self.timeout) ) async def _stream_coroutine( self, route: str, callback: AsyncStreamCallbackType ): url = self._build_url(route) async with self.session.get(url) as response: await callback(response.content.iter_chunked(self.chunksize))
[docs] def stream( self, route: str ) -> Callable[[AsyncStreamCallbackType], Awaitable[None]]: """ Returns a coroutine which - makes an http request - exposes internally an asynchronous iterator over the response - takes a callback parameter, which should consume the iterator. Parameters ---------- route : the http route (under this object's host) to request against. Notes ----- To use this method, you will need an appropriate consumer. For instance, If you want to write the streamed data to a local file, you can use write_bytes_from_coroutine. Examples -------- >>> engine = AsyncHttpEngine("http", "examplehost") >>> stream_coro ="example/route") >>> write_bytes_from_coroutine("example/file/path.txt", stream_coro) """ return functools.partial(self._stream_coroutine, route)
def __del__(self): if hasattr(self, "session"): nest_asyncio.apply() loop = asyncio.get_event_loop() loop.run_until_complete(self.session.close())
[docs] @staticmethod def write_bytes( path: str, coroutine: Callable[[AsyncStreamCallbackType], Awaitable[None]]): write_bytes_from_coroutine(path, coroutine)
[docs]def write_bytes_from_coroutine( path: str, coroutine: Callable[[AsyncStreamCallbackType], Awaitable[None]] ): """ Utility for streaming http from an asynchronous requester to a file. Parameters ---------- path : Write to this file coroutine : The source of the data. Needs to have a specific structure, namely: - the first-position parameter of the coroutine ought to accept a callback. This callback ought to itself be awaitable. - within the coroutine, this callback ought to be called with a single argument. That single argument should be an asynchronous iterator. Please see (and AsyncHttpEngine._stream_coroutine) for an example. """ os.makedirs(os.path.dirname(path), exist_ok=True) async def callback(file_, iterable): async for chunk in iterable: file_.write(chunk) async def wrapper(): with open(path, "wb") as file_: callback_ = functools.partial(callback, file_) await coroutine(callback_) nest_asyncio.apply() loop = asyncio.get_event_loop() loop.run_until_complete(wrapper())
[docs]def write_from_stream(path: str, stream: Iterable[bytes]): """ Write bytes to a file from an iterator Parameters ---------- path : write to this file stream : iterable yielding bytes to be written """ with open(path, "wb") as fil: for chunk in stream: fil.write(chunk)