import functools
from pathlib import Path
import warnings
import os
import logging
from typing import overload, Callable, Any, Union, Optional, TypeVar
from allensdk.config.manifest import Manifest
P = TypeVar("P")
Q = TypeVar("Q")
AnyPath = Union[Path, str]
@overload
def call_caching(
fetch: Callable[[], Q],
write: Callable[[Q], None],
read: Callable[[], P],
pre_write: Optional[Callable[[Q], Q]] = None,
cleanup: Optional[Callable[[], None]] = None,
lazy: bool = True,
num_tries: int = 1,
failure_message: str = ""
) -> P:
""" Case where a reader is provided
"""
@overload
def call_caching(
fetch: Callable[[], Q],
write: Callable[[Q], None],
read: None = None,
pre_write: Optional[Callable[[Q], Q]] = None,
cleanup: Optional[Callable[[], None]] = None,
lazy: bool = True,
num_tries: int = 1,
failure_message: str = ""
) -> None:
""" Case where no reader is provided (fetches and writes, but returns nothing)
"""
[docs]def call_caching(
fetch: Callable[[], Q],
write: Callable[[Q], None],
read: Optional[Callable[[], P]] = None,
pre_write: Optional[Callable[[Q], Q]] = None,
cleanup: Optional[Callable[[], None]] = None,
lazy: bool = True,
num_tries: int = 1,
failure_message: str = ""
) -> Optional[P]:
""" Access data, caching on a local store for future accesses.
Parameters
----------
fetch :
Function which pulls data from a remote/expensive source.
write :
Function which stores data in a local/inexpensive store.
read :
Function which pulls data from a local/inexpensive store.
pre_write :
Function applied to obtained data after fetching, but before writing.
cleanup :
Function for fixing a failed fetch. e.g. unlinking a partially
downloaded file. Exceptions raised by cleanup are not themselves
handled
lazy :
If True, attempt to read the data from the local/inexpensive store
before fetching it. If False, forcibly fetch from the
remote/expensive store.
num_tries :
How many fetches to attempt before (re)raising an exception. A fetch
is failed if reading the result raises an exception.
failure_message :
Provides additional context in the event of a failed download. Emitted
when retrying, and when a fetch failure occurs after tries are
exhausted
Returns
-------
The result of calling read
"""
logger = logging.getLogger("call_caching")
try:
if not lazy or read is None:
logger.info("Fetching data from remote")
data = fetch()
if pre_write is not None:
data = pre_write(data)
logger.info("Writing data to cache")
write(data)
if read is not None:
logger.info("Reading data from cache")
return read()
except Exception as e:
if isinstance(e, FileNotFoundError):
logger.info("No cache file found.")
# Pandas throws ValueError rather than FileNotFoundError
elif (isinstance(e, ValueError)
and str(e) == "Expected object or value"):
logger.info("No cache file found.")
if cleanup is not None and not lazy:
cleanup()
num_tries -= 1 - lazy # don't count fetchless reads
if num_tries <= 0:
if failure_message:
warnings.warn(failure_message)
raise
retry_message = f"retrying fetch ({num_tries} tries remaining)"
if failure_message:
retry_message = f"{failure_message} {retry_message}"
if not lazy:
warnings.warn(retry_message)
return call_caching(
fetch,
write,
read,
pre_write=pre_write,
cleanup=cleanup,
lazy=False,
num_tries=num_tries,
failure_message=failure_message,
)
return None # required by mypy
[docs]def one_file_call_caching(
path: AnyPath,
fetch: Callable[[], Q],
write: Callable[[AnyPath, Q], None],
read: Optional[Callable[[AnyPath], P]] = None,
pre_write: Optional[Callable[[Q], Q]] = None,
cleanup: Optional[Callable[[], None]] = None,
lazy: bool = True,
num_tries: int = 1,
failure_message: str = "",
) -> Optional[P]:
""" A call_caching variant where the local store is a single file. See
call_caching for complete documentation.
Parameters
----------
path :
Path at which the data will be stored
"""
def safe_unlink():
try:
os.unlink(path)
except IOError:
pass
def safe_write(data: Q):
Manifest.safe_make_parent_dirs(path)
write(path, data)
if read is not None:
read = functools.partial(read, path)
if cleanup is None:
cleanup = safe_unlink
return call_caching(
fetch,
safe_write,
read,
pre_write=pre_write,
cleanup=cleanup,
lazy=lazy,
num_tries=num_tries,
failure_message=failure_message,
)