Disk Cache -- Disk and file backed persistent cache.

DiskCache provides thread-safe and process-safe synchronization primitives that work across multiple processes and machines. These include locks, re-entrant locks, bounded semaphores, and a running average calculator, all implemented using the underlying cache for coordination.

A process-safe lock implementation using a spin-lock algorithm with cache-based coordination.
# API stub: signatures and contracts only -- presumably mirrors
# diskcache.recipes.Lock; implementation not shown here (TODO confirm).
class Lock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the lock using spin-lock algorithm.

        Blocks until lock is acquired. Uses polling with small delays
        to detect when lock becomes available.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the lock by deleting the lock key.

        Returns:
            bool: True if lock was held and released
        """

    def locked(self):
        """
        Check if lock is currently held (by any process).

        Returns:
            bool: True if lock is currently held
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""

# A re-entrant lock that can be acquired multiple times by the same
# process/thread, with a counter to track acquisition depth.
# API stub: signatures and contracts only; implementation not shown here.
class RLock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize re-entrant distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the re-entrant lock.

        If already held by the current process, increments the acquisition
        count. Blocks if held by another process.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the re-entrant lock.

        Decrements the acquisition count. Only fully releases the lock
        when count reaches zero.

        Returns:
            bool: True if lock count was decremented

        Raises:
            RuntimeError: If attempting to release a lock not held by current process
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""

# A distributed semaphore that limits the number of concurrent accesses to a
# resource across processes.
# API stub: signatures and contracts only; implementation not shown here.
class BoundedSemaphore:
    def __init__(self, cache, key, value=1, expire=None, tag=None):
        """
        Initialize bounded semaphore.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the semaphore
            value (int): Initial semaphore count. Default 1.
            expire (float, optional): Semaphore expiration time in seconds
            tag (str, optional): Tag for grouping related semaphores
        """

    def acquire(self):
        """
        Acquire the semaphore (decrement count).

        Blocks until semaphore count is greater than zero, then
        atomically decrements the count.

        Returns:
            bool: True when semaphore is acquired
        """

    def release(self):
        """
        Release the semaphore (increment count).

        Atomically increments the semaphore count, potentially
        allowing other processes to acquire it.

        Returns:
            bool: True if semaphore was released

        Raises:
            ValueError: If attempting to release beyond initial value
        """

    def __enter__(self):
        """Context manager entry - acquire semaphore."""

    def __exit__(self, *exc_info):
        """Context manager exit - release semaphore."""

# A distributed running average calculator that maintains total and count
# across processes.
# API stub: signatures and contracts only; implementation not shown here.
class Averager:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize running average calculator.

        Args:
            cache (Cache or FanoutCache): Cache instance for storage
            key (str): Unique key for the average data
            expire (float, optional): Data expiration time in seconds
            tag (str, optional): Tag for grouping related averages
        """

    def add(self, value):
        """
        Add value to the running average.

        Atomically updates both the total sum and count,
        maintaining consistency across concurrent operations.

        Args:
            value (float): Value to add to average
        """

    def get(self):
        """
        Get current average value.

        Returns:
            float: Current average (total/count), or None if no values added
        """

    def pop(self):
        """
        Get current average and delete the data.

        Returns:
            float: Current average (total/count), or None if no values added
        """

import diskcache
import threading
import time

cache = diskcache.FanoutCache('/tmp/locks')
lock = diskcache.Lock(cache, 'resource_lock')


def worker(worker_id):
    """Compete for the shared lock, hold it while 'working', then release."""
    print(f"Worker {worker_id} trying to acquire lock...")
    with lock:  # Context manager automatically acquires and releases
        print(f"Worker {worker_id} has the lock")
        time.sleep(2)  # Simulate work
        print(f"Worker {worker_id} releasing lock")


# Create multiple threads competing for the same lock
threads = [threading.Thread(target=worker, args=(idx,)) for idx in range(5)]
for thr in threads:
    thr.start()
for thr in threads:
    thr.join()

import diskcache
cache = diskcache.Cache('/tmp/manual_locks')
lock = diskcache.Lock(cache, 'manual_lock', expire=30)  # Expires in 30 seconds

# Manual lock operations
# NOTE(review): check-then-acquire is racy -- another process may take the
# lock between locked() and acquire(); per the Lock docs, acquire() then
# blocks until the lock frees rather than failing fast. Confirm intent.
if not lock.locked():
    lock.acquire()
    try:
        print("Performing critical section work...")
        # Do critical work
    finally:
        lock.release()
else:
    print("Resource is busy")

import diskcache
cache = diskcache.Cache('/tmp/rlocks')
rlock = diskcache.RLock(cache, 'reentrant_lock')


def recursive_function(level):
    """Demonstrate re-entrancy: each recursion level re-takes the same RLock."""
    if level <= 0:
        return
    with rlock:  # Can acquire the same lock multiple times
        print(f"Acquired lock at depth {level}")
        recursive_function(level - 1)  # Recursive call - will re-acquire same lock
    print(f"Released lock at depth {level}")


recursive_function(3)

import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/semaphores')

# Allow maximum 3 concurrent connections
semaphore = diskcache.BoundedSemaphore(cache, 'db_connections', value=3)


def database_worker(worker_id):
    """Take one of the three semaphore slots, 'use the database', release."""
    print(f"Worker {worker_id} requesting database connection...")
    with semaphore:  # Only 3 workers can be here simultaneously
        print(f"Worker {worker_id} connected to database")
        time.sleep(2)  # Simulate database work
        print(f"Worker {worker_id} disconnecting from database")


# Create 10 workers, but only 3 can access database at once
threads = [threading.Thread(target=database_worker, args=(n,)) for n in range(10)]
for thr in threads:
    thr.start()
for thr in threads:
    thr.join()

import diskcache
import random
import threading

cache = diskcache.Cache('/tmp/averages')
avg = diskcache.Averager(cache, 'response_time_avg')

def simulate_requests(worker_id):
    """Feed ten random 'response times' into the shared running average."""
    for i in range(10):
        # Simulate request with random response time
        response_time = random.uniform(0.1, 2.0)
        avg.add(response_time)
        print(f"Worker {worker_id} added {response_time:.3f}s")

# Multiple workers adding response times concurrently
threads = []
for i in range(5):
    t = threading.Thread(target=simulate_requests, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

# Get final average
final_avg = avg.get()
print(f"Average response time: {final_avg:.3f}s")

# Reset for next measurement period
final_avg = avg.pop()  # Gets average and clears data
print(f"Final average before reset: {final_avg:.3f}s")
print(f"After reset: {avg.get()}")  # Should be None

import diskcache
import multiprocessing
import time


def worker_process(process_id, cache_dir):
    """Coordinate with sibling processes via cache-backed lock + semaphore."""
    # Each process creates its own cache connection
    cache = diskcache.Cache(cache_dir)
    lock = diskcache.Lock(cache, 'shared_resource')
    semaphore = diskcache.BoundedSemaphore(cache, 'resource_pool', value=2)

    # Coordinate access across processes
    with semaphore:
        print(f"Process {process_id} acquired semaphore")
        with lock:
            print(f"Process {process_id} has exclusive access")
            time.sleep(1)
            print(f"Process {process_id} releasing exclusive access")
        print(f"Process {process_id} released semaphore")


if __name__ == '__main__':
    cache_dir = '/tmp/multiprocess_sync'

    # Create multiple processes
    processes = [
        multiprocessing.Process(target=worker_process, args=(num, cache_dir))
        for num in range(6)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

import diskcache
import time
import threading

cache = diskcache.FanoutCache('/tmp/advanced_sync')

# Reader-writer lock pattern using multiple locks
read_lock = diskcache.Lock(cache, 'read_lock')
write_lock = diskcache.Lock(cache, 'write_lock')
reader_count = diskcache.Averager(cache, 'reader_count')

def reader(reader_id):
    """First reader blocks writers; last reader re-admits them."""
    with read_lock:
        reader_count.add(1)  # Increment reader count
        # NOTE(review): reader_count is an Averager, so get() returns the
        # *average* of the values added (always 1.0 here), not a count --
        # this first-reader test fires for every reader; verify intent.
        if reader_count.get() == 1:  # First reader
            write_lock.acquire()  # Block writers

    # Reading critical section
    print(f"Reader {reader_id} is reading...")
    time.sleep(1)

    with read_lock:
        current_count = reader_count.get() or 0
        if current_count <= 1:  # Last reader
            reader_count.pop()  # Reset count
            write_lock.release()  # Allow writers
        else:
            # Decrement count (simplified - in practice need atomic decrement)
            pass

def writer(writer_id):
    """Hold the write lock for exclusive access."""
    with write_lock:
        print(f"Writer {writer_id} is writing...")
        time.sleep(2)

# Create readers and writers
threads = []
for i in range(3):
    threads.append(threading.Thread(target=reader, args=(i,)))
for i in range(2):
    threads.append(threading.Thread(target=writer, args=(i,)))
for t in threads:
    t.start()
for t in threads:
    t.join()

# Always set reasonable expiration times to prevent deadlocks from crashed
# processes:
# Lock expires after 60 seconds to prevent deadlocks
lock = diskcache.Lock(cache, 'critical_resource', expire=60)

# Properly handle lock acquisition failures and cleanup:
import diskcache

cache = diskcache.Cache('/tmp/safe_locks')
lock = diskcache.Lock(cache, 'safe_lock', expire=30)

try:
    # NOTE(review): per the Lock docs above, acquire() blocks until the lock
    # is obtained and returns True, so the else branch below is unreachable;
    # confirm whether a non-blocking acquire variant was intended.
    if lock.acquire():
        try:
            # Critical section
            critical_work()
        finally:
            lock.release()
    else:
        print("Could not acquire lock")
except Exception as e:
    print(f"Error in critical section: {e}")
    # Lock will expire automatically due to expire parameter

# Use semaphores to limit resource usage:
# Limit concurrent file downloads
download_semaphore = diskcache.BoundedSemaphore(
    cache, 'downloads', value=5, expire=300
)

def download_file(url):
    """Download url while holding one of the five semaphore slots."""
    with download_semaphore:
        # Only 5 downloads can happen simultaneously
        perform_download(url)

# Install with Tessl CLI
npx tessl i tessl/pypi-diskcache