0
# Task Lifecycle and Hooks
1
2
Task lifecycle management including pre/post execution hooks, startup/shutdown hooks, signal handling, and task pipeline chaining. These features enable comprehensive task orchestration and monitoring.
3
4
## Capabilities
5
6
### Execution Hooks
7
8
Register callbacks that run before and after task execution for monitoring, logging, and custom processing.
9
10
```python { .api }
11
def pre_execute(self, name=None):
12
"""
13
Decorator to register pre-execution hook.
14
15
Parameters:
16
- name (str): Hook name (default: function name)
17
18
Returns:
19
Decorator function
20
21
Hook function signature:
22
def hook(task): ...
23
24
Hooks can raise CancelExecution to prevent task execution.
25
"""
26
27
def post_execute(self, name=None):
28
"""
29
Decorator to register post-execution hook.
30
31
Parameters:
32
- name (str): Hook name (default: function name)
33
34
Returns:
35
Decorator function
36
37
Hook function signature:
38
def hook(task, task_value, exception): ...
39
40
Parameters:
41
- task: Task instance that executed
42
- task_value: Return value from task (None if exception occurred)
43
- exception: Exception instance if task failed (None if successful)
44
"""
45
46
def unregister_pre_execute(self, name):
47
"""
48
Remove a pre-execution hook.
49
50
Parameters:
51
- name (str or function): Hook name or function to remove
52
53
Returns:
54
bool: True if hook was removed
55
"""
56
57
def unregister_post_execute(self, name):
58
"""
59
Remove a post-execution hook.
60
61
Parameters:
62
- name (str or function): Hook name or function to remove
63
64
Returns:
65
bool: True if hook was removed
66
"""
67
```
68
69
### Startup and Shutdown Hooks
70
71
Register callbacks for consumer process lifecycle events.
72
73
```python { .api }
74
def on_startup(self, name=None):
75
"""
76
Decorator to register startup hook.
77
78
Parameters:
79
- name (str): Hook name (default: function name)
80
81
Returns:
82
Decorator function
83
84
Hook function signature:
85
def hook(): ...
86
"""
87
88
def on_shutdown(self, name=None):
89
"""
90
Decorator to register shutdown hook.
91
92
Parameters:
93
- name (str): Hook name (default: function name)
94
95
Returns:
96
Decorator function
97
98
Hook function signature:
99
def hook(): ...
100
"""
101
102
def unregister_on_startup(self, name):
103
"""
104
Remove a startup hook.
105
106
Parameters:
107
- name (str or function): Hook name or function to remove
108
109
Returns:
110
bool: True if hook was removed
111
"""
112
113
def unregister_on_shutdown(self, name):
114
"""
115
Remove a shutdown hook.
116
117
Parameters:
118
- name (str or function): Hook name or function to remove
119
120
Returns:
121
bool: True if hook was removed
122
"""
123
```
124
125
### Signal Handling
126
127
Register signal handlers for various task execution events.
128
129
```python { .api }
130
def signal(self, *signals):
131
"""
132
Decorator to register signal handler.
133
134
Parameters:
135
- *signals: Signal names to handle
136
137
Available signals:
138
- SIGNAL_ENQUEUED: Task was added to queue
139
- SIGNAL_EXECUTING: Task execution started
140
- SIGNAL_COMPLETE: Task completed successfully
141
- SIGNAL_ERROR: Task failed with exception
142
- SIGNAL_RETRYING: Task is being retried
143
- SIGNAL_REVOKED: Task was revoked
144
- SIGNAL_CANCELED: Task was canceled
145
- SIGNAL_SCHEDULED: Task was added to schedule
146
- SIGNAL_LOCKED: Task could not acquire lock
147
- SIGNAL_EXPIRED: Task expired before execution
148
- SIGNAL_INTERRUPTED: Task was interrupted
149
150
Returns:
151
Decorator function
152
153
Handler function signature:
154
def handler(signal, task, *args, **kwargs): ...
155
"""
156
157
def disconnect_signal(self, receiver, *signals):
158
"""
159
Disconnect a signal handler.
160
161
Parameters:
162
- receiver: Handler function to disconnect
163
- *signals: Signal names to disconnect from
164
165
Returns:
166
None
167
"""
168
```
169
170
### Task Pipeline and Chaining
171
172
Create task chains and pipelines for complex workflows.
173
174
```python { .api }
175
def then(self, task, *args, **kwargs):
176
"""
177
Chain another task to run after this one completes successfully.
178
179
Parameters:
180
- task (TaskWrapper or Task): Task to run next
181
- *args: Arguments to pass to next task
182
- **kwargs: Keyword arguments to pass to next task
183
184
Returns:
185
Task: Self for method chaining
186
"""
187
188
def error(self, task, *args, **kwargs):
189
"""
190
Chain another task to run if this one fails.
191
192
Parameters:
193
- task (TaskWrapper or Task): Task to run on error
194
- *args: Arguments to pass to error task
195
- **kwargs: Keyword arguments to pass to error task
196
197
Returns:
198
Task: Self for method chaining
199
"""
200
```
201
202
### Task Revocation and Control
203
204
Control task execution with revocation and restoration capabilities.
205
206
```python { .api }
207
def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
208
"""
209
Revoke all instances of a task type.
210
211
Parameters:
212
- task_class: Task class to revoke
213
- revoke_until (datetime): Revoke until specific time (optional)
214
- revoke_once (bool): Revoke only next execution (default: False)
215
216
Returns:
217
None
218
"""
219
220
def restore_all(self, task_class):
221
"""
222
Restore all instances of a revoked task type.
223
224
Parameters:
225
- task_class: Task class to restore
226
227
Returns:
228
bool: True if tasks were revoked and restored
229
"""
230
231
def revoke_by_id(self, id, revoke_until=None, revoke_once=False):
232
"""
233
Revoke specific task by ID.
234
235
Parameters:
236
- id (str): Task ID to revoke
237
- revoke_until (datetime): Revoke until specific time (optional)
238
- revoke_once (bool): Revoke only this execution (default: False)
239
240
Returns:
241
None
242
"""
243
244
def restore_by_id(self, id):
245
"""
246
Restore specific task by ID.
247
248
Parameters:
249
- id (str): Task ID to restore
250
251
Returns:
252
bool: True if task was revoked and restored
253
"""
254
255
def is_revoked(self, task, timestamp=None, peek=True):
256
"""
257
Check if task or task type is revoked.
258
259
Parameters:
260
- task: Task instance, task class, or task ID
261
- timestamp (datetime): Check time (default: now)
262
- peek (bool): Don't consume revocation data (default: True)
263
264
Returns:
265
bool: True if revoked
266
"""
267
```
268
269
## Usage Examples
270
271
### Pre and Post Execution Hooks
272
273
```python
274
from huey import RedisHuey
275
import logging
276
import time
277
278
huey = RedisHuey('lifecycle-app')
279
280
# Set up logging
281
logging.basicConfig(level=logging.INFO)
282
logger = logging.getLogger('task-hooks')
283
284
@huey.pre_execute()
285
def log_task_start(task):
286
logger.info(f"Starting task: {task.name} (ID: {task.id})")
287
# Could add authentication, resource checks, etc.
288
289
@huey.post_execute()
290
def log_task_complete(task, task_value, exception):
291
if exception:
292
logger.error(f"Task {task.name} failed: {exception}")
293
else:
294
logger.info(f"Task {task.name} completed: {task_value}")
295
296
@huey.task()
297
def process_order(order_id):
298
time.sleep(2) # Simulate processing
299
return f"Order {order_id} processed"
300
301
# Task execution will trigger hooks
302
result = process_order(12345)
303
```
304
305
### Startup and Shutdown Hooks
306
307
```python
308
import redis
309
310
@huey.on_startup()
311
def initialize_connections():
312
logger.info("Consumer starting up - initializing connections")
313
# Initialize database connections, cache, etc.
314
global redis_client
315
redis_client = redis.Redis(host='localhost', port=6379, db=0)
316
317
@huey.on_shutdown()
318
def cleanup_resources():
319
logger.info("Consumer shutting down - cleaning up resources")
320
# Close connections, save state, etc.
321
if 'redis_client' in globals():
322
redis_client.close()
323
324
@huey.task()
325
def cache_data(key, value):
326
redis_client.set(key, value)
327
return f"Cached {key}"
328
```
329
330
### Signal Handling
331
332
```python
333
from huey import signals as S
334
335
@huey.signal(S.SIGNAL_ENQUEUED)
336
def task_enqueued(signal, task):
337
logger.info(f"Task enqueued: {task.name}")
338
339
@huey.signal(S.SIGNAL_ERROR)
340
def task_error(signal, task, exception):
341
logger.error(f"Task {task.name} failed: {exception}")
342
# Could send alerts, update metrics, etc.
343
344
@huey.signal(S.SIGNAL_RETRYING)
345
def task_retrying(signal, task):
346
logger.warning(f"Retrying task: {task.name} ({task.retries} retries left)")
347
348
@huey.signal(S.SIGNAL_COMPLETE)
349
def task_complete(signal, task):
350
logger.info(f"Task completed: {task.name}")
351
# Could update progress tracking, send notifications, etc.
352
```
353
354
### Task Chaining and Pipelines
355
356
```python
357
@huey.task()
358
def download_file(url):
359
# Download file logic
360
return f"downloaded_{url.split('/')[-1]}"
361
362
@huey.task()
363
def process_file(filename):
364
# Process file logic
365
return f"processed_{filename}"
366
367
@huey.task()
368
def cleanup_file(filename):
369
# Cleanup logic
370
return f"cleaned_{filename}"
371
372
@huey.task()
373
def send_notification(message):
374
# Send notification
375
return f"notified: {message}"
376
377
# Create task pipeline
378
task = download_file.s("http://example.com/data.csv")
379
task = task.then(process_file)
380
task = task.then(cleanup_file)
381
task = task.then(send_notification, "Processing complete")
382
task = task.error(send_notification, "Processing failed")
383
384
# Enqueue the pipeline
385
result = huey.enqueue(task)
386
```
387
388
### Task Revocation and Control
389
390
```python
391
@huey.task()
392
def long_running_task(data):
393
# Simulate long-running task
394
time.sleep(60)
395
return f"Processed {data}"
396
397
# Start some tasks
398
results = []
399
for i in range(5):
400
result = long_running_task(f"data_{i}")
401
results.append(result)
402
403
# Revoke specific task
404
results[0].revoke()
405
406
# Revoke all instances of a task type
407
huey.revoke_all(long_running_task.task_class, revoke_once=True)
408
409
# Check if task is revoked
410
if results[1].is_revoked():
411
print("Task was revoked")
412
results[1].restore() # Restore if needed
413
414
# Revoke task by ID
415
task_id = results[2].id
416
huey.revoke_by_id(task_id, revoke_once=True)
417
```
418
419
### Advanced Hook Patterns
420
421
```python
422
# Context-aware hooks
423
current_user = None
424
425
@huey.pre_execute()
426
def set_task_context(task):
427
global current_user
428
# Extract user context from task data
429
if hasattr(task, 'kwargs') and 'user_id' in task.kwargs:
430
current_user = get_user(task.kwargs['user_id'])
431
432
@huey.post_execute()
433
def clear_task_context(task, task_value, exception):
434
global current_user
435
current_user = None
436
437
# Performance monitoring hook
438
task_times = {}
439
440
@huey.pre_execute('performance_monitor')
441
def start_timer(task):
442
task_times[task.id] = time.time()
443
444
@huey.post_execute('performance_monitor')
445
def end_timer(task, task_value, exception):
446
if task.id in task_times:
447
duration = time.time() - task_times[task.id]
448
logger.info(f"Task {task.name} took {duration:.2f} seconds")
449
del task_times[task.id]
450
451
# Conditional execution hook
452
@huey.pre_execute()
453
def check_maintenance_mode(task):
454
if is_maintenance_mode() and not task.name.startswith('maintenance_'):
455
raise CancelExecution("System in maintenance mode")
456
```