import functools
import hashlib
import os
import pickle
import types
import warnings
from collections.abc import Callable
from typing import Any
import dill
import numpy as np
import pandas as pd
from diskcache import Cache
from xxhash import xxh32
from .common import logger, settings
def _seed() -> int:
    """Look up the xxhash seed on the shared ``settings`` object.

    The attribute is read on every call rather than captured at import time,
    so a value assigned to ``settings.seed`` later is picked up by the next
    hash computation.
    """
    active_seed = settings.seed
    return active_seed
def _cache_dir() -> str:
    """Look up the cache directory on the shared ``settings`` object.

    The attribute is read on every call rather than captured at import time,
    so a value assigned to ``settings.cache_dir`` later is honoured by the
    next caller.
    """
    active_directory = settings.cache_dir
    return active_directory
@functools.lru_cache(maxsize=8)
def _shared_cache(directory: str) -> Cache:
    """Hand out one :class:`diskcache.Cache` handle per cache directory.

    The ``lru_cache`` wrapper memoises the handle by ``directory`` (at most
    eight distinct directories are remembered), so repeated calls with the
    same path return the same object instead of constructing a fresh
    ``Cache(directory=...)`` each time, as :func:`cacheable` used to do on
    every invocation.

    Parameters
    ----------
    directory : str
        Filesystem location of the cache.

    Returns
    -------
    Cache
        The memoised cache handle for ``directory``.
    """
    handle = Cache(directory=directory)
    return handle
def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean switch from the environment without ever raising.

    Integer spellings keep their historic meaning (``0`` is off, any other
    integer is on). The common words ``true/false``, ``yes/no`` and
    ``on/off`` are accepted as well. Anything else (including an empty
    value) falls back to ``default`` with a :class:`RuntimeWarning` instead
    of crashing the import of this module.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    text = raw.strip().lower()
    try:
        # Historic behaviour: the value was parsed as bool(int(...)).
        return bool(int(text))
    except ValueError:
        pass
    if text in {"true", "yes", "on"}:
        return True
    if text in {"false", "no", "off"}:
        return False
    warnings.warn(
        f"Ignoring unrecognised value {raw!r} for {name}; "
        f"using default {default!r}.",
        RuntimeWarning,
        stacklevel=2,
    )
    return default


#: Module-level switch controlling whether :func:`convert_to_type` will
#: attempt to interpret string arguments as filesystem paths and hash them
#: as ``FileType`` / ``DirType``. The historic default was ``True``, which
#: silently invalidated cache keys when files moved or were renamed and made
#: it impossible to pass arbitrary strings that happened to look like paths.
#: New code should pass an explicit ``arg_types={"x": FileType}`` instead.
#: This module-level toggle exists to make the deprecation gradual rather
#: than abrupt. It is read once, at import time, from the
#: ``SCALABLE_PATH_SNIFFING`` environment variable (default: enabled).
PATH_SNIFFING_ENABLED = _env_flag("SCALABLE_PATH_SNIFFING", True)
[docs]
class GenericType:
    """Common base for the wrapper classes used to build cache keys.

    The base class only stores the wrapped object; each subclass supplies
    its own ``__hash__`` that turns ``value`` into a digest.

    Parameters
    ----------
    value : Any
        The object to wrap (and, in subclasses, to hash).
    """

    def __init__(self, value: Any) -> None:
        # Kept as a plain public attribute: subclasses read ``self.value``
        # directly inside ``__hash__``.
        self.value = value
[docs]
class FileType(GenericType):
    """Wrapper whose digest covers a file's base name and its contents.

    Parameters
    ----------
    value : str
        The path to the file.
    """

    def __hash__(self) -> int:
        """Digest the file's base name followed by its bytes.

        Raises
        ------
        ValueError
            If the path does not exist.
        """
        target = self.value
        if not os.path.exists(target):
            raise ValueError(f"File does not exist: {self.value!r}")

        hasher = xxh32(seed=_seed())
        hasher.update(str(os.path.basename(target)).encode('utf-8'))
        with open(target, 'rb') as stream:
            # Feed the hasher 1 MiB at a time so that very large files are
            # never held in memory in full.
            while block := stream.read(1024 * 1024):
                hasher.update(block)
        return hasher.intdigest()
[docs]
class DirType(GenericType):
    """Wrapper whose digest covers a directory tree.

    Parameters
    ----------
    value : str
        The path to the directory.
    """

    def __hash__(self) -> int:
        """Digest the directory's base name and every entry beneath it.

        Entries are visited in sorted name order. Each entry's name is fed
        to the hasher; files additionally contribute their bytes and
        sub-directories contribute the digest of a nested :class:`DirType`.

        Raises
        ------
        ValueError
            If the path does not exist.
        """
        root = self.value
        if not os.path.exists(root):
            raise ValueError(f"Directory does not exist: {self.value!r}")

        hasher = xxh32(seed=_seed())
        hasher.update(str(os.path.basename(root)).encode('utf-8'))
        for entry_name in sorted(os.listdir(root)):
            hasher.update(entry_name.encode('utf-8'))
            entry_path = os.path.join(root, entry_name)
            if os.path.isfile(entry_path):
                with open(entry_path, 'rb') as stream:
                    # 1 MiB reads keep memory use flat for large files.
                    while block := stream.read(1024 * 1024):
                        hasher.update(block)
                continue
            if os.path.isdir(entry_path):
                nested_digest = hash(DirType(entry_path))
                hasher.update(hash_to_bytes(nested_digest))
        return hasher.intdigest()
[docs]
class ValueType(GenericType):
    """Hash for generic primitive values (int, str, float, bytes, bool).

    Notes
    -----
    The digest covers the value's type name as well as its ``str()`` form.
    Hashing the text alone made values of different types collide whenever
    they print the same (``1`` vs ``"1"``, ``1.5`` vs ``"1.5"``, ``None`` vs
    ``"None"``), which let a cached result for one be returned for the
    other. Digests therefore differ from those produced before the type tag
    was added; entries stored under the old digests are simply not found.
    """

    def __hash__(self) -> int:
        """Return the xxh32 digest of ``<type name> NUL <str(value)>``."""
        x = xxh32(seed=_seed())
        x.update(type(self.value).__name__.encode('utf-8'))
        # NUL separator: keeps the type tag from running into the payload.
        x.update(b"\x00")
        x.update(str(self.value).encode('utf-8'))
        return x.intdigest()
[docs]
class ObjectType(GenericType):
    """Hash for composite objects (lists, tuples, dicts, fall-through pickle).

    Notes
    -----
    * Lists and tuples are hashed element by element, in order.
    * Dicts are hashed as key/value digest pairs. Keys are visited in sorted
      order; when the keys cannot be compared with each other
      (:class:`TypeError`), they are ordered by their digest instead, so two
      equal dicts hash the same regardless of insertion order.
    * Anything else is hashed through ``pickle.dumps``.
    """

    def __hash__(self) -> int:
        """Return the xxh32 digest of the wrapped composite.

        Raises
        ------
        TypeError
            If the value is not a list/tuple/dict and cannot be pickled.
        """
        x = xxh32(seed=_seed())
        if isinstance(self.value, (list, tuple)):
            for element in self.value:
                x.update(hash_to_bytes(hash(convert_to_type(element))))
        elif isinstance(self.value, dict):
            # Digest every key once; reused for both ordering and hashing.
            key_digests = {
                key: hash(convert_to_type(key)) for key in self.value
            }
            try:
                keys = sorted(self.value)
            except TypeError:
                logger.debug(
                    "Dict keys not totally orderable; ordering by key "
                    "digest instead."
                )
                keys = sorted(self.value, key=key_digests.__getitem__)
            for key in keys:
                x.update(hash_to_bytes(key_digests[key]))
                x.update(hash_to_bytes(hash(convert_to_type(self.value[key]))))
        else:
            try:
                payload = pickle.dumps(self.value)
            except (pickle.PicklingError, TypeError, AttributeError) as exc:
                # AttributeError is what pickle raises for locally defined
                # objects on some Python versions; surface the same guidance.
                raise TypeError(
                    f"ObjectType cannot hash {type(self.value).__name__}; "
                    "wrap it in a custom GenericType subclass with a defined "
                    "__hash__ or pass arg_types= to @cacheable."
                ) from exc
            x.update(payload)
        return x.intdigest()
[docs]
class UtilityType(GenericType):
    """Wrapper that digests numpy arrays and pandas dataframes.

    Support for further utility data types can be added by subclassing.
    """

    def __hash__(self) -> int:
        """Return the xxh32 digest of the wrapped array or dataframe.

        Raises
        ------
        TypeError
            If the wrapped value is neither an ndarray nor a DataFrame.
        """
        hasher = xxh32(seed=_seed())
        payload = self.value
        if isinstance(payload, np.ndarray):
            # dtype and shape go in ahead of the raw buffer so that arrays
            # sharing a byte stream but differing in layout stay distinct.
            chunks = (
                str(payload.dtype).encode('utf-8'),
                str(payload.shape).encode('utf-8'),
                payload.tobytes(),
            )
        elif isinstance(payload, pd.DataFrame):
            chunks = (pickle.dumps(payload),)
        else:  # pragma: no cover - defensive; predicate in convert_to_type guards us
            raise TypeError(f"UtilityType does not support {type(self.value).__name__}")
        for chunk in chunks:
            hasher.update(chunk)
        return hasher.intdigest()
def hash_to_bytes(hash: int) -> bytes:
    """Converts a hash (or int) to its big-endian bytes representation.

    Positive values use the minimal number of bytes. Zero is encoded as a
    single ``b"\\x00"`` byte rather than an empty string, so a zero digest
    still contributes to whatever hash it is fed into. Negative values are
    encoded in two's complement instead of raising ``OverflowError``.

    Parameters
    ----------
    hash : int
        The hash to be converted to bytes.

    Returns
    -------
    bytes
        The bytes representation (always at least one byte long).
    """
    if hash < 0:
        # bit_length // 8 + 1 bytes always leaves room for the sign bit.
        return hash.to_bytes(hash.bit_length() // 8 + 1, 'big', signed=True)
    # max(1, ...) keeps 0 from collapsing to an empty byte string.
    return hash.to_bytes(max(1, (hash.bit_length() + 7) // 8), 'big')
def convert_to_type(arg: Any) -> GenericType:
    """Wrap ``arg`` in the :class:`GenericType` subclass that best fits it.

    The choice is a heuristic based on ``isinstance`` checks. When cache
    keys must be deterministic, annotate arguments explicitly through
    ``@cacheable(arg_types={...})`` instead of relying on it.

    Path sniffing
    -------------
    While ``PATH_SNIFFING_ENABLED`` is true, a string naming an existing
    file or directory is wrapped as :class:`FileType` / :class:`DirType`
    rather than :class:`ValueType`. That behaviour is deprecated because it

    * makes the cache key depend on file contents that the function
      signature never mentions, and
    * confuses literal strings with paths (passing ``"/etc"`` hashes the
      whole ``/etc`` directory).

    A :class:`DeprecationWarning` is emitted the first time it triggers in a
    process. Turn it off with ``SCALABLE_PATH_SNIFFING=0`` in the
    environment or, preferably, pass an explicit ``arg_types=`` mapping to
    :func:`cacheable`.
    """
    if isinstance(arg, str):
        if not PATH_SNIFFING_ENABLED:
            return ValueType(arg)
        try:
            names_file = os.path.isfile(arg)
            names_dir = False if names_file else os.path.isdir(arg)
        except (OSError, ValueError):
            # The string cannot even be probed as a path: plain value.
            return ValueType(arg)
        if names_file:
            _warn_path_sniffing_once(arg)
            return FileType(arg)
        if names_dir:
            _warn_path_sniffing_once(arg)
            return DirType(arg)
        return ValueType(arg)

    # First matching row wins; order mirrors the precedence of the checks.
    dispatch = (
        ((int, float, bool, bytes), ValueType),
        ((np.ndarray, pd.DataFrame), UtilityType),
        ((list, dict, tuple), ObjectType),
    )
    for accepted, wrapper in dispatch:
        if isinstance(arg, accepted):
            return wrapper(arg)

    logger.warning(
        "Could not identify type for argument of type %s. Falling back to "
        "ObjectType (pickle). For deterministic cache keys, pass arg_types= "
        "to @cacheable.",
        type(arg).__name__,
    )
    return ObjectType(arg)
# Process-wide latch: flipped to True after the first path-sniffing warning.
_PATH_SNIFFING_WARNED = False


def _warn_path_sniffing_once(value: str) -> None:
    """Warn about implicit path sniffing, at most once per process.

    The first call flips the module-level ``_PATH_SNIFFING_WARNED`` latch
    and emits a :class:`DeprecationWarning` naming ``value``; every later
    call is a no-op.
    """
    global _PATH_SNIFFING_WARNED
    if not _PATH_SNIFFING_WARNED:
        # Set the latch before warning, so it is set even if the warning
        # is escalated to an exception by the active filters.
        _PATH_SNIFFING_WARNED = True
        message = (
            "Implicit path-sniffing in convert_to_type() is deprecated. "
            f"Argument {value!r} was treated as a file/dir path because it "
            "resolved on disk. Pass arg_types={...: FileType/DirType} explicitly "
            "to @cacheable, or set SCALABLE_PATH_SNIFFING=0 to disable."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=3)
[docs]
def cacheable(
    return_type: type[GenericType] | Callable[..., Any] | None = None,
    void: bool = False,
    check_output: bool = False,
    recompute: bool = False,
    store: bool = True,
    **arg_types: type[GenericType],
) -> Callable[[Callable[..., Any]], Callable[..., Any]] | Callable[..., Any]:
    """Decorator function to cache the output of a function.

    The cache key is built from a fingerprint of the function (its source,
    or qualified name + bytecode when the source is unavailable) and from
    one ``(name digest, value digest)`` pair per argument, taken in sorted
    argument-name order. Every argument is wrapped in a type class so that
    ``hash()`` can be called on it. Since argument types are estimated and
    not guaranteed to be correct with more exotic data types, it's best
    practice to specify the return value's type class along with the type
    classes of all the arguments.

    Parameters
    ----------
    return_type : Any
        The type class for the return value of the function. Usually
        a value between ValueType, FileType, DirType, ObjectType but custom
        classes with a defined hash() function can be used as well. Defaults
        to None. If None, the return_type will be estimated which is not
        guaranteed to be correct. When the decorator is applied bare
        (``@cacheable``), this slot receives the decorated function itself.
    void : bool, optional
        Whether the function returns a value or not. A function is void if it
        does not return a value. A void function is handed back undecorated,
        so nothing is cached for it. The default is False.
    check_output : bool, optional
        Whether to check the output of a function has the same hash as when
        it was stored. Useful to ensure entities like files haven't been
        modified since initially stored. On a mismatch the entry is deleted
        and the function is run again. The default is False.
    recompute : bool, optional
        Whether to recompute the value or not. When True the cache is not
        consulted and, if ``store`` is also True, the fresh result replaces
        any existing entry. The default is False.
    store : bool, optional
        Whether to store the value in the cache or not. The default is True.
    arg_types : dict
        The type classes for the arguments of the function. The keys are the
        argument names and the values are the type classes. If none are given
        for a certain argument, the type class will be estimated which is not
        guaranteed to be correct.

    Returns
    -------
    Callable
        The wrapped function when used bare, otherwise a decorator.

    Examples
    --------
    >>> @cacheable
    ... def func(arg1, arg2):
    ...     ...
    >>> @cacheable()
    ... def func(arg1, arg2):
    ...     ...
    >>> @cacheable(void=True)
    ... def func(arg1, arg2):
    ...     ...
    >>> @cacheable(ValueType)
    ... def func(arg1, arg2):
    ...     ...
    >>> @cacheable(return_type=DirType, arg1=UtilityType, arg2=FileType)
    ... def func(arg1, arg2):
    ...     ...
    >>> @cacheable(return_type=ValueType, recompute=False, store=True, arg1=DirType, arg2=FileType)
    ... def func(arg1, arg2):
    ...     ...
    """
    func = None
    if isinstance(return_type, types.FunctionType):
        # Bare ``@cacheable``: the decorated function arrived in the first
        # positional slot, so there is no explicit return type.
        func = return_type
        return_type = None

    def digest_output(value):
        """Digest a return value with the declared (or estimated) type class."""
        if return_type is None:
            return hash(convert_to_type(value))
        return hash(return_type(value))

    def decorator(func):
        if void:
            # Nothing to cache: hand the function back untouched.
            return func

        # Compute the function-identity component of the cache key once at
        # decoration time. ``dill.source.getsource`` raises for lambdas,
        # functools.partial, REPL definitions, etc. — fall back to a stable
        # fingerprint built from the qualified name + bytecode in those cases.
        try:
            func_source = dill.source.getsource(func)
            func_fingerprint = func_source.encode("utf-8")
        except (OSError, TypeError) as exc:
            logger.debug(
                "dill.source.getsource(%s) failed (%s); falling back to "
                "qualname+bytecode fingerprint.",
                getattr(func, "__qualname__", repr(func)),
                exc,
            )
            qualname = getattr(func, "__qualname__", repr(func))
            module = getattr(func, "__module__", "?") or "?"
            fallback_code = getattr(func, "__code__", None)
            bytecode = fallback_code.co_code if fallback_code is not None else b""
            func_fingerprint = (
                f"{module}.{qualname}".encode()
                + b"\x00"
                + hashlib.sha256(bytecode).digest()
            )
        x = xxh32(seed=_seed())
        x.update(func_fingerprint)
        func_digest = x.intdigest()

        # Positional parameter names and their defaults do not change between
        # calls, so derive them once here instead of on every invocation.
        code = getattr(func, "__code__", None)
        if code is not None:
            arg_names = code.co_varnames[: code.co_argcount]
        else:  # pragma: no cover - builtins / C-level callables
            arg_names = ()
        default_values = {}
        positional_defaults = getattr(func, "__defaults__", None)
        if positional_defaults:
            # Defaults always belong to the trailing positional parameters.
            default_values = dict(
                zip(
                    arg_names[-len(positional_defaults):],
                    positional_defaults,
                    strict=False,
                )
            )

        @functools.wraps(func)
        def inner(*args, **kwargs):
            # Normalise the call into name -> value so that f(1, 2),
            # f(1, b=2) and f(a=1, b=2) all produce the same key.
            final_args = {}
            for index, arg in enumerate(args):
                if index < len(arg_names):
                    final_args[arg_names[index]] = arg
                else:
                    # Surplus positionals (``*args``) get a synthetic name.
                    final_args[f"__pos_{index}"] = arg
            final_args.update(kwargs)
            for keyword, arg in default_values.items():
                final_args.setdefault(keyword, arg)

            # Walk the arguments in sorted-name order and keep each name
            # digest next to its value digest. Sorting the digests themselves
            # would drop the name/value pairing and make f(1, 2) and f(2, 1)
            # share a key.
            keys = [func_digest]
            for keyword in sorted(final_args):
                arg = final_args[keyword]
                if keyword in arg_types:
                    wrapped_arg = arg_types[keyword](arg)
                else:
                    wrapped_arg = convert_to_type(arg)
                keys.append(hash(ValueType(keyword)))
                keys.append(hash(wrapped_arg))
            key = hash(ObjectType(keys))

            disk = _shared_cache(_cache_dir())

            if not recompute:
                # Entries are always stored as [digest, value], so a None
                # result from get() can only mean "absent". Fetching in one
                # step also avoids racing with a concurrent eviction.
                entry = disk.get(key)
                if entry is not None:
                    stored_digest, stored_value = entry[0], entry[1]
                    if not check_output or digest_output(stored_value) == stored_digest:
                        # Returned directly so that a legitimately cached
                        # ``None`` counts as a hit.
                        return stored_value
                    # The stored value no longer matches its digest: drop it
                    # and fall through to recompute.
                    if not disk.delete(key, retry=True):
                        logger.warning(
                            "%s could not be deleted from cache after hash "
                            "mismatch.",
                            func.__name__,
                        )

            ret = func(*args, **kwargs)
            if store:
                # set() rather than add(): add() refuses to replace an
                # existing key, which made recompute=True a silent no-op.
                if not disk.set(key, [digest_output(ret), ret], retry=True):
                    logger.warning(
                        "%s could not be added to cache.", func.__name__
                    )
            return ret

        return inner

    if func is not None:
        return decorator(func)
    return decorator