Source code for allensdk.api.warehouse_cache.cache

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2015-2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from allensdk.config.manifest import Manifest, ManifestVersionError
from allensdk.config.manifest_builder import ManifestBuilder
import allensdk.core.json_utilities as ju
from allensdk.deprecated import deprecated

import pandas as pd
import pandas.io.json as pj

import functools
from functools import wraps, _make_key
import os
import logging
import csv


[docs]def memoize(f): """ Creates an unbound cache of function calls and results. Note that arguments of different types are not cached separately (so f(3.0) and f(3) are not treated as distinct calls) Arguments to the cached function must be hashable. View the cache size with f.cache_size(). Clear the cache with f.cache_clear(). Access the underlying function with f.__wrapped__. """ cache = {} sentinel = object() # unique object for cache misses make_key = _make_key # efficient key building from function args cache_get = cache.get cache_len = cache.__len__ @wraps(f) def wrapper(*args, **kwargs): # Don't consider 3.0 and 3 different key = make_key(args, kwargs, typed=False) result = cache_get(key, sentinel) if result is not sentinel: return result result = f(*args, **kwargs) cache[key] = result return result def cache_clear(): cache.clear() def cache_size(): return cache_len() wrapper.cache_clear = cache_clear wrapper.cache_size = cache_size return wrapper
[docs]class Cache(object): _log = logging.getLogger('allensdk.api.cache') def __init__(self, manifest=None, cache=True, version=None, **kwargs): self.cache = cache if version is None and hasattr(self, 'MANIFEST_VERSION'): version = self.MANIFEST_VERSION self.load_manifest(manifest, version)
[docs] def get_cache_path(self, file_name, manifest_key, *args): '''Helper method for accessing path specs from manifest keys. Parameters ---------- file_name : string manifest_key : string args : ordered parameters Returns ------- string or None path ''' if self.cache: if file_name: return file_name elif self.manifest: return self.manifest.get_path(manifest_key, *args) return None
[docs] def load_manifest(self, file_name, version=None): '''Read a keyed collection of path specifications. Parameters ---------- file_name : string path to the manifest file Returns ------- Manifest ''' if file_name is not None: if not os.path.exists(file_name): # make the directory if it doesn't exist already dirname = os.path.dirname(file_name) if dirname: Manifest.safe_mkdir(dirname) self.build_manifest(file_name) try: self.manifest = Manifest( ju.read(file_name)['manifest'], os.path.dirname(file_name), version=version) except ManifestVersionError as e: if e.outdated is True: intro = "is out of date" elif e.outdated is False: intro = "was made with a newer version of the AllenSDK" elif e.outdated is None: intro = "version did not match the expected version" ref_url = "https://github.com/alleninstitute/allensdk/wiki" raise ManifestVersionError(("Your manifest file (%s) %s" + " (its version is '%s', but" + " version '%s' is expected). " + " Please remove this file" + " and it will be regenerated for" + " you the next time you" + " instantiate this class." + " WARNING: There may be new data" + " files available that replace" + " the ones you already have" + " downloaded. Read the notes" + " for this release for more" + " details on what has changed" + " (%s).") % (file_name, intro, e.found_version, e.version, ref_url), e.version, e.found_version) self.manifest_path = file_name else: self.manifest = None
[docs] def build_manifest(self, file_name): '''Creation of default path specifications. Parameters ---------- file_name : string where to save it ''' manifest_builder = ManifestBuilder() manifest_builder.set_version(self.MANIFEST_VERSION) manifest_builder = self.add_manifest_paths(manifest_builder) manifest_builder.write_json_file(file_name)
[docs] def add_manifest_paths(self, manifest_builder): '''Add cache-class specific paths to the manifest. In derived classes, should call super. ''' manifest_builder.add_path('BASEDIR', '.') if hasattr(self, 'MANIFEST_CONFIG'): for key, config in self.MANIFEST_CONFIG.items(): manifest_builder.add_path(key, **config) return manifest_builder
[docs] def manifest_dataframe(self): '''Convenience method to view manifest as a pandas dataframe. ''' return pd.DataFrame.from_dict(self.manifest.path_info, orient='index')
[docs] @staticmethod def json_remove_keys(data, keys): for r in data: for key in keys: del r[key] return data
[docs] @staticmethod def remove_keys(data, keys=None): ''' DataFrame version ''' if keys is None: keys = [] for key in keys: del data[key]
[docs] @staticmethod def json_rename_columns(data, new_old_name_tuples=None): '''Convenience method to rename columns in a pandas dataframe. Parameters ---------- data : dataframe edited in place. new_old_name_tuples : list of string tuples (new, old) ''' if new_old_name_tuples is None: new_old_name_tuples = [] for new_name, old_name in new_old_name_tuples: for r in data: r[new_name] = r[old_name] del r[old_name]
[docs] @staticmethod def rename_columns(data, new_old_name_tuples=None): '''Convenience method to rename columns in a pandas dataframe. Parameters ---------- data : dataframe edited in place. new_old_name_tuples : list of string tuples (new, old) ''' if new_old_name_tuples is None: new_old_name_tuples = [] for new_name, old_name in new_old_name_tuples: data.columns = [new_name if c == old_name else c for c in data.columns]
[docs] def load_csv(self, path, rename=None, index=None): '''Read a csv file as a pandas dataframe. Parameters ---------- rename : list of string tuples (new old), optional columns to rename index : string, optional post-rename column to use as the row label. ''' data = pd.read_csv(path, parse_dates=True) Cache.rename_columns(data, rename) if index is not None: data.set_index([index], inplace=True) return data
[docs] def load_json(self, path, rename=None, index=None): '''Read a json file as a pandas dataframe. Parameters ---------- rename : list of string tuples (new old), optional columns to rename index : string, optional post-rename column to use as the row label. ''' data = pj.read_json(path, orient='records') Cache.rename_columns(data, rename) if index is not None: data.set_index([index], inplace=True) return data
[docs] @staticmethod def cacher(fn, *args, **kwargs): '''make an rma query, save it and return the dataframe. Parameters ---------- fn : function reference makes the actual query using kwargs. path : string where to save the data strategy : string or None, optional 'create' always generates the data, 'file' loads from disk, 'lazy' queries the server if no file exists, None generates the data and bypasses all caching behavior pre : function df|json->df|json, takes one data argument and returns filtered version, None for pass-through post : function df|json->?, takes one data argument and returns Object reader : function, optional path -> data, default NOP writer : function, optional path, data -> None, default NOP kwargs : objects passed through to the query function Returns ------- Object or None data type depends on fn, reader and/or post methods. ''' path = kwargs.pop('path', None) strategy = kwargs.pop('strategy', None) pre = kwargs.pop('pre', lambda d: d) post = kwargs.pop('post', None) reader = kwargs.pop('reader', None) writer = kwargs.pop('writer', None) if strategy is None: if writer or path: strategy = 'lazy' else: strategy = 'pass_through' if strategy not in ['lazy', 'pass_through', 'file', 'create']: raise ValueError("Unknown query strategy: {}.".format(strategy)) if 'lazy' == strategy: if os.path.exists(path): strategy = 'file' else: strategy = 'create' if strategy == 'pass_through': data = fn(*args, **kwargs) elif strategy in ['create']: Manifest.safe_make_parent_dirs(path) if writer: data = fn(*args, **kwargs) data = pre(data) writer(path, data) else: data = fn(*args, **kwargs) if reader: data = reader(path) # Note: don't provide post if fn or reader doesn't return data if post: data = post(data) return data try: data return data except Exception: pass return
[docs] @staticmethod def csv_writer(pth, gen): csv_writer = None first_row = True row_count = 1 with open(pth, 'w') as output: for row in gen: if first_row: field_names = [str(k) for k in row.keys()] csv_writer = csv.DictWriter(output, fieldnames=field_names, delimiter=',', quoting=csv.QUOTE_ALL) csv_writer.writeheader() first_row = False Cache._log.info('row: {}'.format(row_count)) row_count = row_count + 1 csv_writer.writerow(row)
[docs] @staticmethod def cache_csv_json(): def reader(f): return pd.read_csv(f, parse_dates=True).to_dict('records') return { 'writer': Cache.csv_writer, 'reader': reader }
[docs] @staticmethod def cache_csv_dataframe(): return { 'writer': Cache.csv_writer, 'reader': lambda f: pd.read_csv(f, parse_dates=True) }
[docs] @staticmethod def nocache_dataframe(): return { 'post': pd.DataFrame }
[docs] @staticmethod def nocache_json(): return { }
[docs] @staticmethod def cache_json_dataframe(): return { 'writer': ju.write, 'reader': lambda p: pj.read_json(p, orient='records') }
[docs] @staticmethod def cache_json(): return { 'writer': ju.write, 'reader': ju.read }
[docs] @staticmethod def cache_csv(): return { 'writer': Cache.csv_writer, 'reader': lambda f: pd.read_csv(f, parse_dates=True) }
[docs] @staticmethod def pathfinder(file_name_position, secondary_file_name_position=None, path_keyword=None): '''helper method to find path argument in legacy methods written prior to the @cacheable decorator. Do not use for new @cacheable methods. Parameters ---------- file_name_position : integer zero indexed position in the decorated method args where file path may be found. secondary_file_name_position : integer zero indexed position in the decorated method args where the file path may be found. path_keyword : string kwarg that may have the file path. Notes ----- This method is only intended to provide backward-compatibility for some methods that otherwise do not follow the path conventions of the @cacheable decorator. ''' def pf(*args, **kwargs): file_name = None if path_keyword is not None and path_keyword in kwargs: file_name = kwargs[path_keyword] else: if file_name_position < len(args): file_name = args[file_name_position] if (file_name is None and secondary_file_name_position and secondary_file_name_position < len(args)): # noqa E129 file_name = args[secondary_file_name_position] return file_name return pf
[docs] @deprecated() def wrap(self, fn, path, cache, save_as_json=True, return_dataframe=False, index=None, rename=None, **kwargs): '''make an rma query, save it and return the dataframe. Parameters ---------- fn : function reference makes the actual query using kwargs. path : string where to save the data cache : boolean True will make the query, False just loads from disk save_as_json : boolean, optional True (default) will save data as json, False as csv return_dataframe : boolean, optional True will cast the return value to a pandas dataframe, False (default) will not index : string, optional column to use as the pandas index rename : list of string tuples, optional (new, old) columns to rename kwargs : objects passed through to the query function Returns ------- dict or DataFrame data type depends on return_dataframe option. Notes ----- Column renaming happens after the file is reloaded for json ''' if cache is True: json_data = fn(**kwargs) if save_as_json is True: ju.write(path, json_data) else: df = pd.DataFrame(json_data) Cache.rename_columns(df, rename) if index is not None: df.set_index([index], inplace=True) df.to_csv(path) # read it back in if save_as_json is True: if return_dataframe is True: data = pj.read_json(path, orient='records') Cache.rename_columns(data, rename) if index is not None: data.set_index([index], inplace=True) else: data = ju.read(path) elif return_dataframe is True: data = pd.read_csv(path, parse_dates=True) else: raise ValueError( 'save_as_json=False cannot be used with ' 'return_dataframe=False') return data
[docs]def cacheable(strategy=None, pre=None, writer=None, reader=None, post=None, pathfinder=None): '''decorator for rma queries, save it and return the dataframe. Parameters ---------- fn : function reference makes the actual query using kwargs. path : string where to save the data strategy : string or None, optional 'create' always gets the data from the source (server or generated), 'file' loads from disk, 'lazy' creates the data and saves to file if no file exists, None queries the server and bypasses all caching behavior pre : function df|json->df|json, takes one data argument and returns filtered version, None for pass-through post : function df|json->?, takes one data argument and returns Object reader : function, optional path -> data, default NOP writer : function, optional path, data -> None, default NOP kwargs : objects passed through to the query function Returns ------- dict or DataFrame data type depends on dataframe option. Notes ----- Column renaming happens after the file is reloaded for json ''' def decor(func): decor.strategy = strategy decor.pre = pre decor.writer = writer decor.reader = reader decor.post = post decor.pathfinder = pathfinder @functools.wraps(func) def w(*args, **kwargs): if decor.pathfinder and 'pathfinder' not in kwargs: pathfinder = decor.pathfinder else: pathfinder = kwargs.pop('pathfinder', None) if pathfinder and 'path' not in kwargs: found_path = pathfinder(*args, **kwargs) if found_path: kwargs['path'] = found_path if decor.strategy and 'strategy' not in kwargs: kwargs['strategy'] = decor.strategy if decor.pre and 'pre' not in kwargs: kwargs['pre'] = decor.pre if decor.writer and 'writer' not in kwargs: kwargs['writer'] = decor.writer if decor.reader and 'reader' not in kwargs: kwargs['reader'] = decor.reader if decor.post and not 'post in kwargs': kwargs['post'] = decor.post result = Cache.cacher(func, *args, **kwargs) return result return w return decor
[docs]def get_default_manifest_file(cache_name): return os.environ.get( '{}_MANIFEST'.format(cache_name.upper()), '{}/manifest.json'.format(cache_name.lower()) )