
tessl/pypi-portalocker

Cross-platform file locking library that provides reliable file locking mechanisms across Windows, Linux, Unix, and macOS systems


docs/redis-locking.md

Redis Distributed Locking

Redis pubsub-based distributed locks that provide immediate unlocking when connections are lost, suitable for multi-process and multi-machine coordination. Unlike traditional Redis locking based on key/value pairs, this implementation uses the pubsub system for automatic cleanup when processes crash or connections are lost.

Capabilities

RedisLock Class

An extremely reliable Redis lock that automatically unlocks when the holding process disconnects, crashes, or loses network connectivity.

class RedisLock:
    """
    Redis-based distributed lock using pubsub for automatic cleanup.
    
    Parameters:
    - channel: Redis channel to use as the locking key
    - connection: Optional existing Redis connection
    - timeout: Timeout when trying to acquire lock (default: inherited from LockBase)
    - check_interval: Check interval while waiting (default: inherited from LockBase)  
    - fail_when_locked: Fail immediately if initial lock fails (default: False)
    - thread_sleep_time: Sleep time between Redis message checks (default: 0.1)
    - unavailable_timeout: Timeout for detecting unavailable lock holders (default: 1.0)
    - redis_kwargs: Redis connection parameters if no connection provided
    """
    
    def __init__(self, channel: str, connection: redis.Redis[str] | None = None,
                 timeout: float | None = None, check_interval: float | None = None,
                 fail_when_locked: bool | None = False, thread_sleep_time: float = 0.1,
                 unavailable_timeout: float = 1.0, redis_kwargs: dict[str, typing.Any] | None = None) -> None: ...
    
    def acquire(self, timeout: float | None = None, check_interval: float | None = None,
                fail_when_locked: bool | None = None) -> 'RedisLock':
        """
        Acquire the Redis lock.
        
        Parameters:
        - timeout: Override default timeout
        - check_interval: Override default check interval
        - fail_when_locked: Override default fail_when_locked behavior
        
        Returns:
        - Self (RedisLock instance) for chaining
        
        Raises:
        - AlreadyLocked: If lock cannot be acquired
        """
    
    def release(self) -> None:
        """Release the Redis lock and cleanup pubsub subscription"""
    
    def get_connection(self) -> redis.Redis[str]:
        """Get or create Redis connection using provided redis_kwargs"""
    
    def __enter__(self) -> 'RedisLock':
        """Context manager entry - acquire lock"""
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - release lock"""

Default Redis Configuration

RedisLock provides sensible defaults for Redis connection parameters:

DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
    'health_check_interval': 10,  # Health check every 10 seconds
    'decode_responses': True,     # Automatically decode Redis responses
}
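Note that when you supply your own `redis_kwargs`, the defaults above may be replaced rather than merged (this is an assumption worth verifying against the source), so a defensive pattern is to merge explicitly before constructing the lock. A minimal sketch; the helper name `build_redis_kwargs` is illustrative, not part of the library:

```python
# Documented defaults, restated here so the merge is explicit.
DEFAULT_REDIS_KWARGS = {
    'health_check_interval': 10,
    'decode_responses': True,
}

def build_redis_kwargs(**overrides):
    """Merge caller overrides on top of the documented defaults so that
    health checks and response decoding are not silently dropped."""
    return {**DEFAULT_REDIS_KWARGS, **overrides}

kwargs = build_redis_kwargs(host='localhost', port=6379, db=0)
# kwargs now contains both the defaults and the caller's overrides
```

The merged dict can then be passed as `redis_kwargs=kwargs` when constructing a `RedisLock`.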

Usage Examples

Basic Redis lock usage:

import redis
import portalocker

# Simple Redis lock using default connection
with portalocker.RedisLock('my_resource_lock') as lock:
    # Only one process across all machines can hold this lock
    print("Processing shared resource...")
    do_exclusive_work()
    print("Work completed")
# Lock automatically released

# Custom Redis connection
redis_conn = redis.Redis(host='redis.example.com', port=6379, db=0)
with portalocker.RedisLock('my_resource_lock', connection=redis_conn) as lock:
    process_shared_resource()

Non-blocking Redis locks:

import portalocker

try:
    # Fail immediately if lock is held by another process
    with portalocker.RedisLock('my_resource_lock', fail_when_locked=True) as lock:
        process_resource()
except portalocker.AlreadyLocked:
    print("Resource is currently being processed by another instance")
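When a fail-fast attempt loses the race, a small retry wrapper gives bounded waiting without keeping a blocking `acquire` open. A generic sketch; the helper name and the attempt/delay numbers are illustrative, not part of the library:

```python
import time

def with_retries(action, retry_on, attempts=5, delay=0.5):
    """Call `action`, retrying up to `attempts` times whenever it raises
    `retry_on` (e.g. portalocker.AlreadyLocked), sleeping `delay` seconds
    between attempts. Re-raises after the final failed attempt."""
    for attempt in range(attempts):
        try:
            return action()
        except retry_on:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)

# Against a live Redis server this might be wired up as:
# def job():
#     with portalocker.RedisLock('my_resource_lock', fail_when_locked=True):
#         process_resource()
# with_retries(job, portalocker.AlreadyLocked)
```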

Timeout and retry behavior:

import portalocker

# Wait up to 30 seconds for lock to become available
lock = portalocker.RedisLock(
    'my_resource_lock',
    timeout=30.0,
    check_interval=1.0,  # Check every second
    redis_kwargs={
        'host': 'localhost',
        'port': 6379,
        'db': 0,
        'health_check_interval': 5
    }
)

try:
    with lock:
        # Will retry for up to 30 seconds
        process_exclusive_resource()
except portalocker.AlreadyLocked:
    print("Could not acquire lock within 30 seconds")

Manual lock management:

import portalocker

# Create lock
lock = portalocker.RedisLock('batch_processing')

try:
    # Acquire lock
    lock.acquire(timeout=60.0)
    
    # Do work
    process_batch_job()
    
finally:
    # Always release lock
    lock.release()

Multi-machine coordination:

import portalocker
import time

# Lock that works across multiple servers
def distributed_task():
    with portalocker.RedisLock(
        'daily_report_generation',
        redis_kwargs={
            'host': 'shared-redis.company.com',
            'port': 6379,
            'password': 'secret',
            'db': 0
        }
    ) as lock:
        print("Starting daily report generation...")
        
        # This will only run on one machine even if multiple
        # servers try to run it simultaneously
        generate_daily_reports()
        
        print("Daily reports completed")

# Run on multiple servers - only one will actually execute
distributed_task()
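The pattern above generalizes to a small "run on exactly one host" wrapper: attempt the lock without blocking, run the job only if it was acquired, and report whether this host did the work. A sketch with the lock factory and exception type passed in so it is not tied to any particular lock class; the helper name is illustrative:

```python
def run_if_leader(make_lock, job, already_locked_exc):
    """Enter the lock returned by `make_lock` and run `job` inside it.

    Returns True if this process ran the job, False if the lock was
    already held elsewhere (signalled by `already_locked_exc`).
    """
    try:
        with make_lock():
            job()
        return True
    except already_locked_exc:
        return False

# With RedisLock this might look like:
# ran = run_if_leader(
#     lambda: portalocker.RedisLock('daily_report_generation',
#                                   fail_when_locked=True),
#     generate_daily_reports,
#     portalocker.AlreadyLocked,
# )
```

Schedulers on every server can then call the wrapper unconditionally; only the host that wins the lock executes the job, and the rest skip it cleanly.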

Custom Redis configuration:

import portalocker

# Advanced Redis configuration
custom_redis_config = {
    'host': 'redis-cluster.example.com',
    'port': 6379,
    'db': 2,
    'password': 'secure_password',
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
    'health_check_interval': 30,
    'retry_on_timeout': True
}

with portalocker.RedisLock('critical_process', redis_kwargs=custom_redis_config) as lock:
    # Process with custom Redis setup
    handle_critical_process()

Automatic Connection Cleanup

The key advantage of RedisLock over traditional key/value Redis locks is that the lock is released as soon as the holder's connection drops, rather than only after a timeout expires:

import portalocker
import os

def worker_process():
    with portalocker.RedisLock('shared_work_queue') as lock:
        # If this process crashes, gets killed, or loses network connection,
        # the lock is automatically released immediately (not after timeout)
        process_work_items()

# Even if worker_process() crashes or is killed with SIGKILL,
# other processes can immediately acquire the lock
worker_process()

Error Handling

RedisLock raises the same base exceptions as other lock types:

import portalocker
import redis

try:
    with portalocker.RedisLock('my_lock') as lock:
        do_work()
except portalocker.AlreadyLocked:
    print("Lock is held by another process")
except redis.ConnectionError:
    print("Could not connect to Redis server")  
except redis.TimeoutError:
    print("Redis operation timed out")
except portalocker.LockException as e:
    print(f"Locking error: {e}")

Requirements

RedisLock requires the redis Python package:

pip install portalocker[redis]
# or
pip install redis

Import handling for missing redis dependency:

import portalocker

try:
    from portalocker import RedisLock
except ImportError:
    # Redis package not installed
    RedisLock = None

if RedisLock is not None:
    # Use Redis locking
    with RedisLock('my_lock') as lock:
        do_work()
else:
    # Fall back to file-based locking
    with portalocker.Lock('/tmp/my_lock') as lock:
        do_work()

Type Definitions

import redis
import typing

# Redis connection type
RedisConnection = redis.Redis[str]

# Redis configuration dictionary
RedisKwargs = dict[str, typing.Any]

# Default Redis connection parameters
# Default Redis connection parameters (a ClassVar on RedisLock; shown here
# as a module-level dict, since ClassVar is only valid inside a class body)
DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
    'health_check_interval': 10,
    'decode_responses': True,
}

Install with Tessl CLI

npx tessl i tessl/pypi-portalocker
