CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-diskcache

Disk Cache -- Disk and file backed persistent cache.

Pending
Overview
Eval results
Files

synchronization-primitives.mddocs/

Synchronization Primitives

DiskCache provides thread-safe and process-safe synchronization primitives that work across multiple processes and machines. These include locks, re-entrant locks, bounded semaphores, and a running average calculator, all implemented using the underlying cache for coordination.

Capabilities

Lock - Distributed Lock

A process-safe lock implementation using spin-lock algorithm with cache-based coordination.

class Lock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize distributed lock.
        
        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """
        
    def acquire(self):
        """
        Acquire the lock using spin-lock algorithm.
        
        Blocks until lock is acquired. Uses polling with small delays
        to detect when lock becomes available.
        
        Returns:
            bool: True when lock is acquired
        """
        
    def release(self):
        """
        Release the lock by deleting the lock key.
        
        Returns:
            bool: True if lock was held and released
        """
        
    def locked(self):
        """
        Check if lock is currently held (by any process).
        
        Returns:
            bool: True if lock is currently held
        """
        
    def __enter__(self):
        """Context manager entry - acquire lock."""
        
    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""

RLock - Re-entrant Distributed Lock

A re-entrant lock that can be acquired multiple times by the same process/thread, with a counter to track acquisition depth.

class RLock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize re-entrant distributed lock.
        
        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """
        
    def acquire(self):
        """
        Acquire the re-entrant lock.
        
        If already held by the current process, increments the acquisition
        count. Blocks if held by another process.
        
        Returns:
            bool: True when lock is acquired
        """
        
    def release(self):
        """
        Release the re-entrant lock.
        
        Decrements the acquisition count. Only fully releases the lock
        when count reaches zero.
        
        Returns:
            bool: True if lock count was decremented
            
        Raises:
            RuntimeError: If attempting to release a lock not held by current process
        """
        
    def __enter__(self):
        """Context manager entry - acquire lock."""
        
    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""

BoundedSemaphore - Distributed Semaphore

A distributed semaphore that limits the number of concurrent accesses to a resource across processes.

class BoundedSemaphore:
    def __init__(self, cache, key, value=1, expire=None, tag=None):
        """
        Initialize bounded semaphore.
        
        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the semaphore
            value (int): Initial semaphore count. Default 1.
            expire (float, optional): Semaphore expiration time in seconds
            tag (str, optional): Tag for grouping related semaphores
        """
        
    def acquire(self):
        """
        Acquire the semaphore (decrement count).
        
        Blocks until semaphore count is greater than zero, then
        atomically decrements the count.
        
        Returns:
            bool: True when semaphore is acquired
        """
        
    def release(self):
        """
        Release the semaphore (increment count).
        
        Atomically increments the semaphore count, potentially
        allowing other processes to acquire it.
        
        Returns:
            bool: True if semaphore was released
            
        Raises:
            ValueError: If attempting to release beyond initial value
        """
        
    def __enter__(self):
        """Context manager entry - acquire semaphore."""
        
    def __exit__(self, *exc_info):
        """Context manager exit - release semaphore."""

Averager - Running Average Calculator

A distributed running average calculator that maintains total and count across processes.

class Averager:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize running average calculator.
        
        Args:
            cache (Cache or FanoutCache): Cache instance for storage
            key (str): Unique key for the average data
            expire (float, optional): Data expiration time in seconds
            tag (str, optional): Tag for grouping related averages
        """
        
    def add(self, value):
        """
        Add value to the running average.
        
        Atomically updates both the total sum and count,
        maintaining consistency across concurrent operations.
        
        Args:
            value (float): Value to add to average
        """
        
    def get(self):
        """
        Get current average value.
        
        Returns:
            float: Current average (total/count), or None if no values added
        """
        
    def pop(self):
        """
        Get current average and delete the data.
        
        Returns:
            float: Current average (total/count), or None if no values added
        """

Usage Examples

Basic Lock Usage

import diskcache
import threading
import time

cache = diskcache.FanoutCache('/tmp/locks')
lock = diskcache.Lock(cache, 'resource_lock')

def worker(worker_id):
    print(f"Worker {worker_id} trying to acquire lock...")
    
    with lock:  # Context manager automatically acquires and releases
        print(f"Worker {worker_id} has the lock")
        time.sleep(2)  # Simulate work
        print(f"Worker {worker_id} releasing lock")

# Create multiple threads competing for the same lock
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Manual Lock Operations

import diskcache

cache = diskcache.Cache('/tmp/manual_locks')
lock = diskcache.Lock(cache, 'manual_lock', expire=30)  # Expires in 30 seconds

# Manual lock operations
if not lock.locked():
    lock.acquire()
    try:
        print("Performing critical section work...")
        # Do critical work
    finally:
        lock.release()
else:
    print("Resource is busy")

Re-entrant Lock Usage

import diskcache

cache = diskcache.Cache('/tmp/rlocks')
rlock = diskcache.RLock(cache, 'reentrant_lock')

def recursive_function(depth):
    if depth <= 0:
        return
        
    with rlock:  # Can acquire the same lock multiple times
        print(f"Acquired lock at depth {depth}")
        recursive_function(depth - 1)  # Recursive call - will re-acquire same lock
        print(f"Released lock at depth {depth}")

recursive_function(3)

Bounded Semaphore Usage

import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/semaphores')
# Allow maximum 3 concurrent connections
semaphore = diskcache.BoundedSemaphore(cache, 'db_connections', value=3)

def database_worker(worker_id):
    print(f"Worker {worker_id} requesting database connection...")
    
    with semaphore:  # Only 3 workers can be here simultaneously
        print(f"Worker {worker_id} connected to database")
        time.sleep(2)  # Simulate database work
        print(f"Worker {worker_id} disconnecting from database")

# Create 10 workers, but only 3 can access database at once
threads = []
for i in range(10):
    t = threading.Thread(target=database_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Running Average Usage

import diskcache
import random
import threading

cache = diskcache.Cache('/tmp/averages')
avg = diskcache.Averager(cache, 'response_time_avg')

def simulate_requests(worker_id):
    for i in range(10):
        # Simulate request with random response time
        response_time = random.uniform(0.1, 2.0)
        avg.add(response_time)
        print(f"Worker {worker_id} added {response_time:.3f}s")

# Multiple workers adding response times concurrently  
threads = []
for i in range(5):
    t = threading.Thread(target=simulate_requests, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Get final average
final_avg = avg.get()
print(f"Average response time: {final_avg:.3f}s")

# Reset for next measurement period
final_avg = avg.pop()  # Gets average and clears data
print(f"Final average before reset: {final_avg:.3f}s")
print(f"After reset: {avg.get()}")  # Should be None

Cross-Process Coordination

import diskcache
import multiprocessing
import time

def worker_process(process_id, cache_dir):
    # Each process creates its own cache connection
    cache = diskcache.Cache(cache_dir)
    lock = diskcache.Lock(cache, 'shared_resource')
    semaphore = diskcache.BoundedSemaphore(cache, 'resource_pool', value=2)
    
    # Coordinate access across processes
    with semaphore:
        print(f"Process {process_id} acquired semaphore")
        
        with lock:
            print(f"Process {process_id} has exclusive access")
            time.sleep(1)
            print(f"Process {process_id} releasing exclusive access")
            
        print(f"Process {process_id} released semaphore")

if __name__ == '__main__':
    cache_dir = '/tmp/multiprocess_sync'
    
    # Create multiple processes
    processes = []
    for i in range(6):
        p = multiprocessing.Process(target=worker_process, args=(i, cache_dir))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

Advanced Patterns

import diskcache
import time
import threading

cache = diskcache.FanoutCache('/tmp/advanced_sync')

# Reader-writer lock pattern using multiple locks
read_lock = diskcache.Lock(cache, 'read_lock')
write_lock = diskcache.Lock(cache, 'write_lock')
reader_count = diskcache.Averager(cache, 'reader_count')

def reader(reader_id):
    with read_lock:
        reader_count.add(1)  # Increment reader count
        if reader_count.get() == 1:  # First reader
            write_lock.acquire()  # Block writers
    
    # Reading critical section
    print(f"Reader {reader_id} is reading...")
    time.sleep(1)
    
    with read_lock:
        current_count = reader_count.get() or 0
        if current_count <= 1:  # Last reader
            reader_count.pop()  # Reset count
            write_lock.release()  # Allow writers
        else:
            # Decrement count (simplified - in practice need atomic decrement)
            pass

def writer(writer_id):
    with write_lock:
        print(f"Writer {writer_id} is writing...")
        time.sleep(2)

# Create readers and writers
threads = []
for i in range(3):
    threads.append(threading.Thread(target=reader, args=(i,)))
for i in range(2):
    threads.append(threading.Thread(target=writer, args=(i,)))

for t in threads:
    t.start()
for t in threads:
    t.join()

Best Practices

Lock Expiration

Always set reasonable expiration times to prevent deadlocks from crashed processes:

# Lock expires after 60 seconds to prevent deadlocks
lock = diskcache.Lock(cache, 'critical_resource', expire=60)

Error Handling

Properly handle lock acquisition failures and cleanup:

import diskcache

cache = diskcache.Cache('/tmp/safe_locks')
lock = diskcache.Lock(cache, 'safe_lock', expire=30)

try:
    if lock.acquire():
        try:
            # Critical section
            critical_work()
        finally:
            lock.release()
    else:
        print("Could not acquire lock")
except Exception as e:
    print(f"Error in critical section: {e}")
    # Lock will expire automatically due to expire parameter

Semaphore Resource Management

Use semaphores to limit resource usage:

# Limit concurrent file downloads
download_semaphore = diskcache.BoundedSemaphore(
    cache, 'downloads', value=5, expire=300
)

def download_file(url):
    with download_semaphore:
        # Only 5 downloads can happen simultaneously
        perform_download(url)

Install with Tessl CLI

npx tessl i tessl/pypi-diskcache

docs

core-caching.md

disk-serialization.md

django-integration.md

fanout-cache.md

index.md

persistent-data-structures.md

recipe-functions.md

synchronization-primitives.md

tile.json