# Redis Distributed Locking

Distributed locks built on Redis pubsub, suitable for coordinating multiple processes and machines. Unlike traditional Redis locks based on key/value pairs, this implementation relies on the pubsub system, so a lock is released immediately when the holding process crashes or its connection is lost.

## Capabilities

### RedisLock Class

A Redis lock that unlocks automatically when the holding process disconnects, crashes, or loses network connectivity.

```python { .api }
class RedisLock:
    """
    Redis-based distributed lock using pubsub for automatic cleanup.

    Parameters:
    - channel: Redis channel to use as the locking key
    - connection: Optional existing Redis connection
    - timeout: Timeout when trying to acquire lock (default: inherited from LockBase)
    - check_interval: Check interval while waiting (default: inherited from LockBase)
    - fail_when_locked: Fail immediately if initial lock fails (default: False)
    - thread_sleep_time: Sleep time between Redis message checks (default: 0.1)
    - unavailable_timeout: Timeout for detecting unavailable lock holders (default: 1.0)
    - redis_kwargs: Redis connection parameters if no connection provided
    """

    def __init__(self, channel: str, connection: redis.Redis[str] | None = None,
                 timeout: float | None = None, check_interval: float | None = None,
                 fail_when_locked: bool | None = False, thread_sleep_time: float = 0.1,
                 unavailable_timeout: float = 1.0, redis_kwargs: dict[str, typing.Any] | None = None) -> None: ...

    def acquire(self, timeout: float | None = None, check_interval: float | None = None,
                fail_when_locked: bool | None = None) -> 'RedisLock':
        """
        Acquire the Redis lock.

        Parameters:
        - timeout: Override default timeout
        - check_interval: Override default check interval
        - fail_when_locked: Override default fail_when_locked behavior

        Returns:
        - Self (RedisLock instance) for chaining

        Raises:
        - AlreadyLocked: If lock cannot be acquired
        """

    def release(self) -> None:
        """Release the Redis lock and cleanup pubsub subscription"""

    def get_connection(self) -> redis.Redis[str]:
        """Get or create Redis connection using provided redis_kwargs"""

    def __enter__(self) -> 'RedisLock':
        """Context manager entry - acquire lock"""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - release lock"""
```
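
The context-manager methods above pair `acquire` with `release`. The sketch below illustrates that protocol with a hypothetical stand-in class, `ToyLock`, which is not part of portalocker. It assumes `__enter__` simply calls `acquire()` and `__exit__` calls `release()`, as the docstrings suggest.

```python
class ToyLock:
    """Hypothetical stand-in that records calls; not the real RedisLock."""

    def __init__(self) -> None:
        self.events = []

    def acquire(self) -> "ToyLock":
        self.events.append("acquire")
        return self  # returning self allows chaining

    def release(self) -> None:
        self.events.append("release")

    def __enter__(self) -> "ToyLock":
        return self.acquire()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Runs on normal exit and when the body raises.
        self.release()


lock = ToyLock()
try:
    with lock:
        raise RuntimeError("work failed")
except RuntimeError:
    pass

# The release call is recorded even though the body raised.
print(lock.events)
```

Because `__exit__` returns `None`, the exception raised inside the `with` body still propagates after the lock is released.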

### Default Redis Configuration

RedisLock provides sensible defaults for Redis connection parameters:

```python { .api }
DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
    'health_check_interval': 10,  # Health check every 10 seconds
    'decode_responses': True,     # Automatically decode Redis responses
}
```
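
One plausible way to combine these defaults with caller-supplied `redis_kwargs` is sketched below. `merge_redis_kwargs` is a hypothetical helper, and the assumption is that user-supplied keys take precedence while missing keys fall back to the defaults.

```python
from __future__ import annotations

import typing

# Copied from the defaults listed above.
DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
    'health_check_interval': 10,
    'decode_responses': True,
}


def merge_redis_kwargs(
    user_kwargs: dict[str, typing.Any] | None,
) -> dict[str, typing.Any]:
    """Hypothetical helper: fill in defaults without overriding user values."""
    merged = dict(user_kwargs or {})
    for key, value in DEFAULT_REDIS_KWARGS.items():
        merged.setdefault(key, value)
    return merged


merged = merge_redis_kwargs({'host': 'localhost', 'health_check_interval': 5})
print(merged)
```

Here the caller's `health_check_interval` of 5 is kept, and `decode_responses` is filled in from the defaults.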

### Usage Examples

Basic Redis lock usage:

```python
import redis
import portalocker

# Simple Redis lock using default connection
with portalocker.RedisLock('my_resource_lock') as lock:
    # Only one process across all machines can hold this lock
    print("Processing shared resource...")
    do_exclusive_work()
    print("Work completed")
# Lock automatically released

# Custom Redis connection
redis_conn = redis.Redis(host='redis.example.com', port=6379, db=0)
with portalocker.RedisLock('my_resource_lock', connection=redis_conn) as lock:
    process_shared_resource()
```

Non-blocking Redis locks:

```python
import portalocker

try:
    # Fail immediately if lock is held by another process
    with portalocker.RedisLock('my_resource_lock', fail_when_locked=True) as lock:
        process_resource()
except portalocker.AlreadyLocked:
    print("Resource is currently being processed by another instance")
```

Timeout and retry behavior:

```python
import portalocker

# Wait up to 30 seconds for lock to become available
lock = portalocker.RedisLock(
    'my_resource_lock',
    timeout=30.0,
    check_interval=1.0,  # Check every second
    redis_kwargs={
        'host': 'localhost',
        'port': 6379,
        'db': 0,
        'health_check_interval': 5
    }
)

try:
    with lock:
        # Will retry for up to 30 seconds
        process_exclusive_resource()
except portalocker.AlreadyLocked:
    print("Could not acquire lock within 30 seconds")
```
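
How `timeout`, `check_interval`, and `fail_when_locked` might interact can be sketched as a plain retry loop. `acquire_with_retry`, `try_once`, and `ToyAlreadyLocked` are hypothetical names introduced for illustration; this is not portalocker's actual implementation.

```python
import time
from typing import Callable


class ToyAlreadyLocked(Exception):
    """Hypothetical stand-in for portalocker.AlreadyLocked."""


def acquire_with_retry(
    try_once: Callable[[], bool],
    timeout: float,
    check_interval: float,
    fail_when_locked: bool = False,
) -> int:
    """Call try_once() until it succeeds or the timeout expires.

    Returns the number of attempts made; raises ToyAlreadyLocked on failure.
    """
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        if try_once():
            return attempts
        # Give up at once in non-blocking mode, or once the deadline has passed.
        if fail_when_locked or time.monotonic() >= deadline:
            raise ToyAlreadyLocked(f"gave up after {attempts} attempt(s)")
        time.sleep(check_interval)


# Simulated lock that becomes free on the third attempt.
results = iter([False, False, True])
attempts = acquire_with_retry(lambda: next(results), timeout=1.0, check_interval=0.01)
print(attempts)
```

A shorter `check_interval` notices a freed lock sooner at the cost of more frequent checks against the server.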

Manual lock management:

```python
import portalocker

# Create lock
lock = portalocker.RedisLock('batch_processing')

# Acquire before entering try, so release() only runs if the lock was taken
lock.acquire(timeout=60.0)
try:
    # Do work
    process_batch_job()
finally:
    # Always release lock
    lock.release()
```

Multi-machine coordination:

```python
import portalocker

# Lock that works across multiple servers
def distributed_task():
    with portalocker.RedisLock(
        'daily_report_generation',
        redis_kwargs={
            'host': 'shared-redis.company.com',
            'port': 6379,
            'password': 'secret',
            'db': 0
        }
    ) as lock:
        print("Starting daily report generation...")

        # Only one machine runs this at a time, even if several
        # servers start it simultaneously
        generate_daily_reports()

        print("Daily reports completed")

# Run on multiple servers: only one holds the lock at a time;
# the rest wait for it or raise AlreadyLocked
distributed_task()
```

Custom Redis configuration:

```python
import portalocker

# Advanced Redis configuration
custom_redis_config = {
    'host': 'redis-cluster.example.com',
    'port': 6379,
    'db': 2,
    'password': 'secure_password',
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
    'health_check_interval': 30,
    'retry_on_timeout': True
}

with portalocker.RedisLock('critical_process', redis_kwargs=custom_redis_config) as lock:
    # Process with custom Redis setup
    handle_critical_process()
```

## Automatic Connection Cleanup

The key advantage of RedisLock over traditional Redis locking mechanisms is that the lock is released as soon as the holder's connection goes away:

```python
import portalocker

def worker_process():
    with portalocker.RedisLock('shared_work_queue') as lock:
        # If this process crashes, gets killed, or loses network connection,
        # the lock is automatically released immediately (not after timeout)
        process_work_items()

# Even if worker_process() crashes or is killed with SIGKILL,
# other processes can immediately acquire the lock
worker_process()
```
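
The idea can be illustrated with a toy in-memory model. It assumes the lock counts as held while some connection is subscribed to the lock's channel, and that the server forgets a connection's subscriptions when it closes. `ToyBroker` and `try_acquire` are hypothetical names; no Redis server is involved.

```python
class ToyBroker:
    """Hypothetical in-memory stand-in for a pubsub server; not Redis."""

    def __init__(self) -> None:
        # channel name -> set of connection ids currently subscribed
        self.subscribers = {}

    def subscribe(self, channel: str, conn_id: str) -> None:
        self.subscribers.setdefault(channel, set()).add(conn_id)

    def numsub(self, channel: str) -> int:
        return len(self.subscribers.get(channel, set()))

    def drop_connection(self, conn_id: str) -> None:
        # Forget every subscription held by the closed connection.
        for members in self.subscribers.values():
            members.discard(conn_id)


def try_acquire(broker: ToyBroker, channel: str, conn_id: str) -> bool:
    """Take the lock only if nobody is subscribed to the channel."""
    if broker.numsub(channel) > 0:
        return False
    broker.subscribe(channel, conn_id)
    return True


broker = ToyBroker()
first = try_acquire(broker, 'shared_work_queue', 'worker-a')   # lock taken
second = try_acquire(broker, 'shared_work_queue', 'worker-b')  # still held
broker.drop_connection('worker-a')                             # simulated crash
third = try_acquire(broker, 'shared_work_queue', 'worker-b')   # free again
print(first, second, third)
```

The check-then-subscribe step in this toy is not atomic, so a real implementation would also need to handle two clients subscribing at the same moment.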

## Error Handling

RedisLock raises the same base exceptions as other lock types:

```python
import portalocker
import redis

try:
    with portalocker.RedisLock('my_lock') as lock:
        do_work()
except portalocker.AlreadyLocked:
    print("Lock is held by another process")
except redis.ConnectionError:
    print("Could not connect to Redis server")
except redis.TimeoutError:
    print("Redis operation timed out")
except portalocker.LockException as e:
    print(f"Locking error: {e}")
```
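
The order of the `except` clauses matters when one exception derives from another. The sketch below uses hypothetical stand-in classes and assumes `AlreadyLocked` is a subclass of `LockException`, which is why the more specific handler comes first.

```python
class ToyLockException(Exception):
    """Hypothetical stand-in for portalocker.LockException."""


class ToyAlreadyLocked(ToyLockException):
    """Hypothetical stand-in for portalocker.AlreadyLocked."""


def classify(error: Exception) -> str:
    """Route an error through handlers ordered from specific to general."""
    try:
        raise error
    except ToyAlreadyLocked:
        return 'busy'
    except ToyLockException:
        return 'lock error'
    except Exception:
        return 'other'


labels = [
    classify(ToyAlreadyLocked()),
    classify(ToyLockException()),
    classify(ValueError()),
]
print(labels)
```

If the general handler were listed first, it would also catch the subclass and the specific branch would never run.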

## Requirements

RedisLock requires the redis Python package:

```bash
pip install "portalocker[redis]"
# or, if portalocker is already installed
pip install redis
```

Import handling for missing redis dependency:

```python
import portalocker

try:
    from portalocker import RedisLock
except ImportError:
    # Redis package not installed
    RedisLock = None

if RedisLock is not None:
    # Use Redis locking
    with RedisLock('my_lock') as lock:
        do_work()
else:
    # Fallback to file-based locking
    with portalocker.Lock('/tmp/my_lock') as lock:
        do_work()
```
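
An alternative way to detect the optional dependency, without importing it, is to ask the import system whether the `redis` package can be found. `pick_lock_backend` is a hypothetical helper shown only as a sketch of that check.

```python
import importlib.util


def pick_lock_backend() -> str:
    """Hypothetical helper: 'redis' if the redis package is importable, else 'file'."""
    if importlib.util.find_spec('redis') is not None:
        return 'redis'
    return 'file'


backend = pick_lock_backend()
print(backend)
```

The result depends on the environment, so callers would branch on it the same way the example above branches on `RedisLock is not None`.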

## Type Definitions

```python { .api }
import redis
import typing

# Redis connection type
RedisConnection = redis.Redis[str]

# Redis configuration dictionary
RedisKwargs = dict[str, typing.Any]

# Default Redis connection parameters
DEFAULT_REDIS_KWARGS: dict[str, typing.Any] = {
    'health_check_interval': 10,
    'decode_responses': True,
}
```