CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-diskcache

Disk Cache -- Disk and file backed persistent cache.

Pending
Overview
Eval results
Files

docs/fanout-cache.md

Sharded Caching

FanoutCache provides high-throughput caching by automatically distributing keys across multiple Cache instances (shards). This sharding approach improves performance for applications with high concurrency by reducing lock contention and enabling parallel operations across different shards.

Capabilities

FanoutCache Initialization

Create a FanoutCache instance with configurable shard count and settings.

class FanoutCache:
    def __init__(self, directory=None, shards=8, timeout=0.010, disk=Disk, **settings):
        """
        Initialize sharded cache instance.
        
        Args:
            directory (str, optional): Cache directory path. If None, creates temp directory.
            shards (int): Number of shards to distribute writes across. Default 8.
            timeout (float): SQLite connection timeout in seconds. Default 0.010.
            disk (Disk): Disk instance for serialization. Default Disk.
            **settings: Cache configuration options from DEFAULT_SETTINGS.
                       size_limit is automatically divided by shard count.
        """

    @property
    def directory(self):
        """Cache directory path."""

Cache Operations

All standard cache operations with automatic key distribution across shards.

def set(self, key, value, expire=None, read=False, tag=None, retry=False):
    """
    Store key-value pair in appropriate shard.
    
    Args:
        key: Cache key (must be hashable)
        value: Value to store
        expire (float, optional): Expiration time in seconds from now
        read (bool): Store value as file for reading. Default False.
        tag (str, optional): Tag for grouping related items
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        bool: True if set succeeded
    """

def get(self, key, default=None, read=False, expire_time=False, tag=False, retry=False):
    """
    Retrieve value by key from appropriate shard.
    
    Args:
        key: Cache key
        default: Default value if key not found
        read (bool): Return file handle instead of value. Default False.
        expire_time (bool): Include expiration time in result. Default False.
        tag (bool): Include tag in result. Default False.
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        Value, or tuple with additional info if expire_time/tag requested
    """

def delete(self, key, retry=False):
    """
    Delete key from appropriate shard.
    
    Args:
        key: Cache key to delete
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        bool: True if key existed and was deleted
    """

def add(self, key, value, expire=None, read=False, tag=None, retry=False):
    """
    Add key-value pair only if the key doesn't already exist. Each key maps
    deterministically to a single shard, so only that shard is checked.
    
    Args:
        key: Cache key
        value: Value to store
        expire (float, optional): Expiration time in seconds from now
        read (bool): Store value as file for reading. Default False.
        tag (str, optional): Tag for grouping related items
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        bool: True if key was added (didn't exist)
    """

def touch(self, key, expire=None, retry=False):
    """
    Update expiration time for existing key in appropriate shard.
    
    Args:
        key: Cache key
        expire (float, optional): New expiration time in seconds from now
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        bool: True if key existed and was touched
    """

def pop(self, key, default=None, expire_time=False, tag=False, retry=False):
    """
    Remove and return value for key from appropriate shard.
    
    Args:
        key: Cache key
        default: Default value if key not found
        expire_time (bool): Include expiration time in result. Default False.
        tag (bool): Include tag in result. Default False.
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        Value, or tuple with additional info if expire_time/tag requested
    """

def read(self, key):
    """
    Get file handle for key stored in read mode from appropriate shard.
    
    Args:
        key: Cache key
        
    Returns:
        File handle or None if key not found
    """

def incr(self, key, delta=1, default=0, retry=False):
    """
    Atomically increment numeric value in appropriate shard.
    
    Args:
        key: Cache key
        delta (int): Amount to increment. Default 1.
        default (int): Default value if key doesn't exist. Default 0.
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        New value after increment
    """

def decr(self, key, delta=1, default=0, retry=False):
    """
    Atomically decrement numeric value in appropriate shard.
    
    Args:
        key: Cache key
        delta (int): Amount to decrement. Default 1.
        default (int): Default value if key doesn't exist. Default 0.
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        New value after decrement
    """

Dict-like Interface

Familiar dictionary operations with sharding handled transparently.

def __setitem__(self, key, value):
    """Store key-value pair using fanout_cache[key] = value syntax."""

def __getitem__(self, key):
    """Retrieve value using fanout_cache[key] syntax. Raises KeyError if not found."""

def __delitem__(self, key):
    """Delete key using del fanout_cache[key] syntax."""

def __contains__(self, key):
    """Check if key exists using 'key in fanout_cache' syntax."""

def __len__(self):
    """Get total count of items across all shards."""

def __iter__(self):
    """Iterate over all cache keys across all shards."""

def __reversed__(self):
    """Reverse iterate over all cache keys across all shards."""

Cache Management

Management operations that work across all shards.

def clear(self, retry=False):
    """
    Remove all items from all shards.
    
    Args:
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        int: Total number of items removed across all shards
    """

def cull(self, retry=False):
    """
    Remove items according to eviction policy from all shards.
    
    Args:
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        int: Total number of items removed across all shards
    """

def expire(self, retry=False):
    """
    Remove expired items from all shards.
    
    Args:
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        int: Total number of expired items removed across all shards
    """

def evict(self, tag, retry=False):
    """
    Remove all items with specified tag from all shards.
    
    Args:
        tag (str): Tag to evict
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        int: Total number of items evicted across all shards
    """

def check(self, fix=False, retry=False):
    """
    Check database consistency across all shards.
    
    Args:
        fix (bool): Attempt to fix issues if found. Default False.
        retry (bool): Retry operation on timeout. Default False.
        
    Returns:
        List of issues found across all shards
    """

def create_tag_index(self):
    """Create database index on tag column for all shards."""

def drop_tag_index(self):
    """Drop database index on tag column for all shards."""

Statistics and Monitoring

Access aggregated statistics and information across all shards.

def stats(self, enable=True, reset=False):
    """
    Get aggregated cache hit/miss statistics across all shards.
    
    Args:
        enable (bool): Enable statistics tracking on all shards. Default True.
        reset (bool): Reset statistics counters on all shards. Default False.
        
    Returns:
        Tuple of (total_hits, total_misses) across all shards
    """

def volume(self):
    """
    Get total cache size on disk across all shards.
    
    Returns:
        int: Total size in bytes across all shards
    """

Transaction Management

Context management and connection handling across shards.

def transact(self, retry=True):
    """
    Context manager for atomic transactions across relevant shards.
    
    Args:
        retry (bool): Retry transaction on timeout. Default True.
        
    Returns:
        Context manager for transaction
    """

def __enter__(self):
    """Context manager entry - prepare for operations."""

def __exit__(self, *exception):
    """Context manager exit - cleanup resources."""

def close(self):
    """Close database connections and cleanup resources for all shards."""

Sub-collection Access

Create sub-collections (Cache, Deque, Index) within the FanoutCache directory structure.

def cache(self, name, timeout=60, disk=None, **settings):
    """
    Return Cache instance in subdirectory.
    
    Args:
        name (str): Subdirectory name for Cache
        timeout (float): SQLite connection timeout. Default 60.
        disk (Disk, optional): Disk instance. Default uses FanoutCache's disk.
        **settings: Cache configuration options
        
    Returns:
        Cache: Cache instance in subdirectory
    """

def deque(self, name, maxlen=None):
    """
    Return Deque instance in subdirectory.
    
    Args:
        name (str): Subdirectory name for Deque
        maxlen (int, optional): Maximum length of deque
        
    Returns:
        Deque: Deque instance in subdirectory
    """

def index(self, name):
    """
    Return Index instance in subdirectory.
    
    Args:
        name (str): Subdirectory name for Index
        
    Returns:
        Index: Index instance in subdirectory
    """

Advanced Operations

Settings management and serialization support for FanoutCache.

def reset(self, key, value=ENOVAL):
    """
    Reset cache setting value across all shards.
    
    Args:
        key (str): Setting key from DEFAULT_SETTINGS
        value: New value for setting
        
    Returns:
        Previous value of setting from first shard
    """

def __getstate__(self):
    """Support for pickle serialization - returns FanoutCache state."""

def __setstate__(self, state):
    """Support for pickle deserialization - restores FanoutCache state."""

Dynamic Settings Access

Access shard settings dynamically through attribute access.

def __getattr__(self, name):
    """
    Get setting value from first shard dynamically.
    
    Args:
        name (str): Setting name
        
    Returns:
        Setting value from first shard
        
    Raises:
        AttributeError: If setting name is not found
    """

Usage Examples

Basic Sharded Caching

import diskcache

# Create FanoutCache with 16 shards for higher throughput
fanout = diskcache.FanoutCache('/tmp/fanout_cache', shards=16)

# Basic operations - sharding is transparent
fanout.set('user:123', {'name': 'Bob', 'role': 'admin'})
user = fanout.get('user:123')

# Dict-like interface
fanout['config'] = {'debug': False, 'max_connections': 100}
config = fanout['config']

# Keys are automatically distributed across shards
for i in range(1000):
    fanout.set(f'item:{i}', f'value_{i}')

print(f"Total items: {len(fanout)}")  # Aggregated across all shards

High-Concurrency Usage

import threading
import diskcache

fanout = diskcache.FanoutCache('/tmp/high_throughput', shards=32)

def worker(thread_id):
    # Sharding reduces lock contention: threads writing to keys that
    # hash to different shards rarely block one another
    for i in range(1000):
        key = f'thread_{thread_id}_item_{i}'
        fanout.set(key, {'thread': thread_id, 'value': i})
        
# Create multiple threads for concurrent access
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Total items: {len(fanout)}")

Using Sub-collections

fanout = diskcache.FanoutCache('/tmp/collections')

# Create different data structures in subdirectories
user_cache = fanout.cache('users')
session_index = fanout.index('sessions')
task_queue = fanout.deque('tasks')

# Use each collection independently
user_cache.set('user:123', {'name': 'Alice'})
session_index['session_abc'] = {'user_id': 123, 'created': 1609459200}
task_queue.append({'task': 'send_email', 'user_id': 123})

# Collections share the same directory structure but operate independently
print(f"Users: {len(user_cache)}")
print(f"Sessions: {len(session_index)}")
print(f"Tasks: {len(task_queue)}")

Performance Monitoring

# Enable statistics across all shards
fanout.stats(enable=True)

# Perform operations
for i in range(10000):
    fanout.set(f'key_{i}', f'value_{i}')
    
for i in range(5000):
    value = fanout.get(f'key_{i}')  # Cache hits

for i in range(5000, 10000):
    value = fanout.get(f'missing_{i}')  # Cache misses

# Get aggregated statistics
hits, misses = fanout.stats()
print(f"Total hits: {hits}, misses: {misses}")
print(f"Hit ratio: {hits/(hits+misses):.2%}")
print(f"Total size: {fanout.volume()} bytes")

Cache Management

# Expire old items across all shards
expired_count = fanout.expire()
print(f"Expired {expired_count} items")

# Evict items by tag across all shards
fanout.set('temp1', 'data1', tag='temporary')
fanout.set('temp2', 'data2', tag='temporary')
evicted = fanout.evict('temporary')
print(f"Evicted {evicted} temporary items")

# Clear everything
total_cleared = fanout.clear()
print(f"Cleared {total_cleared} total items")

Install with Tessl CLI

npx tessl i tessl/pypi-diskcache

docs

core-caching.md

disk-serialization.md

django-integration.md

fanout-cache.md

index.md

persistent-data-structures.md

recipe-functions.md

synchronization-primitives.md

tile.json