Cross-platform file locking library that provides reliable file locking mechanisms across Windows, Linux, Unix, and macOS systems
—
Redis pubsub-based distributed locks that provide immediate unlocking when connections are lost, suitable for multi-process and multi-machine coordination. Unlike traditional Redis locking based on key/value pairs, this implementation uses the pubsub system for automatic cleanup when processes crash or connections are lost.
An extremely reliable Redis lock that automatically unlocks when the holding process disconnects, crashes, or loses network connectivity.
class RedisLock:
"""
Redis-based distributed lock using pubsub for automatic cleanup.
Parameters:
- channel: Redis channel to use as the locking key
- connection: Optional existing Redis connection
- timeout: Timeout when trying to acquire lock (default: inherited from LockBase)
- check_interval: Check interval while waiting (default: inherited from LockBase)
- fail_when_locked: Fail immediately if initial lock fails (default: False)
- thread_sleep_time: Sleep time between Redis message checks (default: 0.1)
- unavailable_timeout: Timeout for detecting unavailable lock holders (default: 1.0)
- redis_kwargs: Redis connection parameters if no connection provided
"""
def __init__(self, channel: str, connection: redis.Redis[str] | None = None,
timeout: float | None = None, check_interval: float | None = None,
fail_when_locked: bool | None = False, thread_sleep_time: float = 0.1,
unavailable_timeout: float = 1.0, redis_kwargs: dict[str, typing.Any] | None = None) -> None: ...
def acquire(self, timeout: float | None = None, check_interval: float | None = None,
fail_when_locked: bool | None = None) -> 'RedisLock':
"""
Acquire the Redis lock.
Parameters:
- timeout: Override default timeout
- check_interval: Override default check interval
- fail_when_locked: Override default fail_when_locked behavior
Returns:
- Self (RedisLock instance) for chaining
Raises:
- AlreadyLocked: If lock cannot be acquired
"""
def release(self) -> None:
"""Release the Redis lock and cleanup pubsub subscription"""
def get_connection(self) -> redis.Redis[str]:
"""Get or create Redis connection using provided redis_kwargs"""
def __enter__(self) -> 'RedisLock':
"""Context manager entry - acquire lock"""
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit - release lock"""

RedisLock provides sensible defaults for Redis connection parameters:
DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
'health_check_interval': 10, # Health check every 10 seconds
'decode_responses': True, # Automatically decode Redis responses
}

Basic Redis lock usage:
import redis
import portalocker
# Simple Redis lock using default connection
with portalocker.RedisLock('my_resource_lock') as lock:
# Only one process across all machines can hold this lock
print("Processing shared resource...")
do_exclusive_work()
print("Work completed")
# Lock automatically released
# Custom Redis connection
redis_conn = redis.Redis(host='redis.example.com', port=6379, db=0)
with portalocker.RedisLock('my_resource_lock', connection=redis_conn) as lock:
process_shared_resource()

Non-blocking Redis locks:
import portalocker
try:
# Fail immediately if lock is held by another process
with portalocker.RedisLock('my_resource_lock', fail_when_locked=True) as lock:
process_resource()
except portalocker.AlreadyLocked:
print("Resource is currently being processed by another instance")

Timeout and retry behavior:
import portalocker
# Wait up to 30 seconds for lock to become available
lock = portalocker.RedisLock(
'my_resource_lock',
timeout=30.0,
check_interval=1.0, # Check every second
redis_kwargs={
'host': 'localhost',
'port': 6379,
'db': 0,
'health_check_interval': 5
}
)
try:
with lock:
# Will retry for up to 30 seconds
process_exclusive_resource()
except portalocker.AlreadyLocked:
print("Could not acquire lock within 30 seconds")

Manual lock management:
import portalocker
# Create lock
lock = portalocker.RedisLock('batch_processing')
try:
# Acquire lock
lock.acquire(timeout=60.0)
# Do work
process_batch_job()
finally:
# Always release lock
lock.release()

Multi-machine coordination:
import portalocker
import time
# Lock that works across multiple servers
def distributed_task():
with portalocker.RedisLock(
'daily_report_generation',
redis_kwargs={
'host': 'shared-redis.company.com',
'port': 6379,
'password': 'secret',
'db': 0
}
) as lock:
print("Starting daily report generation...")
# This will only run on one machine even if multiple
# servers try to run it simultaneously
generate_daily_reports()
print("Daily reports completed")
# Run on multiple servers - only one will actually execute
distributed_task()

Custom Redis configuration:
import portalocker
# Advanced Redis configuration
custom_redis_config = {
'host': 'redis-cluster.example.com',
'port': 6379,
'db': 2,
'password': 'secure_password',
'socket_timeout': 5,
'socket_connect_timeout': 5,
'health_check_interval': 30,
'retry_on_timeout': True
}
with portalocker.RedisLock('critical_process', redis_kwargs=custom_redis_config) as lock:
# Process with custom Redis setup
handle_critical_process()

The key advantage of RedisLock over traditional Redis locking mechanisms is that the lock is released immediately when its holder disappears:
import portalocker
import os
def worker_process():
with portalocker.RedisLock('shared_work_queue') as lock:
# If this process crashes, gets killed, or loses network connection,
# the lock is automatically released immediately (not after timeout)
process_work_items()
# Even if worker_process() crashes or is killed with SIGKILL,
# other processes can immediately acquire the lock
worker_process()

RedisLock raises the same base exceptions as other lock types:
import portalocker
import redis
try:
with portalocker.RedisLock('my_lock') as lock:
do_work()
except portalocker.AlreadyLocked:
print("Lock is held by another process")
except redis.ConnectionError:
print("Could not connect to Redis server")
except redis.TimeoutError:
print("Redis operation timed out")
except portalocker.LockException as e:
print(f"Locking error: {e}")

RedisLock requires the redis Python package:
pip install portalocker[redis]
# or
pip install redis

Import handling for missing redis dependency:
import portalocker
try:
from portalocker import RedisLock
except ImportError:
# Redis package not installed
RedisLock = None
if RedisLock is not None:
# Use Redis locking
with RedisLock('my_lock') as lock:
do_work()
else:
# Fallback to file-based locking
with portalocker.Lock('/tmp/my_lock') as lock:
do_work()

import redis
import typing
# Redis connection type
RedisConnection = redis.Redis[str]
# Redis configuration dictionary
RedisKwargs = dict[str, typing.Any]
# Default Redis connection parameters (class-level attribute of RedisLock)
DEFAULT_REDIS_KWARGS: typing.ClassVar[dict[str, typing.Any]] = {
'health_check_interval': 10,
'decode_responses': True,
}

Install with Tessl CLI
npx tessl i tessl/pypi-portalocker