# Task Locking and Concurrency

Task locking mechanisms, concurrency control, and synchronization features to prevent duplicate task execution and manage shared resources. These features ensure proper coordination in multi-worker environments.

## Capabilities

### Task Locking

Prevent multiple workers from executing the same critical section simultaneously.

```python { .api }
def lock_task(self, lock_name):
    """
    Create a task lock for coordinating access to shared resources.

    Parameters:
    - lock_name (str): Name of the lock

    Returns:
    TaskLock: Lock instance that can be used as context manager or decorator
    """

def is_locked(self, lock_name):
    """
    Check if a named lock is currently held.

    Parameters:
    - lock_name (str): Name of the lock to check

    Returns:
    bool: True if lock is currently held
    """

def flush_locks(self, *names):
    """
    Remove specified locks or all locks if no names given.

    Parameters:
    - *names: Lock names to remove (optional, removes all if empty)

    Returns:
    set: Names of locks that were removed
    """
```
44
### TaskLock Class

Context manager and decorator for task synchronization.

```python { .api }
class TaskLock:
    def __init__(self, huey, name):
        """
        Initialize task lock.

        Parameters:
        - huey: Huey instance
        - name (str): Lock name
        """

    def is_locked(self):
        """
        Check if this lock is currently held.

        Returns:
        bool: True if lock is held
        """

    def clear(self):
        """
        Force release this lock.

        Returns:
        bool: True if lock was held and released
        """

    def __call__(self, fn):
        """
        Use lock as a decorator.

        Parameters:
        - fn: Function to wrap with lock

        Returns:
        Wrapped function that acquires lock before execution
        """

    def __enter__(self):
        """
        Acquire lock (context manager entry).

        Raises:
        TaskLockedException: If lock cannot be acquired
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Release lock (context manager exit).
        """
```
100
### Lock Exception Handling

Exception raised when lock cannot be acquired.

```python { .api }
class TaskLockedException(HueyException):
    """
    Exception raised when a task cannot acquire a required lock.

    This exception is raised when:
    - A task decorated with @lock cannot acquire the lock
    - A context manager lock cannot be acquired
    - A task tries to acquire a lock that's already held
    """
```
116
## Usage Examples

### Basic Task Locking

```python
from huey import RedisHuey
from huey.exceptions import TaskLockedException

huey = RedisHuey('locking-app')

@huey.task()
def update_user_count():
    # Use lock as context manager
    with huey.lock_task('user_count_update'):
        # Only one worker can execute this block at a time
        current_count = get_user_count()
        new_count = recalculate_user_count()
        update_user_count_in_db(new_count)
        return new_count

# Multiple workers can enqueue this task, but only one can hold the lock at a
# time. Note that the lock does not block: a worker that cannot acquire it
# raises TaskLockedException instead of waiting.
result1 = update_user_count()
result2 = update_user_count()  # Fails with TaskLockedException if first still holds the lock
```
141
### Lock as Decorator

```python
# Create a reusable lock
user_stats_lock = huey.lock_task('user_stats')

@huey.task()
@user_stats_lock
def update_user_stats(user_id):
    # This entire function is protected by the lock
    stats = calculate_user_stats(user_id)
    save_user_stats(user_id, stats)
    return stats

# Alternative: inline decorator
@huey.task()
@huey.lock_task('report_generation')
def generate_daily_report():
    # Generate report logic
    return "Report generated"
```
163
### Fine-grained Locking

```python
@huey.task()
def process_user_data(user_id):
    # Use user-specific locks
    lock_name = f'user_{user_id}_processing'

    try:
        with huey.lock_task(lock_name):
            # Process user data
            data = load_user_data(user_id)
            processed = process_data(data)
            save_processed_data(user_id, processed)
            return f"Processed user {user_id}"
    except TaskLockedException:
        # Another worker is already processing this user
        return f"User {user_id} already being processed"

# Each user can be processed independently
results = []
for user_id in [1, 2, 3, 1, 2]:  # Note: duplicates
    result = process_user_data(user_id)
    results.append(result)
```
189
### Lock Status Monitoring

```python
@huey.task()
def monitor_locks():
    # Check specific locks
    critical_locks = ['database_backup', 'user_count_update', 'report_generation']

    lock_status = {}
    for lock_name in critical_locks:
        is_locked = huey.is_locked(lock_name)
        lock_status[lock_name] = is_locked

    return lock_status

@huey.task()
def emergency_unlock():
    # Force release all locks (use with caution!)
    released = huey.flush_locks()
    return f"Released locks: {released}"

@huey.task()
def unlock_specific_locks():
    # Release specific locks
    released = huey.flush_locks('stale_lock_1', 'stale_lock_2')
    return f"Released locks: {released}"
```
217
### Conditional Locking

```python
@huey.task()
def conditional_processing(resource_id):
    lock_name = f'resource_{resource_id}'

    # Check if already locked before attempting
    if huey.is_locked(lock_name):
        return f"Resource {resource_id} is busy, skipping"

    try:
        with huey.lock_task(lock_name):
            # Process resource
            result = process_resource(resource_id)
            return result
    except TaskLockedException:
        # Lock was acquired between check and acquisition
        return f"Resource {resource_id} became busy"
```
238
### Lock with Timeout Pattern

```python
import time
from contextlib import contextmanager

@contextmanager
def timed_lock(huey_instance, lock_name, timeout=30, check_interval=0.5):
    """Custom lock with timeout capability."""
    start_time = time.time()

    while time.time() - start_time < timeout:
        try:
            with huey_instance.lock_task(lock_name):
                yield
                return
        except TaskLockedException:
            time.sleep(check_interval)

    raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout} seconds")

@huey.task()
def process_with_timeout(data):
    try:
        with timed_lock(huey, 'critical_resource', timeout=60):
            # Process data with timeout
            result = expensive_processing(data)
            return result
    except TimeoutError as e:
        return f"Processing failed: {e}"
```
270
### Database Connection Pooling with Locks

```python
import sqlite3
import threading

# Shared resource that needs protection
db_connections = {}
connection_lock = threading.Lock()

@huey.task()
def database_task(query, db_name='default'):
    # Use lock to coordinate database access
    lock_name = f'db_access_{db_name}'

    with huey.lock_task(lock_name):
        # Get or create database connection
        if db_name not in db_connections:
            db_connections[db_name] = sqlite3.connect(f'{db_name}.db')

        conn = db_connections[db_name]
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        conn.commit()

    return f"Query executed: {len(result)} rows"
```
299
### Distributed Lock Patterns

```python
@huey.task()
def singleton_task():
    """Ensure only one instance of this task runs across all workers."""
    lock_name = 'singleton_task_global'

    try:
        with huey.lock_task(lock_name):
            # This code runs on only one worker globally
            perform_singleton_operation()
            return "Singleton task completed"
    except TaskLockedException:
        return "Singleton task already running"

@huey.task()
def batch_processor(batch_id):
    """Process batches with coordination between workers."""
    # Lock the entire batch
    batch_lock = f'batch_{batch_id}'

    try:
        with huey.lock_task(batch_lock):
            items = get_batch_items(batch_id)

            # Process items with item-level locks for fine-grained control
            results = []
            for item_id in items:
                item_lock = f'item_{item_id}'
                try:
                    with huey.lock_task(item_lock):
                        result = process_item(item_id)
                        results.append(result)
                except TaskLockedException:
                    results.append(f"Item {item_id} locked")

            return f"Batch {batch_id}: {len(results)} items processed"
    except TaskLockedException:
        return f"Batch {batch_id} already being processed"
```
341
### Lock Cleanup and Maintenance

```python
from huey import crontab

@huey.periodic_task(crontab(minute='*/10'))  # Every 10 minutes
def cleanup_stale_locks():
    """Periodic cleanup of potentially stale locks."""
    # In production, you might want to track lock timestamps
    # and clean up locks that are older than expected task duration

    # For now, just report on current locks
    # (Manual cleanup would require custom lock tracking)

    # Check critical locks
    critical_locks = ['database_backup', 'report_generation']
    stale_locks = []

    for lock_name in critical_locks:
        if huey.is_locked(lock_name):
            # In real implementation, check if lock is truly stale
            # based on timestamps or other criteria
            stale_locks.append(lock_name)

    if stale_locks:
        return f"Warning: Long-running locks detected: {stale_locks}"
    else:
        return "All locks appear healthy"

@huey.task()
def force_unlock_emergency(lock_names):
    """Emergency lock release (use with extreme caution)."""
    if not isinstance(lock_names, list):
        lock_names = [lock_names]

    released = huey.flush_locks(*lock_names)
    return f"Emergency unlock completed. Released: {released}"
```