Disk Cache -- Disk and file backed persistent cache.
—
DiskCache provides decorator functions for advanced caching patterns including throttling, serialization barriers, and memoization with cache stampede protection. These functions return decorators that can be applied to other functions to add caching behavior.
Rate limiting decorator that restricts function calls to a specified frequency.
def throttle(cache, count, seconds, name=None, expire=None, tag=None,
time_func=time.time, sleep_func=time.sleep):
"""
Create throttling decorator that limits function calls to specified rate.
Args:
cache (Cache or FanoutCache): Cache instance for rate tracking
count (int): Maximum number of calls allowed
seconds (float): Time window in seconds for the call limit
name (str, optional): Name for throttle key. Default uses function name.
expire (float, optional): Expiration time for throttle data
tag (str, optional): Tag for grouping related throttle data
time_func (callable): Function to get current time. Default time.time.
sleep_func (callable): Function for sleeping/waiting. Default time.sleep.
Returns:
Decorator function that enforces the specified call rate
Usage:
@throttle(cache, count=5, seconds=60)
def api_call():
# This function can only be called 5 times per minute
pass
"""

Serialization decorator that ensures only one instance of the decorated function runs at a time using a provided lock factory.
def barrier(cache, lock_factory, name=None, expire=None, tag=None):
"""
Create barrier decorator that serializes access to function using locks.
Args:
cache (Cache or FanoutCache): Cache instance for lock coordination
lock_factory (callable): Function that creates lock instances
name (str, optional): Name for barrier key. Default uses function name.
expire (float, optional): Expiration time for lock
tag (str, optional): Tag for grouping related locks
Returns:
Decorator function that serializes function execution
Usage:
@barrier(cache, diskcache.Lock)
def critical_function():
# Only one instance of this function runs at a time
pass
"""

Memoization decorator with cache stampede protection using early expiration and probabilistic refresh.
def memoize_stampede(cache, expire, name=None, typed=False, tag=None,
beta=1, ignore=()):
"""
Create memoization decorator with cache stampede protection.
Uses probabilistic early expiration to prevent cache stampede - the
"thundering herd" problem where many processes simultaneously try to
regenerate an expired cached value.
Args:
cache (Cache or FanoutCache): Cache instance for memoization
expire (float): Base expiration time in seconds
name (str, optional): Name for memoized function. Default function name.
typed (bool): Distinguish arguments by type. Default False.
tag (str, optional): Tag for grouping cached results
beta (float): Early expiration factor. Default 1. Higher values
increase probability of early expiration.
ignore (tuple): Argument positions/names to ignore in cache key
Returns:
Memoization decorator with stampede protection
Usage:
@memoize_stampede(cache, expire=3600, beta=1.5)
def expensive_computation(x, y):
# Cached with stampede protection
return x ** y
"""

import diskcache
import time
import requests

cache = diskcache.Cache('/tmp/throttle')


# Limit API calls to 10 per minute. A throttled call does not raise: it waits
# (using the decorator's sleep_func) until the rate limit lets it proceed.
@diskcache.throttle(cache, count=10, seconds=60)
def call_api(endpoint):
    """Fetch an endpoint as JSON; calls are throttled to 10 per minute."""
    # A timeout keeps a stalled connection from hanging the example forever.
    response = requests.get(f"https://api.example.com/{endpoint}", timeout=10)
    return response.json()


# These calls will be throttled: once the allowance is used up, call_api
# blocks until the next slot becomes available.
for i in range(20):
    try:
        call_api(f"endpoint_{i}")
    except (requests.RequestException, ValueError) as e:
        # Network/HTTP/JSON-decoding failures only; throttling never raises.
        print(f"Call {i}: {e}")
    else:
        print(f"Call {i}: Success")
    time.sleep(1)

import diskcache
import time

cache = diskcache.Cache('/tmp/custom_throttle')


def _half_sleep(delay):
    """Sleep for half the required time."""
    time.sleep(delay * 0.5)


# Custom throttle with different time and sleep functions
@diskcache.throttle(
    cache,
    count=3,
    seconds=10,
    name='custom_function',
    expire=3600,
    tag='rate_limited',
    time_func=time.time,
    sleep_func=_half_sleep,
)
def custom_throttled_function():
    """Print the current timestamp and return a fixed result."""
    print(f"Function called at {time.time()}")
    return "result"


# Test throttling behavior
for attempt in range(1, 7):
    print(f"Attempt {attempt}")
    outcome = custom_throttled_function()
    print(f"Result: {outcome}")

import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/barrier')


# Use Lock as the lock factory for barriers
@diskcache.barrier(cache, diskcache.Lock, expire=60)
def critical_file_operation(filename):
    """Write a timestamp to /tmp/<filename>; the barrier serializes callers."""
    print(f"Starting file operation on {filename}")
    time.sleep(2)  # Simulate file I/O
    target = f"/tmp/{filename}"
    with open(target, 'w') as handle:
        handle.write(f"Data written at {time.time()}")
    print(f"Completed file operation on {filename}")
    return f"Processed {filename}"


# Multiple threads trying to access the critical section
def worker(worker_id):
    """Run the critical operation for one worker and report its result."""
    outcome = critical_file_operation(f"file_{worker_id}.txt")
    print(f"Worker {worker_id}: {outcome}")


threads = []
for index in range(5):
    thread = threading.Thread(target=worker, args=(index,))
    threads.append(thread)
    thread.start()

# Wait for every worker to finish.
for thread in threads:
    thread.join()

import diskcache
import threading
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/rlock_barrier')


# Use RLock for re-entrant barriers
@diskcache.barrier(cache, diskcache.RLock, name='reentrant_critical')
def recursive_critical_function(depth):
    """Re-entrant critical function using RLock barrier.

    Recurses from ``depth`` down to 0, re-entering the same barrier on
    every level, and returns a string describing the nesting.
    """
    if depth <= 0:
        return "Done"
    print(f"In critical section at depth {depth}")
    time.sleep(0.5)
    # This will re-acquire the same lock (re-entrant)
    result = recursive_critical_function(depth - 1)
    return f"Depth {depth}: {result}"


result = recursive_critical_function(3)
print(result)

import diskcache
import time
import random
import threading

cache = diskcache.Cache('/tmp/memoize_stampede')


@diskcache.memoize_stampede(
    cache,
    expire=10,  # Base expiration of 10 seconds
    beta=1.5,  # beta > 1 makes early recomputation more likely
    tag='expensive_computation',
)
def expensive_computation(n):
    """Expensive computation with stampede protection."""
    print(f"Computing expensive_computation({n}) - this should happen rarely")
    time.sleep(2)  # Simulate expensive computation
    return n ** 2 + random.randint(1, 100)


def worker(worker_id, n):
    """Request the memoized value and print what this worker received."""
    result = expensive_computation(n)
    print(f"Worker {worker_id}: expensive_computation({n}) = {result}")


# Warm the cache first. Stampede protection works by refreshing an existing
# entry early; on a cold miss every concurrent caller computes the value itself.
expensive_computation(42)

# Simulate many workers requesting the same computation.
# With the entry already cached they are all served without recomputing.
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i, 42))  # All workers use same input
    threads.append(t)
    t.start()
for t in threads:
    t.join()

print("\nWaiting for potential early expiration...")
time.sleep(8)  # Wait close to expiration time

# These calls might trigger early refresh due to beta factor
for i in range(3):
    result = expensive_computation(42)
    print(f"Late call {i}: {result}")

import diskcache
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/advanced_memoize')


@diskcache.memoize_stampede(
    cache,
    expire=300,  # 5 minutes
    typed=True,  # Distinguish between int(1) and float(1.0)
    ignore=(2,),  # Ignore third argument in cache key
    tag='advanced_function',
    beta=2.0,  # Higher early expiration probability
)
def advanced_function(x, y, debug_info, *args, **kwargs):
    """
    Function with advanced memoization options.

    - typed=True: f(1, 2.0) and f(1.0, 2.0) are cached separately
    - ignore=(2,): debug_info parameter doesn't affect caching
    - Supports *args and **kwargs

    Returns x * y plus the sum of the extra positional and keyword values.
    """
    print(f"Computing advanced_function({x}, {y}, ignored={debug_info})")
    time.sleep(1)
    return x * y + sum(args) + sum(kwargs.values())


# These calls will be cached based on x, y, args, and kwargs only
# debug_info is ignored due to ignore=(2,)
result1 = advanced_function(2, 3, "debug1", 10, extra=5)
result2 = advanced_function(2, 3, "debug2", 10, extra=5)  # Cache hit (debug_info ignored)
result3 = advanced_function(2.0, 3.0, "debug3", 10, extra=5)  # Different due to typed=True
print(f"Result 1: {result1}")
print(f"Result 2: {result2}")  # Should be same as result1
print(f"Result 3: {result3}")  # Equal value (21.0 vs 21) but computed separately

import diskcache
import time

cache = diskcache.Cache('/tmp/combined')


# Combine throttling and memoization. throttle is the outer decorator, so
# every call -- including cache hits -- counts against the rate limit.
@diskcache.throttle(cache, count=5, seconds=60, name='throttled_api')
@diskcache.memoize_stampede(cache, expire=300, name='memoized_api', beta=1.0)
def api_with_caching_and_throttling(query):
    """
    API function with both throttling and memoization.

    - Throttled to 5 calls per minute
    - Results cached for 5 minutes with stampede protection
    """
    print(f"Making actual API call for query: {query}")
    time.sleep(1)  # Simulate API delay
    return f"API result for {query}"


# First calls - will be computed and cached
print("First batch of calls:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Call {i}: {result}")

print("\nSecond batch - should hit cache:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Cached call {i}: {result}")

print("\nMany new calls - will be throttled:")
for i in range(10):
    # throttle never raises: once the allowance is spent, the call simply
    # blocks until the next slot is available, so no try/except is needed.
    result = api_with_caching_and_throttling(f"new_query_{i}")
    print(f"New call {i}: {result}")

import diskcache
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/custom_lock')


# Custom lock factory with specific settings
def custom_lock_factory(cache, key, expire=None, tag=None):
    """Build an RLock, defaulting to 120 s / 'custom' when expire / tag are not given."""
    return diskcache.RLock(cache, key, expire=expire or 120, tag=tag or 'custom')


@diskcache.barrier(cache, custom_lock_factory, expire=180, tag='critical_ops')
def critical_operation_with_custom_lock():
    """Run under a lock built by custom_lock_factory.

    The barrier is configured with expire=180 and tag='critical_ops'; the
    factory's 2-minute / 'custom' fallbacks apply only when it receives no
    expire / tag.
    """
    print("Performing critical operation with custom lock")
    time.sleep(1)
    return "Operation completed"


result = critical_operation_with_custom_lock()
print(result)

# Set reasonable limits and handle throttling gracefully
@diskcache.throttle(cache, count=100, seconds=3600, expire=7200)  # 100/hour, data expires in 2 hours
def rate_limited_operation():
    """Placeholder for an operation limited to 100 calls per hour."""


# Use different names for different rate limits
@diskcache.throttle(cache, count=10, seconds=60, name='api_writes')
def write_api():
    """Placeholder for write calls, limited to 10 per minute."""


@diskcache.throttle(cache, count=100, seconds=60, name='api_reads')
def read_api():
    """Placeholder for read calls, limited to 100 per minute."""


# Use appropriate expiration times
# beta is a scaling factor, not a percentage: values above 1 make early
# recomputation more likely, values below 1 make it less likely.
@diskcache.memoize_stampede(cache, expire=3600, beta=1.2)  # 1 hour, slightly eager early refresh
def hourly_report():
    pass


@diskcache.memoize_stampede(cache, expire=86400, beta=1.5)  # 1 day, more eager early refresh
def daily_summary():
    pass


# Ignore volatile arguments
@diskcache.memoize_stampede(cache, expire=300, ignore=('timestamp', 'request_id'))
def process_request(data, timestamp=None, request_id=None):
    # timestamp and request_id don't affect the computation
    pass

import diskcache
import logging
cache = diskcache.Cache('/tmp/error_handling')
@diskcache.throttle(cache, count=5, seconds=60)
def fragile_operation():
try:
# Operation that might fail
risky_computation()
return "success"
except Exception as e:
logging.error(f"Operation failed: {e}")
# Throttling still applies even if function raises exception
raise
# Graceful degradation when cache is not available
try:
@diskcache.memoize_stampede(cache, expire=300)
def cached_operation(x):
return expensive_computation(x)
except Exception as e:
logging.warning(f"Cache not available: {e}")
# Fallback to uncached version
def cached_operation(x):
return expensive_computation(x)Install with Tessl CLI
npx tessl i tessl/pypi-diskcache