Persistent, stale-free, local and cross-machine caching for Python functions.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cachier supports multiple storage backends to meet different caching requirements, from simple in-memory caching to distributed cross-machine caching solutions. Each backend is optimized for specific use cases and environments.
Backend = Literal["pickle", "mongo", "memory", "redis", "sql"]

File-based persistent caching using Python's pickle serialization:
@cachier(backend='pickle')
def cached_function(args):
return computation(args)
# With custom cache directory
@cachier(
backend='pickle',
cache_dir='/path/to/cache',
pickle_reload=True,
separate_files=False
)
def file_cached_function(args):
return computation(args)

Pickle Backend Parameters:
cache_dir: Directory for cache files (default: ~/.cachier/)
pickle_reload: Reload cache on each read for thread safety (default: True)
separate_files: Split cache into separate files per argument set (default: False)

Use Cases:
Advantages:
Limitations:
In-memory caching for fastest access within a single process:
@cachier(backend='memory')
def memory_cached_function(args):
return computation(args)
# With calculation timeout
@cachier(
backend='memory',
wait_for_calc_timeout=30
)
def concurrent_memory_function(args):
return computation(args)

Memory Backend Parameters:
wait_for_calc_timeout: Max wait time for ongoing calculations (default: 0 - wait forever)

Use Cases:
Advantages:
Limitations:
Distributed caching using MongoDB for cross-machine cache sharing:
def get_mongo_collection():
import pymongo
client = pymongo.MongoClient('mongodb://localhost:27017/')
return client.cache_db.function_cache
@cachier(
backend='mongo',
mongetter=get_mongo_collection,
wait_for_calc_timeout=60
)
def distributed_function(args):
return computation(args)

MongoDB Backend Parameters:
mongetter: Callable returning pymongo.Collection with write permissions
wait_for_calc_timeout: Max wait time for ongoing calculations (default: 0)

Use Cases:
Advantages:
Limitations:
High-performance distributed caching using Redis:
def get_redis_client():
import redis
return redis.Redis(host='localhost', port=6379, db=0)
@cachier(
backend='redis',
redis_client=get_redis_client,
wait_for_calc_timeout=30
)
def redis_cached_function(args):
return computation(args)
# Using Redis client instance directly
import redis
redis_client = redis.Redis(host='cache-server', port=6379)
@cachier(
backend='redis',
redis_client=redis_client
)
def fast_distributed_function(args):
return computation(args)

Redis Backend Parameters:
redis_client: Redis client instance or callable returning Redis client
wait_for_calc_timeout: Max wait time for ongoing calculations (default: 0)

Use Cases:
Advantages:
Limitations:
Database-backed caching using SQLAlchemy for enterprise environments:
# Using connection string
@cachier(
backend='sql',
sql_engine='postgresql://user:pass@localhost/cache_db'
)
def sql_cached_function(args):
return computation(args)
# Using SQLAlchemy Engine
from sqlalchemy import create_engine
engine = create_engine('sqlite:///cache.db')
@cachier(
backend='sql',
sql_engine=engine
)
def database_cached_function(args):
return computation(args)
# Using callable for lazy connection
def get_sql_engine():
return create_engine('mysql://user:pass@dbserver/cache')
@cachier(
backend='sql',
sql_engine=get_sql_engine,
wait_for_calc_timeout=45
)
def enterprise_function(args):
return computation(args)

SQL Backend Parameters:
sql_engine: SQLAlchemy connection string, Engine instance, or callable returning Engine
wait_for_calc_timeout: Max wait time for ongoing calculations (default: 0)

Use Cases:
Advantages:
Limitations:
from cachier import cachier
import time
# Same function with different backends
def expensive_computation(n):
time.sleep(1) # Simulate expensive operation
return sum(i**2 for i in range(n))
# Pickle - for persistent single-machine caching
@cachier(backend='pickle')
def pickle_version(n):
return expensive_computation(n)
# Memory - for fastest access
@cachier(backend='memory')
def memory_version(n):
return expensive_computation(n)
# MongoDB - for distributed caching
@cachier(backend='mongo', mongetter=get_mongo_collection)
def mongo_version(n):
return expensive_computation(n)
# Redis - for high-performance distributed caching
@cachier(backend='redis', redis_client=get_redis_client)
def redis_version(n):
return expensive_computation(n)
# SQL - for enterprise database-backed caching
@cachier(backend='sql', sql_engine='sqlite:///cache.db')
def sql_version(n):
return expensive_computation(n)

import os
from cachier import cachier, set_global_params
# Development environment - use memory for speed
if os.getenv('ENV') == 'development':
set_global_params(backend='memory')
# Production environment - use Redis cluster
elif os.getenv('ENV') == 'production':
import redis
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'cache-cluster'),
port=int(os.getenv('REDIS_PORT', 6379)),
password=os.getenv('REDIS_PASSWORD'),
ssl=True
)
set_global_params(
backend='redis',
redis_client=redis_client,
wait_for_calc_timeout=60
)
# Testing environment - use pickle with temp directory
else:
import tempfile
set_global_params(
backend='pickle',
cache_dir=tempfile.mkdtemp()
)
@cachier() # Uses environment-appropriate backend
def application_function(data):
return process_data(data)

from cachier import cachier
# Fast local cache for frequently accessed data
@cachier(backend='memory', stale_after=timedelta(minutes=5))
def local_fast_cache(key):
# Check distributed cache first
return distributed_cache(key)
# Distributed cache for shared data across instances
@cachier(backend='redis', redis_client=redis_client, stale_after=timedelta(hours=1))
def distributed_cache(key):
# Fallback to persistent storage
return persistent_cache(key)
# Persistent backup cache for rarely accessed data
@cachier(backend='sql', sql_engine=db_engine, stale_after=timedelta(days=1))
def persistent_cache(key):
# Original data source
return expensive_data_fetch(key)
# This creates a three-tier caching strategy:
# 1. Memory (fastest, 5min expiry)
# 2. Redis (fast distributed, 1hr expiry)
# 3. SQL (persistent backup, 1day expiry)
# 4. Original source (slowest)

from typing import Callable, Union, TYPE_CHECKING
if TYPE_CHECKING:
import pymongo.collection
import redis
HashFunc = Callable[..., str]
Mongetter = Callable[[], "pymongo.collection.Collection"]
RedisClient = Union["redis.Redis", Callable[[], "redis.Redis"]]
Backend = Literal["pickle", "mongo", "memory", "redis", "sql"]

These type definitions ensure proper type checking and IDE support when configuring different backends.
Install with Tessl CLI
npx tessl i tessl/pypi-cachier