Persistent, stale-free, local and cross-machine caching for Python functions.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cachier provides comprehensive cache management capabilities through methods attached to decorated functions. These methods allow you to control cache behavior, clear cached data, and pre-populate caches with known values.
Remove all cached entries for a specific function:
def clear_cache() -> None:
"""
Clear all cached entries for this function.
Removes all cached results from the function's cache storage.
The next call to the function will trigger a fresh calculation
regardless of any previously cached values.
"""Reset calculation state markers for concurrent access:
def clear_being_calculated() -> None:
"""
Mark all entries in this cache as not being calculated.
Useful for resetting state after process crashes or when
calculation locks become stale. This allows other processes
to proceed with calculations that were previously marked
as being processed.
"""Get the file system location where cache data is stored:
def cache_dpath() -> Optional[str]:
"""
Return the path to the cache directory, if exists; None if not.
For file-based backends (pickle), returns the directory path
where cache files are stored. For other backends (memory,
mongo, redis, sql), returns None as they don't use filesystem
storage.
Returns:
str: Path to cache directory for file-based backends
None: For non-file backends or if directory doesn't exist
"""Add known values to the cache without function execution:
def precache_value(*args, value_to_cache, **kwargs):
"""
Add an initial value to the cache.
Allows manual population of cache with known results, useful
for bootstrapping caches or providing fallback values.
Parameters:
- *args: Positional arguments that would be passed to function
- **kwargs: Keyword arguments that would be passed to function
- value_to_cache: The result value to store in cache
Returns:
The cached value (same as value_to_cache parameter)
"""from cachier import cachier
from datetime import timedelta
@cachier(stale_after=timedelta(hours=1))
def expensive_calculation(n, precision=2):
    """Simulate expensive computation.

    Accumulates i**precision for every i in 0..n-1.
    """
    total = 0
    for i in range(n):
        total += i ** precision
    return total
# Use the function normally
result1 = expensive_calculation(1000) # Computed and cached
result2 = expensive_calculation(1000) # Retrieved from cache
# Clear the cache when needed
expensive_calculation.clear_cache()
result3 = expensive_calculation(1000) # Computed again
# Check cache storage location
cache_path = expensive_calculation.cache_dpath()
if cache_path:
print(f"Cache files stored in: {cache_path}")
# Pre-populate cache with known values
expensive_calculation.precache_value(100, precision=2, value_to_cache=338350)
result4 = expensive_calculation(100, precision=2)  # Uses precached value

import threading
from cachier import cachier
@cachier(backend='pickle', wait_for_calc_timeout=30)
def shared_computation(data_id):
    """Function that might be called concurrently.

    Pauses to simulate a long-running job, then returns a processed tag.
    """
    import time

    time.sleep(5)  # Simulate long computation
    return "processed_{}".format(data_id)
def worker_thread(thread_id):
    """Run the shared computation for one thread and report the outcome."""
    try:
        result = shared_computation("shared_data")
    except Exception as e:
        print(f"Thread {thread_id} failed: {e}")
    else:
        print(f"Thread {thread_id}: {result}")
# Start multiple threads
threads = []
for i in range(5):
t = threading.Thread(target=worker_thread, args=(i,))
threads.append(t)
t.start()
# If there's a process crash or hanging calculation
# Reset the calculation state
shared_computation.clear_being_calculated()
# Wait for all threads
for t in threads:
    t.join()

from cachier import cachier
import json
@cachier(backend='pickle')
def api_lookup(user_id, include_details=False):
    """Look up user data from API.

    Builds the user endpoint URL, optionally requesting the detailed view,
    and returns the decoded JSON payload.
    """
    # Simulate API call
    import requests
    url = f"https://api.example.com/users/{user_id}"
    if include_details:
        url += "?details=true"
    response = requests.get(url)
    return response.json()
# Pre-populate cache with known test data
test_users = {
123: {"name": "Alice", "email": "alice@example.com"},
456: {"name": "Bob", "email": "bob@example.com"}
}
for user_id, user_data in test_users.items():
api_lookup.precache_value(user_id, value_to_cache=user_data)
api_lookup.precache_value(
user_id,
include_details=True,
value_to_cache={**user_data, "details": "full_profile"}
)
# Now these calls use precached data
alice = api_lookup(123) # Uses precached data
bob_detailed = api_lookup(456, include_details=True)  # Uses precached data

from cachier import cachier
import os
@cachier(backend='pickle', separate_files=True)
def process_file(file_path, options=None):
    """Process a file with caching.

    Reads the file, optionally upper-cases it when options requests it,
    and returns the content together with its length.
    """
    with open(file_path, 'r') as handle:
        text = handle.read()
    wants_upper = options and options.get('uppercase')
    if wants_upper:
        text = text.upper()
    return {"content": text, "size": len(text)}
# Process several files
result1 = process_file("/path/to/file1.txt")
result2 = process_file("/path/to/file2.txt", {"uppercase": True})
result3 = process_file("/path/to/file3.txt")
# Clear all cached results
process_file.clear_cache()
# With separate_files=True, you can also manually remove
# specific cache files if needed
cache_dir = process_file.cache_dpath()
if cache_dir and os.path.exists(cache_dir):
cache_files = os.listdir(cache_dir)
print(f"Cache files: {cache_files}")from cachier import cachier
import logging
logger = logging.getLogger(__name__)
@cachier(backend='mongo', wait_for_calc_timeout=60)
def distributed_task(task_id):
    """Task that runs on multiple machines.

    Uses the mongo backend so results are visible to all machines;
    concurrent callers wait up to 60 seconds for an in-progress run.
    """
    # Long-running distributed computation
    # NOTE(review): perform_distributed_analysis is a placeholder defined
    # elsewhere — supply a real implementation before running this example.
    return perform_distributed_analysis(task_id)
def recover_from_crash():
    """Recovery procedure after system crash.

    Resets stale "being calculated" markers left behind by a crashed
    process so other workers can proceed; logs failures instead of
    raising so startup is never blocked.
    """
    try:
        # Clear any stale calculation locks
        distributed_task.clear_being_calculated()
        logger.info("Cleared stale calculation locks")
        # Optionally clear cache if data might be corrupted
        # distributed_task.clear_cache()
        # logger.info("Cleared potentially corrupted cache")
    except Exception as e:
        logger.error(f"Recovery failed: {e}")
# Call during application startup after crash
recover_from_crash()

from cachier import cachier
import os
import json
from datetime import datetime, timedelta
@cachier(
    backend='pickle',
    separate_files=True,
    cleanup_stale=True,
    # NOTE(review): timedelta must be imported from datetime for this
    # decorator argument to resolve — the snippet only imports datetime.
    cleanup_interval=timedelta(hours=1)
)
def monitored_function(param):
    """Function with cache monitoring.

    Each cached entry lives in its own file (separate_files=True) and
    stale entries are cleaned up on an hourly interval.
    """
    # NOTE(review): complex_computation is a placeholder defined elsewhere.
    return complex_computation(param)
def inspect_cache():
    """Inspect cache state and perform maintenance.

    Lists each file in the pickle cache directory with its size and
    modification time, reports the total size, and clears the cache
    when it exceeds 100 MB. Prints a notice and returns early when no
    cache directory exists (non-file backends, or nothing cached yet).
    """
    cache_dir = monitored_function.cache_dpath()
    if not cache_dir or not os.path.exists(cache_dir):
        print("No cache directory found")
        return
    cache_files = os.listdir(cache_dir)
    print(f"Found {len(cache_files)} cache files")
    total_size = 0
    for filename in cache_files:
        file_path = os.path.join(cache_dir, filename)
        if os.path.isfile(file_path):
            size = os.path.getsize(file_path)
            total_size += size
            mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
            # Fixed: print the actual file name (original line had a
            # garbled "(unknown)" placeholder instead of {filename}).
            print(f"  {filename}: {size} bytes, modified {mtime}")
    print(f"Total cache size: {total_size} bytes")
    # Clear cache if it's too large
    if total_size > 1024 * 1024 * 100:  # 100MB
        print("Cache size exceeds limit, clearing...")
        monitored_function.clear_cache()
inspect_cache()

These cache management methods are automatically attached to all functions decorated with @cachier():
clear_cache(), clear_being_calculated(), and precache_value() are available on every decorated function. cache_dpath() returns a directory path for the file-based (pickle) backend, and None for the memory, mongo, redis, and sql backends. The methods provide a consistent interface regardless of the backend used, allowing you to write cache management code that works across different storage systems.
Install with Tessl CLI
npx tessl i tessl/pypi-cachier