# Distributed Locking

Redis distributed locks provide mutual exclusion across multiple processes or servers using Redis as a coordination service. The Lock implementation uses SET with NX and EX options for atomic lock acquisition with automatic expiration.

## Capabilities

### Lock Operations

Redis Lock class for creating and managing distributed locks with automatic expiration.

```python { .api }
def lock(
    self,
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    thread_local: bool = True
) -> "Lock": ...

class Lock:
    def __init__(
        self,
        redis: Redis,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        thread_local: bool = True
    ): ...

    def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[bytes] = None
    ) -> bool: ...

    def release(self) -> None: ...

    def extend(self, additional_time: float, replace_ttl: bool = False) -> bool: ...

    def locked(self) -> bool: ...

    def owned(self) -> bool: ...

    def reacquire(self) -> bool: ...

    def __enter__(self) -> "Lock": ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

    @property
    def token(self) -> Optional[bytes]: ...
```

## Usage Examples

### Basic Lock Usage

```python
import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def worker(worker_id):
    """Try to grab a distributed lock, do some work, then release it."""
    lock = r.lock("shared_resource", timeout=10)
    try:
        # Non-blocking attempt: give up immediately if someone else holds it.
        if not lock.acquire(blocking=False):
            print(f"Worker {worker_id}: Could not acquire lock")
            return
        print(f"Worker {worker_id}: Acquired lock")
        print(f"Worker {worker_id}: Processing shared resource...")
        time.sleep(2)  # simulate exclusive work on the shared resource
        print(f"Worker {worker_id}: Work completed")
    finally:
        # Release only if this holder still owns the lock.
        if lock.owned():
            lock.release()
            print(f"Worker {worker_id}: Released lock")


# Launch three competing workers.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

### Lock with Context Manager

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def process_with_lock():
    """Run a critical section guarded by a Redis lock.

    ``__enter__`` blocks until the lock is held; ``__exit__`` releases
    it automatically, even if the body raises.
    """
    with r.lock("critical_section", timeout=5):
        print("Entered critical section")
        time.sleep(2)  # simulate critical work
        print("Exiting critical section")
    # Lock has been released by the context manager at this point.


process_with_lock()
```

### Blocking Lock with Timeout

```python
import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def blocking_worker(worker_id):
    """Wait up to 5 seconds for the lock, then bump a shared counter."""
    lock = r.lock("shared_counter", timeout=10)
    try:
        # Block for at most 5 seconds while waiting for the lock.
        if not lock.acquire(blocking=True, blocking_timeout=5):
            print(f"Worker {worker_id}: Timeout waiting for lock")
            return
        print(f"Worker {worker_id}: Got the lock after waiting")

        # The read-modify-write cycle is safe because we hold the lock.
        new_value = int(r.get("counter") or b"0") + 1
        r.set("counter", new_value)
        print(f"Worker {worker_id}: Counter = {new_value}")
        time.sleep(1)  # Hold lock briefly
    finally:
        if lock.owned():
            lock.release()


# Reset the shared counter, then race five workers for the lock.
r.set("counter", 0)

threads = [threading.Thread(target=blocking_worker, args=(i,)) for i in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

final_count = r.get("counter")
print(f"Final counter value: {final_count}")
```

### Lock Extension

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def long_running_task():
    """Run a 10-second task under a lock with a 5-second TTL.

    The lock must be extended *before* its TTL lapses: the original
    version extended after ~5 seconds of work, by which point the
    5-second TTL had already expired and another process could hold
    the lock.  Extending after step 4 (~4 s) and step 8 (~8 s) keeps
    the lock alive for the whole task.
    """
    lock = r.lock("long_task", timeout=5)  # Initial 5 second timeout

    try:
        if lock.acquire():
            print("Started long running task")

            for i in range(10):
                # Simulate work
                time.sleep(1)
                print(f"Working... step {i+1}/10")

                # Extend while we still own the lock (TTL not yet expired).
                if i in (3, 7):
                    extended = lock.extend(additional_time=5)
                    if extended:
                        print("Extended lock by 5 seconds")
                    else:
                        print("Failed to extend lock - it may have expired")
                        break

            print("Task completed")
    finally:
        if lock.owned():
            lock.release()


long_running_task()
```

### Lock Reacquisition

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def task_with_reacquisition():
    """Refresh a held lock's TTL between two phases of work.

    NOTE: ``Lock.reacquire()`` resets the TTL of a lock this instance
    still *owns*.  It cannot re-obtain a lock after ``release()`` —
    releasing clears the local token, so a release-then-reacquire
    sequence (as in the original example) fails.  To re-obtain a
    released lock, call ``acquire()`` again instead.
    """
    lock = r.lock("reacquire_demo", timeout=10)

    try:
        # Initial acquisition
        if lock.acquire():
            print("Phase 1: Acquired lock")
            time.sleep(2)

            # Still holding the lock: reset its TTL back to the full
            # 10 seconds before starting the second phase.
            if lock.reacquire():
                print("Phase 2: Reacquired lock")
                time.sleep(2)
                print("Phase 2 complete")
            else:
                print("Failed to reacquire lock")
    finally:
        if lock.owned():
            lock.release()


task_with_reacquisition()
```

### Lock Status Checking

```python
import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def monitor_lock():
    """Observe a lock's state from a second thread while it is held."""
    lock = r.lock("monitored_lock", timeout=8)

    def lock_holder():
        # Hold the lock for five seconds via the context manager.
        with lock:
            print("Lock holder: Acquired lock")
            time.sleep(5)
            print("Lock holder: Releasing lock")

    def lock_monitor():
        # Give the holder a head start, then poll the lock's status.
        time.sleep(0.5)
        for _ in range(10):
            is_locked = lock.locked()
            is_owned = lock.owned()
            print(f"Monitor: Locked={is_locked}, Owned by us={is_owned}")
            time.sleep(1)

    # Run the holder and the monitor concurrently.
    holder_thread = threading.Thread(target=lock_holder)
    monitor_thread = threading.Thread(target=lock_monitor)
    holder_thread.start()
    monitor_thread.start()
    holder_thread.join()
    monitor_thread.join()


monitor_lock()
```

### Distributed Counter with Lock

```python
import redis
import time
import threading
import random


class DistributedCounter:
    """Counter whose read-modify-write cycle is guarded by a Redis lock."""

    def __init__(self, redis_client, counter_name):
        self.r = redis_client
        self.counter_name = counter_name
        self.lock_name = f"{counter_name}:lock"

    def increment(self, amount=1):
        """Thread-safe increment operation"""
        # The lock serialises the get/set pair across all processes.
        with self.r.lock(self.lock_name, timeout=5):
            raw = self.r.get(self.counter_name)
            current_value = int(raw) if raw else 0

            # Simulate some processing time
            time.sleep(random.uniform(0.01, 0.1))

            new_value = current_value + amount
            self.r.set(self.counter_name, new_value)
            return new_value

    def get_value(self):
        """Get current counter value"""
        raw = self.r.get(self.counter_name)
        return int(raw) if raw else 0


# Usage example
r = redis.Redis(host='localhost', port=6379, db=0)
counter = DistributedCounter(r, "global_counter")

# Initialize counter
r.set("global_counter", 0)


def worker(worker_id, counter, iterations):
    """Worker that increments counter multiple times"""
    for _ in range(iterations):
        value = counter.increment()
        print(f"Worker {worker_id}: Incremented to {value}")
        time.sleep(random.uniform(0.1, 0.3))


# Run multiple workers concurrently
threads = [threading.Thread(target=worker, args=(i, counter, 3)) for i in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"Final counter value: {counter.get_value()}")
```

### Lock with Custom Token

```python
import redis
import uuid
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def custom_token_lock():
    """Acquire a lock with a caller-supplied token instead of a random one."""
    # The token identifies the lock holder; normally redis-py generates it.
    custom_token = str(uuid.uuid4()).encode()

    lock = r.lock("custom_token_lock", timeout=10)

    try:
        if lock.acquire(token=custom_token):
            print(f"Acquired lock with token: {custom_token}")
            print(f"Lock token: {lock.token}")
            # Ownership compares the value stored in Redis with our token.
            print(f"Lock owned: {lock.owned()}")
            time.sleep(2)
    finally:
        if lock.owned():
            lock.release()
            print("Lock released")


custom_token_lock()
```

### Handling Lock Expiration

```python
import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def task_with_expiration_handling():
    """Handle case where lock expires during processing"""
    # Deliberately short TTL so the lock expires mid-task.
    lock = r.lock("expiring_lock", timeout=3)

    try:
        if lock.acquire():
            print("Acquired lock")

            for step in range(8):
                time.sleep(1)

                # Bail out if the TTL lapsed while we were working.
                if not lock.owned():
                    print(f"Lock expired during processing at step {step+1}")
                    break

                print(f"Processing step {step+1}")
            else:
                # for/else: runs only when the loop was never broken.
                print("Task completed successfully")

    except Exception as e:
        print(f"Error during processing: {e}")
    finally:
        # Only release if we still own it
        if lock.owned():
            lock.release()
            print("Released lock")
        else:
            print("Lock was already expired/released")


task_with_expiration_handling()
```

### Multi-Resource Locking

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


class MultiLock:
    """Acquire several Redis locks atomically: all of them, or none.

    Locks are acquired in sorted name order so that two MultiLock users
    contending for overlapping resources cannot deadlock by grabbing
    them in opposite orders.
    """

    def __init__(self, redis_client, lock_names, timeout=10):
        self.r = redis_client
        # BUG FIX: use the injected client (the original referenced the
        # global ``r``), and sort names to impose a global lock order.
        self.locks = [redis_client.lock(name, timeout=timeout)
                      for name in sorted(lock_names)]
        self.acquired_locks = []

    def acquire_all(self, blocking=True, blocking_timeout=None):
        """Acquire all locks or none"""
        try:
            for lock in self.locks:
                if lock.acquire(blocking=blocking, blocking_timeout=blocking_timeout):
                    self.acquired_locks.append(lock)
                else:
                    # Failed to acquire a lock, release all acquired ones
                    self.release_all()
                    return False
            return True
        except Exception:
            self.release_all()
            raise

    def release_all(self):
        """Release all acquired locks"""
        for lock in self.acquired_locks:
            if lock.owned():
                lock.release()
        self.acquired_locks.clear()

    def __enter__(self):
        if self.acquire_all():
            return self
        raise RuntimeError("Could not acquire all locks")

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release_all()


def transfer_between_accounts(from_account, to_account, amount):
    """Transfer money between accounts with multi-lock"""
    # Lock both accounts to prevent race conditions
    lock_names = [f"account:{from_account}:lock", f"account:{to_account}:lock"]

    with MultiLock(r, lock_names):
        print(f"Acquired locks for accounts {from_account} and {to_account}")

        # Get balances
        from_balance = float(r.get(f"account:{from_account}:balance") or 0)
        to_balance = float(r.get(f"account:{to_account}:balance") or 0)

        # Check sufficient funds
        if from_balance < amount:
            raise ValueError("Insufficient funds")

        # Perform transfer
        time.sleep(0.1)  # Simulate processing time

        r.set(f"account:{from_account}:balance", from_balance - amount)
        r.set(f"account:{to_account}:balance", to_balance + amount)

        print(f"Transferred ${amount} from account {from_account} to {to_account}")


# Initialize accounts
r.set("account:1001:balance", 1000)
r.set("account:1002:balance", 500)

# Perform transfer
try:
    transfer_between_accounts("1001", "1002", 250)

    # Check final balances
    balance_1001 = r.get("account:1001:balance")
    balance_1002 = r.get("account:1002:balance")
    print(f"Final balances: Account 1001: ${balance_1001}, Account 1002: ${balance_1002}")

except Exception as e:
    print(f"Transfer failed: {e}")
```