Source code for allensdk.api.warehouse_cache.caching_utilities

import functools
from pathlib import Path
import warnings
import os
import logging

from typing import overload, Callable, Any, Union, Optional, TypeVar

from allensdk.config.manifest import Manifest


P = TypeVar("P")
Q = TypeVar("Q")

AnyPath = Union[Path, str]


@overload
def call_caching(
    fetch: Callable[[], Q],
    write: Callable[[Q], None],
    read: Callable[[], P],
    pre_write: Optional[Callable[[Q], Q]] = None,
    cleanup: Optional[Callable[[], None]] = None,
    lazy: bool = True,
    num_tries: int = 1,
    failure_message: str = ""
) -> P:
    """ Case where a reader is provided
    """


@overload
def call_caching(
    fetch: Callable[[], Q],
    write: Callable[[Q], None],
    read: None = None,
    pre_write: Optional[Callable[[Q], Q]] = None,
    cleanup: Optional[Callable[[], None]] = None,
    lazy: bool = True,
    num_tries: int = 1,
    failure_message: str = ""
) -> None:
    """ Case where no reader is provided (fetches and writes, but returns nothing)
    """


[docs]def call_caching( fetch: Callable[[], Q], write: Callable[[Q], None], read: Optional[Callable[[], P]] = None, pre_write: Optional[Callable[[Q], Q]] = None, cleanup: Optional[Callable[[], None]] = None, lazy: bool = True, num_tries: int = 1, failure_message: str = "" ) -> Optional[P]: """ Access data, caching on a local store for future accesses. Parameters ---------- fetch : Function which pulls data from a remote/expensive source. write : Function which stores data in a local/inexpensive store. read : Function which pulls data from a local/inexpensive store. pre_write : Function applied to obtained data after fetching, but before writing. cleanup : Function for fixing a failed fetch. e.g. unlinking a partially downloaded file. Exceptions raised by cleanup are not themselves handled lazy : If True, attempt to read the data from the local/inexpensive store before fetching it. If False, forcibly fetch from the remote/expensive store. num_tries : How many fetches to attempt before (re)raising an exception. A fetch is failed if reading the result raises an exception. failure_message : Provides additional context in the event of a failed download. Emitted when retrying, and when a fetch failure occurs after tries are exhausted Returns ------- The result of calling read """ logger = logging.getLogger("call_caching") try: if not lazy or read is None: logger.info("Fetching data from remote") data = fetch() if pre_write is not None: data = pre_write(data) logger.info("Writing data to cache") write(data) if read is not None: logger.info("Reading data from cache") return read() except Exception as e: if isinstance(e, FileNotFoundError): logger.info("No cache file found.") # Pandas throws ValueError rather than FileNotFoundError elif (isinstance(e, ValueError) and str(e) == "Expected object or value"): logger.info("No cache file found.") if cleanup is not None and not lazy: cleanup() num_tries -= 1 - lazy # don't count fetchless reads if num_tries <= 0: if failure_message: warnings.warn(failure_message) raise retry_message = f"retrying fetch ({num_tries} tries remaining)" if failure_message: retry_message = f"{failure_message} {retry_message}" if not lazy: warnings.warn(retry_message) return call_caching( fetch, write, read, pre_write=pre_write, cleanup=cleanup, lazy=False, num_tries=num_tries, failure_message=failure_message, ) return None # required by mypy
[docs]def one_file_call_caching( path: AnyPath, fetch: Callable[[], Q], write: Callable[[AnyPath, Q], None], read: Optional[Callable[[AnyPath], P]] = None, pre_write: Optional[Callable[[Q], Q]] = None, cleanup: Optional[Callable[[], None]] = None, lazy: bool = True, num_tries: int = 1, failure_message: str = "", ) -> Optional[P]: """ A call_caching variant where the local store is a single file. See call_caching for complete documentation. Parameters ---------- path : Path at which the data will be stored """ def safe_unlink(): try: os.unlink(path) except IOError: pass def safe_write(data: Q): Manifest.safe_make_parent_dirs(path) write(path, data) if read is not None: read = functools.partial(read, path) if cleanup is None: cleanup = safe_unlink return call_caching( fetch, safe_write, read, pre_write=pre_write, cleanup=cleanup, lazy=lazy, num_tries=num_tries, failure_message=failure_message, )