CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-diskcache

Disk Cache -- Disk and file backed persistent cache.

Pending
Overview
Eval results
Files

docs/recipe-functions.md

Recipe Functions

DiskCache provides decorator functions for advanced caching patterns including throttling, serialization barriers, and memoization with cache stampede protection. These functions return decorators that can be applied to other functions to add caching behavior.

Capabilities

Throttle Decorator

Rate limiting decorator that restricts function calls to a specified frequency.

def throttle(cache, count, seconds, name=None, expire=None, tag=None, 
             time_func=time.time, sleep_func=time.sleep):
    """
    Create a throttling decorator that limits calls to a given rate.

    The permitted rate is `count` calls per `seconds`-second window.

    NOTE(review): the presence of `sleep_func` suggests that calls over the
    limit are delayed (the caller blocks) rather than rejected with an
    exception -- confirm against the diskcache implementation.
    
    Args:
        cache (Cache or FanoutCache): Cache instance for rate tracking
        count (int): Maximum number of calls allowed
        seconds (float): Time window in seconds for the call limit
        name (str, optional): Name for throttle key. Default uses function name.
        expire (float, optional): Expiration time for throttle data
        tag (str, optional): Tag for grouping related throttle data
        time_func (callable): Function to get current time. Default time.time.
        sleep_func (callable): Function for sleeping/waiting. Default time.sleep.
        
    Returns:
        Decorator function that enforces the specified call rate
        
    Usage:
        @throttle(cache, count=5, seconds=60)
        def api_call():
            # This function can only be called 5 times per minute
            pass
    """

Barrier Decorator

Serialization decorator that ensures only one instance of the decorated function runs at a time using a provided lock factory.

def barrier(cache, lock_factory, name=None, expire=None, tag=None):
    """
    Create a barrier decorator that serializes access to a function.

    Args:
        cache (Cache or FanoutCache): Cache instance for lock coordination
        lock_factory (callable): Function that creates lock instances,
            e.g. diskcache.Lock or diskcache.RLock.
            NOTE(review): presumably invoked as
            lock_factory(cache, key, expire=expire, tag=tag) -- confirm
            before writing a custom factory.
        name (str, optional): Name for barrier key. Default uses function name.
        expire (float, optional): Expiration time for lock
        tag (str, optional): Tag for grouping related locks
        
    Returns:
        Decorator function that serializes function execution
        
    Usage:
        @barrier(cache, diskcache.Lock)
        def critical_function():
            # Only one instance of this function runs at a time
            pass
    """

Memoize Stampede Decorator

Memoization decorator with cache stampede protection using early expiration and probabilistic refresh.

def memoize_stampede(cache, expire, name=None, typed=False, tag=None, 
                     beta=1, ignore=()):
    """
    Create a memoization decorator with cache stampede protection.
    
    Uses probabilistic early expiration to prevent cache stampede - the
    "thundering herd" problem where many processes simultaneously try to
    regenerate an expired cached value.
    
    Args:
        cache (Cache or FanoutCache): Cache instance for memoization
        expire (float): Base expiration time in seconds
        name (str, optional): Name for memoized function. Default function name.
        typed (bool): Distinguish arguments by type. Default False.
        tag (str, optional): Tag for grouping cached results
        beta (float): Early expiration factor. Default 1. Higher values
                     increase probability of early expiration.
        ignore (tuple): Arguments to leave out of the cache key, given as
                     positional indexes (e.g. ``(2,)`` for the third
                     positional argument) and/or keyword names
                     (e.g. ``('timestamp', 'request_id')``).
        
    Returns:
        Memoization decorator with stampede protection
        
    Usage:
        @memoize_stampede(cache, expire=3600, beta=1.5)
        def expensive_computation(x, y):
            # Cached with stampede protection
            return x ** y
    """

Usage Examples

Throttling API Calls

import diskcache
import time
import requests

cache = diskcache.Cache('/tmp/throttle')

# Limit API calls to 10 per minute
@diskcache.throttle(cache, count=10, seconds=60)
def call_api(endpoint):
    """Fetch `endpoint` from the example API and return the decoded JSON.

    Calls are automatically throttled to 10 per minute by the decorator.

    Raises:
        requests.RequestException: on connection errors, timeouts, or an
            HTTP error status.
    """
    # Always pass a timeout: without one, requests can wait indefinitely.
    response = requests.get(f"https://api.example.com/{endpoint}", timeout=10)
    # Surface 4xx/5xx responses as errors instead of reporting "Success".
    response.raise_for_status()
    return response.json()

# These calls will be throttled: calls over the limit are delayed, so the
# only exceptions expected here are request/decoding failures.
for i in range(20):
    try:
        call_api(f"endpoint_{i}")
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on older requests versions.
        print(f"Call {i}: {e}")
    else:
        print(f"Call {i}: Success")
    time.sleep(1)

Custom Throttling Parameters

import diskcache
import time

cache = diskcache.Cache('/tmp/custom_throttle')


def _half_sleep(delay):
    """Sleep for half of the delay requested by the throttle."""
    time.sleep(delay * 0.5)


# Build the decorator first, then apply it: 3 calls per 10 seconds, with an
# explicit key name, expiry, tag, clock, and a custom sleep function.
_custom_throttle = diskcache.throttle(
    cache,
    count=3,
    seconds=10,
    name='custom_function',
    expire=3600,
    tag='rate_limited',
    time_func=time.time,
    sleep_func=_half_sleep,
)


@_custom_throttle
def custom_throttled_function():
    """Print the current timestamp and return a fixed result string."""
    called_at = time.time()
    print(f"Function called at {called_at}")
    return "result"


# Exercise the throttle with more calls than a single window allows.
for i in range(6):
    print(f"Attempt {i + 1}")
    result = custom_throttled_function()
    print(f"Result: {result}")

Barrier for Critical Sections

import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/barrier')


# diskcache.Lock serves as the lock factory; the lock is given expire=60.
@diskcache.barrier(cache, diskcache.Lock, expire=60)
def critical_file_operation(filename):
    """Write a timestamp to /tmp/<filename> inside the barrier.

    Only one thread can perform the file operation at a time.
    """
    print(f"Starting file operation on {filename}")
    time.sleep(2)  # Simulate file I/O
    target_path = f"/tmp/{filename}"
    with open(target_path, 'w') as f:
        payload = f"Data written at {time.time()}"
        f.write(payload)
    print(f"Completed file operation on {filename}")
    return f"Processed {filename}"


def worker(worker_id):
    """Run the critical operation on this worker's own file and report it."""
    target_name = f"file_{worker_id}.txt"
    result = critical_file_operation(target_name)
    print(f"Worker {worker_id}: {result}")


# Several threads contend for the same critical section.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]

for t in threads:
    t.start()

for t in threads:
    t.join()

Custom Barrier with RLock

import diskcache
import threading
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/rlock_barrier')

# Use RLock for re-entrant barriers
@diskcache.barrier(cache, diskcache.RLock, name='reentrant_critical')
def recursive_critical_function(depth):
    """Recurse `depth` levels, passing through the barrier at every level.

    The recursive call goes through the decorated name, so each level
    enters the barrier again; this only works because the lock factory
    produces a re-entrant lock.

    Args:
        depth (int): Remaining recursion depth; values <= 0 end recursion.

    Returns:
        str: "Done" at the bottom, otherwise "Depth <n>: <inner result>".
    """
    if depth <= 0:
        return "Done"

    print(f"In critical section at depth {depth}")
    time.sleep(0.5)

    # This will re-acquire the same lock (re-entrant)
    result = recursive_critical_function(depth - 1)
    return f"Depth {depth}: {result}"

result = recursive_critical_function(3)
print(result)

Memoization with Stampede Protection

import diskcache
import time
import random
import threading

cache = diskcache.Cache('/tmp/memoize_stampede')

@diskcache.memoize_stampede(
    cache,
    expire=10,  # Base expiration of 10 seconds
    beta=1.5,   # beta > 1 favors earlier recomputation, beta < 1 later
    tag='expensive_computation'
)
def expensive_computation(n):
    """Expensive computation with stampede protection."""
    print(f"Computing expensive_computation({n}) - this should happen rarely")
    time.sleep(2)  # Simulate expensive computation
    return n ** 2 + random.randint(1, 100)

def worker(worker_id, n):
    """Request the memoized value and print which worker received it."""
    result = expensive_computation(n)
    print(f"Worker {worker_id}: expensive_computation({n}) = {result}")

# Warm the cache first. Stampede protection works by refreshing an existing
# entry shortly before it expires; on a completely cold cache every
# concurrent caller misses and computes the value itself.
expensive_computation(42)

# Simulate many workers requesting the same computation.
# With the entry already cached, all workers are served from the cache.
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i, 42))  # All workers use same input
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("\nWaiting for potential early expiration...")
time.sleep(8)  # Wait close to expiration time

# These calls might trigger an early refresh: the cached value is still
# returned immediately while a recomputation may be started in the background.
for i in range(3):
    result = expensive_computation(42)
    print(f"Late call {i}: {result}")

Advanced Memoization Options

import diskcache
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/advanced_memoize')

@diskcache.memoize_stampede(
    cache,
    expire=300,  # 5 minutes
    typed=True,  # Distinguish between int(1) and float(1.0)
    ignore=(2,),  # Ignore third argument in cache key
    tag='advanced_function',
    beta=2.0  # Higher early expiration probability
)
def advanced_function(x, y, debug_info, *args, **kwargs):
    """
    Function with advanced memoization options.

    - typed=True: f(1, 2.0) and f(1.0, 2.0) are cached separately
    - ignore=(2,): debug_info parameter doesn't affect caching
    - Supports *args and **kwargs

    Returns:
        x * y plus the sum of the extra positional arguments and the sum of
        the keyword argument values.
    """
    print(f"Computing advanced_function({x}, {y}, ignored={debug_info})")
    time.sleep(1)
    return x * y + sum(args) + sum(kwargs.values())

# These calls will be cached based on x, y, args, and kwargs only
# debug_info is ignored due to ignore=(2,)
result1 = advanced_function(2, 3, "debug1", 10, extra=5)
result2 = advanced_function(2, 3, "debug2", 10, extra=5)  # Cache hit (debug_info ignored)
result3 = advanced_function(2.0, 3.0, "debug3", 10, extra=5)  # Different due to typed=True

print(f"Result 1: {result1}")
print(f"Result 2: {result2}")  # Should be same as result1
print(f"Result 3: {result3}")  # Should be same value but was computed separately

Combining Recipe Functions

import diskcache
import time

cache = diskcache.Cache('/tmp/combined')

# Combine throttling and memoization.
# throttle is the outermost decorator, so every call -- including one that is
# answered from the memoized cache -- counts against the rate limit.
@diskcache.throttle(cache, count=5, seconds=60, name='throttled_api')
@diskcache.memoize_stampede(cache, expire=300, name='memoized_api', beta=1.0)
def api_with_caching_and_throttling(query):
    """
    API function with both throttling and memoization.
    - Throttled to 5 calls per minute
    - Results cached for 5 minutes with stampede protection
    """
    print(f"Making actual API call for query: {query}")
    time.sleep(1)  # Simulate API delay
    return f"API result for {query}"

# First calls - will be computed and cached
print("First batch of calls:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Call {i}: {result}")

print("\nSecond batch - should hit cache:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Cached call {i}: {result}")

# throttle delays excess calls (it sleeps) instead of raising, so there is no
# "throttled" exception to catch; a blanket `except Exception` here would only
# mislabel genuine failures. Expect these calls to slow down, not to fail.
print("\nMany new calls - will be throttled:")
for i in range(10):
    result = api_with_caching_and_throttling(f"new_query_{i}")
    print(f"New call {i}: {result}")

Custom Lock Factory

import diskcache
import time  # required by time.sleep() below

cache = diskcache.Cache('/tmp/custom_lock')

# Custom lock factory with specific settings
def custom_lock_factory(cache, key, expire=None, tag=None):
    """Build an RLock, falling back to a 120-second expiry and 'custom' tag.

    The fallbacks are used only when `expire` / `tag` are falsy.
    """
    return diskcache.RLock(cache, key, expire=expire or 120, tag=tag or 'custom')

@diskcache.barrier(cache, custom_lock_factory, expire=180, tag='critical_ops')
def critical_operation_with_custom_lock():
    """Run under a lock built by custom_lock_factory.

    The barrier is configured with expire=180 and tag='critical_ops', which
    it hands to the factory; the factory's own fallbacks (120 seconds,
    'custom') would apply only if those arguments were omitted.
    """
    print("Performing critical operation with custom lock")
    time.sleep(1)
    return "Operation completed"

result = critical_operation_with_custom_lock()
print(result)

Best Practices

Throttling Best Practices

# Set reasonable limits and handle throttling gracefully
# (`cache` is assumed to be an existing diskcache.Cache instance.)
@diskcache.throttle(cache, count=100, seconds=3600, expire=7200)  # 100/hour, data expires in 2 hours
def rate_limited_operation():
    """Placeholder for an operation limited to 100 calls per hour."""
    pass

# Use different names for different rate limits
@diskcache.throttle(cache, count=10, seconds=60, name='api_writes')
def write_api():
    """Placeholder write endpoint: 10 calls per minute under 'api_writes'."""
    pass

@diskcache.throttle(cache, count=100, seconds=60, name='api_reads')  
def read_api():
    """Placeholder read endpoint: 100 calls per minute under 'api_reads'."""
    pass

Memoization Best Practices

# Use appropriate expiration times
# NOTE(review): beta is an early-expiration factor, not a percentage; the
# exact effect of 1.2 vs 1.5 should be confirmed against the diskcache docs.
@diskcache.memoize_stampede(cache, expire=3600, beta=1.2)  # 1 hour, slightly more eager early refresh than the default beta=1
def hourly_report():
    """Placeholder for a report cached for one hour."""
    pass

@diskcache.memoize_stampede(cache, expire=86400, beta=1.5)  # 1 day, more eager early refresh than the default beta=1
def daily_summary():
    """Placeholder for a summary cached for one day."""
    pass

# Ignore volatile arguments
@diskcache.memoize_stampede(cache, expire=300, ignore=('timestamp', 'request_id'))
def process_request(data, timestamp=None, request_id=None):
    """Placeholder request handler whose cache key excludes the volatile arguments."""
    # timestamp and request_id don't affect the computation
    pass

Error Handling

import diskcache
import logging

cache = diskcache.Cache('/tmp/error_handling')

@diskcache.throttle(cache, count=5, seconds=60)
def fragile_operation():
    """Run a computation that may fail, logging and re-raising any error.

    `risky_computation` is a placeholder that is not defined in this snippet.
    """
    try:
        # Operation that might fail
        risky_computation()
        return "success"
    except Exception as e:
        logging.error(f"Operation failed: {e}")
        # Throttling still applies even if function raises exception
        raise

# Graceful degradation when cache is not available
# NOTE(review): this try block only guards the *definition* of the decorated
# function. If applying the decorator does not touch the cache (likely --
# confirm), a broken cache would surface when the cache is constructed or when
# cached_operation() is called, and this except branch would never run.
# `expensive_computation` is a placeholder not defined in this snippet.
try:
    @diskcache.memoize_stampede(cache, expire=300)
    def cached_operation(x):
        """Memoized wrapper around expensive_computation (5-minute expiry)."""
        return expensive_computation(x)
        
except Exception as e:
    logging.warning(f"Cache not available: {e}")
    # Fallback to uncached version
    def cached_operation(x):
        """Uncached fallback with the same signature."""
        return expensive_computation(x)

Install with Tessl CLI

npx tessl i tessl/pypi-diskcache

docs

core-caching.md

disk-serialization.md

django-integration.md

fanout-cache.md

index.md

persistent-data-structures.md

recipe-functions.md

synchronization-primitives.md

tile.json