import functools
import os
import asyncio
import time
import warnings
import logging
from typing import Optional, Iterable, Callable, AsyncIterator, Awaitable
import requests
import aiohttp
import nest_asyncio
DEFAULT_TIMEOUT = 10 * 60 # seconds
DEFAULT_CHUNKSIZE = 1024 * 10 # bytes
[docs]class HttpEngine:
def __init__(
self,
scheme: str,
host: str,
timeout: float = DEFAULT_TIMEOUT,
chunksize: int = DEFAULT_CHUNKSIZE,
**kwargs
):
""" Simple tool for making streaming http requests.
Parameters
----------
scheme :
e.g "http" or "https"
host :
will be used as the base for request urls
timeout :
requests taking longer than this (in seconds) will raise a
`requests.Timeout` error. The clock on this timeout starts running
when the initial request is made.
chunksize :
When streaming data, how many bytes ought to be requested at once.
**kwargs :
unused. Defined here so that parameters can fall through from
subclasses
"""
self.scheme = scheme
self.host = host
self.timeout = timeout
self.chunksize = chunksize
def _build_url(self, route):
return f"{self.scheme}://{self.host}/{route}"
[docs] def stream(self, route):
""" Makes an http request and returns an iterator over the response.
Parameters
----------
route :
the http route (under this object's host) to request against.
"""
url = self._build_url(route)
start_time = time.perf_counter()
response = requests.get(url, stream=True)
response_mb = None
if "Content-length" in response.headers:
response_mb = float(response.headers["Content-length"]) / 1024 ** 2
for ii, chunk in enumerate(response.iter_content(self.chunksize)):
if ii == 0:
size_message = f"{response_mb:3.3}mb" if response_mb is not None else "potentially large"
logging.warning(f"downloading a {size_message} file from {url}")
yield chunk
elapsed = time.perf_counter() - start_time
if elapsed > self.timeout:
raise requests.Timeout(f"Download took {elapsed} seconds, but timeout was set to {self.timeout}")
AsyncStreamCallbackType = Callable[[AsyncIterator[bytes]], Awaitable[None]]
[docs]class AsyncHttpEngine(HttpEngine):
def __init__(
self,
scheme: str,
host: str,
session: Optional[aiohttp.ClientSession] = None,
**kwargs
):
""" Simple tool for making asynchronous streaming http requests.
Parameters
----------
scheme :
e.g "http" or "https"
host :
will be used as the base for request urls
session :
If provided, this preconstructed session will be used rather than
a new one. Keep in mind that AsyncHttpEngine closes its session
when it is garbage collected!
**kwargs :
Will be passed to parent.
"""
super(AsyncHttpEngine, self).__init__(scheme, host, **kwargs)
if session:
self.session = session
warnings.warn(
"Recieved preconstructed session, ignoring timeout parameter."
)
else:
self.session = aiohttp.ClientSession(
timeout=aiohttp.client.ClientTimeout(self.timeout)
)
async def _stream_coroutine(
self,
route: str,
callback: AsyncStreamCallbackType
):
url = self._build_url(route)
async with self.session.get(url) as response:
await callback(response.content.iter_chunked(self.chunksize))
[docs] def stream(
self,
route: str
) -> Callable[[AsyncStreamCallbackType], Awaitable[None]]:
""" Returns a coroutine which
- makes an http request
- exposes internally an asynchronous iterator over the response
- takes a callback parameter, which should consume the iterator.
Parameters
----------
route :
the http route (under this object's host) to request against.
Notes
-----
To use this method, you will need an appropriate consumer. For
instance, If you want to write the streamed data to a local file, you
can use write_bytes_from_coroutine.
Examples
--------
>>> engine = AsyncHttpEngine("http", "examplehost")
>>> stream_coro = engine.stream("example/route")
>>> write_bytes_from_coroutine("example/file/path.txt", stream_coro)
"""
return functools.partial(self._stream_coroutine, route)
def __del__(self):
if hasattr(self, "session"):
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.session.close())
[docs]def write_bytes_from_coroutine(
path: str,
coroutine: Callable[[AsyncStreamCallbackType], Awaitable[None]]
):
""" Utility for streaming http from an asynchronous requester to a file.
Parameters
----------
path :
Write to this file
coroutine :
The source of the data. Needs to have a specific structure, namely:
- the first-position parameter of the coroutine ought to accept a
callback. This callback ought to itself be awaitable.
- within the coroutine, this callback ought to be called with a
single argument. That single argument should be an asynchronous
iterator.
Please see AsyncHttpEngine.stream (and
AsyncHttpEngine._stream_coroutine) for an example.
"""
os.makedirs(os.path.dirname(path), exist_ok=True)
async def callback(file_, iterable):
async for chunk in iterable:
file_.write(chunk)
async def wrapper():
with open(path, "wb") as file_:
callback_ = functools.partial(callback, file_)
await coroutine(callback_)
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(wrapper())
[docs]def write_from_stream(path: str, stream: Iterable[bytes]):
""" Write bytes to a file from an iterator
Parameters
----------
path :
write to this file
stream :
iterable yielding bytes to be written
"""
with open(path, "wb") as fil:
for chunk in stream:
fil.write(chunk)