# Task Management

Complete task lifecycle management including execution, monitoring, filtering, search, and control operations for Celery distributed tasks.

## Capabilities

### Task Filtering and Iteration

Filter and iterate over large sets of tasks using pagination, sorting, and combinable filter criteria.
```python { .api }
def iter_tasks(events, limit=None, offset=0, type=None, worker=None,
               state=None, sort_by=None, received_start=None, received_end=None,
               started_start=None, started_end=None, search=None):
    """
    Iterator for filtered tasks with pagination and sorting.

    Args:
        events: Events instance containing task data
        limit (int, optional): Maximum number of tasks to return
        offset (int, optional): Number of tasks to skip (pagination)
        type (str, optional): Filter by task name/type
        worker (str, optional): Filter by worker hostname
        state (str, optional): Filter by task state (PENDING, SUCCESS, FAILURE, etc.)
        sort_by (str, optional): Sort field ('name', 'state', 'received', 'started', 'runtime')
        received_start (datetime, optional): Filter tasks received after this time
        received_end (datetime, optional): Filter tasks received before this time
        started_start (datetime, optional): Filter tasks started after this time
        started_end (datetime, optional): Filter tasks started before this time
        search (str, optional): Search term for task content

    Yields:
        dict: Filtered task objects matching the specified criteria

    Supports complex filtering combinations and efficient pagination
    for large task datasets.
    """

def sort_tasks(tasks, sort_by):
    """
    Sort tasks by specified field.

    Args:
        tasks (list): List of task objects to sort
        sort_by (str): Sort field ('name', 'state', 'received', 'started', 'runtime')

    Returns:
        list: Sorted list of tasks

    Supports sorting by multiple fields with proper handling of None values
    and different data types.
    """

def get_task_by_id(events, task_id):
    """
    Retrieve specific task by UUID.

    Args:
        events: Events instance containing task data
        task_id (str): Task UUID to retrieve

    Returns:
        dict or None: Task object if found, None otherwise
    """

def as_dict(task):
    """
    Convert task object to dictionary representation.

    Args:
        task: Task object from Celery state

    Returns:
        dict: Task data as dictionary with all relevant fields
    """
```

### Task Search and Filtering

Advanced search capabilities for finding tasks based on various criteria.

```python { .api }
def parse_search_terms(raw_search_value):
    """
    Parse search query string into structured search terms.

    Args:
        raw_search_value (str): Raw search query string

    Returns:
        list: Parsed search terms with field specifications

    Supports field-specific searches like 'name:my_task' or 'state:FAILURE'
    and general text searches across task content.
    """

def satisfies_search_terms(task, search_terms):
    """
    Check if task matches search criteria.

    Args:
        task (dict): Task object to check
        search_terms (list): Parsed search terms from parse_search_terms

    Returns:
        bool: True if task matches all search terms

    Performs comprehensive search across task name, arguments, result,
    traceback, and other task metadata.
    """

def task_args_contains_search_args(task_args, search_args):
    """
    Check if task arguments contain search terms.

    Args:
        task_args (list): Task arguments
        search_args (list): Search terms to find

    Returns:
        bool: True if arguments contain search terms

    Searches within task arguments and keyword arguments for specified terms.
    """
```
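
To make the query format concrete, here is a small, self-contained sketch of how field-qualified queries such as `'name:my_task state:FAILURE'` can be split and matched against plain task dicts. The helpers `split_query` and `matches_query` are hypothetical illustrations, not the library's implementation:

```python
def split_query(raw):
    """Split 'name:my_task state:FAILURE slow' into field filters and free-text words."""
    fields, words = {}, []
    for token in raw.split():
        key, sep, value = token.partition(':')
        if sep:
            fields[key] = value
        else:
            words.append(token)
    return fields, words

def matches_query(task, fields, words):
    """True if every field filter matches exactly and every free word appears somewhere."""
    if any(str(task.get(key, '')) != value for key, value in fields.items()):
        return False
    haystack = ' '.join(str(v) for v in task.values()).lower()
    return all(word.lower() in haystack for word in words)

fields, words = split_query('name:my_task state:FAILURE')
task = {'uuid': 'task-uuid-here', 'name': 'my_task', 'state': 'FAILURE', 'args': '[1, 2]'}
print(matches_query(task, fields, words))  # True
```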

### Task State Constants

Standard Celery task states used throughout the system.

```python { .api }
# Task states from Celery
TASK_STATES = [
    'PENDING',   # Task is waiting for execution
    'STARTED',   # Task has been started
    'SUCCESS',   # Task completed successfully
    'FAILURE',   # Task failed with an exception
    'RETRY',     # Task is being retried
    'REVOKED',   # Task has been revoked/cancelled
]

# Sort key mappings for task sorting
sort_keys = {
    'name': lambda task: task.name or '',
    'state': lambda task: task.state or '',
    'received': lambda task: task.received or 0,
    'started': lambda task: task.started or 0,
    'runtime': lambda task: task.runtime or 0,
    'worker': lambda task: task.worker.hostname if task.worker else '',
}
```
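
To illustrate how the sort keys behave, the following sketch applies the same key functions to lightweight stand-ins for Celery's task objects. `FakeWorker` and `FakeTask` are hypothetical namedtuples used purely for demonstration:

```python
from collections import namedtuple

FakeWorker = namedtuple('FakeWorker', 'hostname')
FakeTask = namedtuple('FakeTask', 'name state received started runtime worker')

tasks = [
    FakeTask('tasks.mul', 'FAILURE', 90.0, None, None, FakeWorker('celery@worker2')),
    FakeTask('tasks.add', 'SUCCESS', 100.0, 101.0, 0.5, FakeWorker('celery@worker1')),
]

# The `or 0` / `or ''` fallbacks let tasks with missing fields sort cleanly.
by_received = sorted(tasks, key=lambda task: task.received or 0)
by_runtime = sorted(tasks, key=lambda task: task.runtime or 0, reverse=True)

print([t.name for t in by_received])  # ['tasks.mul', 'tasks.add']
print([t.name for t in by_runtime])   # ['tasks.add', 'tasks.mul']
```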

## Task Control Operations

### Task Execution

Execute tasks remotely with various execution modes and options.

```python { .api }
def task_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task synchronously and wait for result.

    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options (queue, countdown, eta, etc.)

    Returns:
        dict: Task result and metadata

    Executes the task and waits for completion, returning the result
    or raising an exception if the task fails.
    """

def task_async_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task asynchronously without waiting for result.

    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options

    Returns:
        dict: Task ID and submission metadata

    Submits the task for execution and returns immediately with the task ID.
    """

def task_send_task(task_name, args=None, kwargs=None, **options):
    """
    Send a task without requiring the task definition on the sender.

    Args:
        task_name (str): Name of task to send
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options

    Returns:
        dict: Task ID and submission metadata

    Sends the task using Celery's send_task, which doesn't require the
    task to be registered locally.
    """
```
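
These execution modes correspond to Celery's own calling API. As a rough sketch of the underlying calls (the broker/backend URLs and the task name `tasks.add` are placeholders for your deployment):

```python
from celery import Celery

app = Celery('example',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

# Submit and block for the result (synchronous-style execution).
result = app.send_task('tasks.add', args=[2, 2])
print(result.get(timeout=10))

# Submit and return immediately, keeping only the task ID.
async_result = app.send_task('tasks.add', args=[2, 2], countdown=5)
print(async_result.id)

# send_task never requires 'tasks.add' to be registered in this process,
# which is the behaviour task_send_task exposes.
```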

### Task Result Management

Retrieve and manage task results and execution status.

```python { .api }
def get_task_result(task_id, timeout=None):
    """
    Get task result by ID.

    Args:
        task_id (str): Task UUID
        timeout (float, optional): Maximum time to wait for result

    Returns:
        dict: Task result data including:
            - result: Task return value
            - state: Current task state
            - traceback: Error traceback if failed
            - success: Boolean success status

    Retrieves the result from the configured result backend.
    """

def task_abort(task_id):
    """
    Abort a running task.

    Args:
        task_id (str): Task UUID to abort

    Returns:
        dict: Abort operation status

    Attempts to abort a running task if it supports abortion.
    Only works with AbortableTask instances.
    """
```
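
Result retrieval is essentially a thin layer over Celery's result backend. A minimal sketch using `AsyncResult` directly (assuming an `app` configured with a result backend; `fetch_result` is a hypothetical helper shaped like `get_task_result`'s documented output):

```python
from celery.result import AsyncResult

def fetch_result(app, task_id, timeout=None):
    """Collect state, result, and traceback for a task ID into one dict."""
    res = AsyncResult(task_id, app=app)
    info = {'state': res.state, 'success': res.successful()}
    if res.ready():
        if res.successful():
            info['result'] = res.get(timeout=timeout)
        else:
            info['traceback'] = res.traceback
    return info
```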

### Task Control Commands

Remote control operations for managing task execution and behavior.

```python { .api }
def task_revoke(task_id, terminate=False, signal='SIGTERM'):
    """
    Revoke/cancel a task.

    Args:
        task_id (str): Task UUID to revoke
        terminate (bool): Whether to terminate if already running
        signal (str): Signal to send if terminating ('SIGTERM', 'SIGKILL')

    Returns:
        dict: Revocation status

    Revokes a task, optionally terminating it if already executing.
    """

def task_rate_limit(task_name, rate_limit, workername=None):
    """
    Set rate limit for task type.

    Args:
        task_name (str): Name of task to limit
        rate_limit (str): Rate limit specification (e.g., '10/m', '1/s')
        workername (str, optional): Specific worker to apply the limit to

    Returns:
        dict: Rate limit operation status

    Sets execution rate limit for the specified task type.
    """

def task_timeout(task_name, soft=None, hard=None, workername=None):
    """
    Set timeout limits for task type.

    Args:
        task_name (str): Name of task to configure
        soft (float, optional): Soft timeout in seconds
        hard (float, optional): Hard timeout in seconds
        workername (str, optional): Specific worker to apply the timeouts to

    Returns:
        dict: Timeout configuration status

    Configures soft and hard timeout limits for task execution.
    """
```
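
These wrappers map onto Celery's remote control commands. A sketch of the equivalent direct calls (assuming a configured `app`; the worker name and rate values are placeholders):

```python
from celery import Celery

app = Celery('example', broker='redis://localhost:6379/0')

# Revoke a task, terminating it with SIGTERM if it is already running.
app.control.revoke('task-uuid-here', terminate=True, signal='SIGTERM')

# Limit 'tasks.add' to 10 executions per minute on one worker only.
app.control.rate_limit('tasks.add', '10/m', destination=['celery@worker1'])

# Apply soft/hard execution time limits (in seconds) to 'tasks.add'.
app.control.time_limit('tasks.add', soft=60, hard=120)
```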

## Task Data Structure

### Core Task Information

Comprehensive task data structure containing all relevant execution and metadata information.

```python { .api }
TaskInfo = {
    # Basic identification
    'uuid': str,          # Unique task identifier
    'name': str,          # Task name/type
    'state': str,         # Current task state
    'hostname': str,      # Worker hostname executing task

    # Timing information
    'timestamp': float,   # Task event timestamp
    'received': float,    # Time task was received by worker
    'started': float,     # Time task execution started
    'succeeded': float,   # Time task completed successfully
    'failed': float,      # Time task failed
    'retried': float,     # Time task was retried
    'revoked': float,     # Time task was revoked
    'runtime': float,     # Total execution time in seconds

    # Task parameters
    'args': list,         # Task positional arguments
    'kwargs': dict,       # Task keyword arguments
    'retries': int,       # Number of retry attempts
    'eta': str,           # Estimated time of arrival
    'expires': str,       # Task expiration time

    # Execution results
    'result': Any,        # Task return value (if successful)
    'traceback': str,     # Exception traceback (if failed)
    'exception': str,     # Exception message (if failed)

    # Routing information
    'queue': str,         # Queue name task was sent to
    'exchange': str,      # Exchange name
    'routing_key': str,   # Routing key used
    'priority': int,      # Message priority

    # Worker information
    'worker': {
        'hostname': str,  # Worker hostname
        'pid': int,       # Worker process ID
        'sw_ident': str,  # Software identifier
        'sw_ver': str,    # Software version
        'sw_sys': str,    # System information
    },

    # Additional metadata
    'clock': int,         # Logical clock value
    'client': str,        # Client that sent the task
    'root_id': str,       # Root task ID (for task chains)
    'parent_id': str,     # Parent task ID (for task groups)
    'children': [str],    # Child task IDs
}
```
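
Since the timing fields are plain Unix timestamps, derived metrics such as queue latency can be computed directly from a task dict shaped like the structure above (the values here are illustrative):

```python
task = {
    'uuid': 'task-uuid-here',
    'name': 'tasks.add',
    'state': 'SUCCESS',
    'received': 1700000000.0,
    'started': 1700000002.5,
    'succeeded': 1700000003.1,
    'runtime': 0.6,
}

queue_wait = task['started'] - task['received']      # time spent waiting on the worker
total_time = task['succeeded'] - task['received']    # end-to-end time as seen by the worker

print(f"waited {queue_wait:.1f}s, ran {task['runtime']:.1f}s, total {total_time:.1f}s")
```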

### Task Event Types

Event types that can occur during the task execution lifecycle.

```python { .api }
TaskEventTypes = {
    'task-sent': {
        'description': 'Task was sent to broker',
        'fields': ['uuid', 'name', 'args', 'kwargs', 'eta', 'expires']
    },
    'task-received': {
        'description': 'Worker received task from broker',
        'fields': ['uuid', 'name', 'hostname', 'timestamp']
    },
    'task-started': {
        'description': 'Worker started executing task',
        'fields': ['uuid', 'hostname', 'timestamp', 'pid']
    },
    'task-succeeded': {
        'description': 'Task completed successfully',
        'fields': ['uuid', 'result', 'runtime', 'hostname', 'timestamp']
    },
    'task-failed': {
        'description': 'Task execution failed',
        'fields': ['uuid', 'exception', 'traceback', 'hostname', 'timestamp']
    },
    'task-retried': {
        'description': 'Task is being retried',
        'fields': ['uuid', 'reason', 'traceback', 'hostname', 'timestamp']
    },
    'task-revoked': {
        'description': 'Task was revoked/cancelled',
        'fields': ['uuid', 'hostname', 'timestamp']
    }
}
```
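
A consumer of the event stream typically dispatches on the event's `type` field. A minimal sketch (the event dict is assumed to carry at least the fields listed above for its type):

```python
from collections import Counter

event_counts = Counter()

def on_task_event(event):
    """Tally lifecycle events and surface failures as they arrive."""
    etype = event['type']            # e.g. 'task-succeeded'
    event_counts[etype] += 1
    if etype == 'task-failed':
        print(f"task {event['uuid']} failed on {event['hostname']}: {event.get('exception')}")

# Example event as it might arrive from the event stream.
on_task_event({'type': 'task-failed', 'uuid': 'task-uuid-here',
               'hostname': 'celery@worker1', 'timestamp': 1700000003.1,
               'exception': "ValueError('boom')", 'traceback': '...'})
```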

## Usage Examples

### Basic Task Filtering

```python
from flower.utils.tasks import iter_tasks, get_task_by_id
from flower.events import Events

# Assume we have an events instance with task data
events = Events(celery_app, io_loop)

# Get all failed tasks
failed_tasks = list(iter_tasks(
    events,
    state='FAILURE',
    limit=100
))

print(f"Found {len(failed_tasks)} failed tasks")

# Get tasks from specific worker
worker_tasks = list(iter_tasks(
    events,
    worker='celery@worker1',
    limit=50
))

# Get recent tasks
from datetime import datetime, timedelta
recent_tasks = list(iter_tasks(
    events,
    received_start=datetime.now() - timedelta(hours=1),
    sort_by='received'
))
```

### Advanced Task Search

```python
from flower.utils.search import parse_search_terms, satisfies_search_terms

# Parse search query
search_terms = parse_search_terms('name:my_task state:FAILURE')

# Find matching tasks
matching_tasks = []
for task in iter_tasks(events):
    if satisfies_search_terms(task, search_terms):
        matching_tasks.append(task)

# Search with text query
text_search = parse_search_terms('error database connection')
error_tasks = [
    task for task in iter_tasks(events, state='FAILURE')
    if satisfies_search_terms(task, text_search)
]
```

### Task Execution and Control

```python
# task_apply, task_async_apply, and task_revoke are documented in
# "Task Control Operations" above.

# Execute task synchronously
async def execute_task():
    result = await task_apply(
        'my_task',
        args=[1, 2, 3],
        kwargs={'timeout': 30}
    )
    print(f"Task result: {result}")

# Execute task asynchronously
async def async_execute():
    response = await task_async_apply(
        'long_running_task',
        args=['data'],
        kwargs={'priority': 5}
    )
    task_id = response['task-id']
    print(f"Task submitted: {task_id}")

# Revoke task
async def revoke_task():
    await task_revoke('task-uuid-here', terminate=True)
```

### Task Result Monitoring

```python
# get_task_result and get_task_by_id are documented in the API sections above.

# Get task result
async def get_result():
    result = await get_task_result('task-uuid-here', timeout=10)

    if result['state'] == 'SUCCESS':
        print(f"Task completed: {result['result']}")
    elif result['state'] == 'FAILURE':
        print(f"Task failed: {result['traceback']}")

# Get detailed task information from the event state
def get_task_info(events, task_id):
    info = get_task_by_id(events, task_id)
    if info is None:
        print(f"Task {task_id} not found")
        return

    print(f"Task: {info['name']}")
    print(f"State: {info['state']}")
    print(f"Worker: {info['hostname']}")
    print(f"Runtime: {info.get('runtime', 'N/A')} seconds")
```

### Bulk Task Operations

```python
# Process tasks in batches
def process_tasks_batch(events, batch_size=1000):
    offset = 0

    while True:
        batch = list(iter_tasks(
            events,
            limit=batch_size,
            offset=offset,
            sort_by='received'
        ))

        if not batch:
            break

        # Process batch
        for task in batch:
            process_single_task(task)

        offset += batch_size

# Find and revoke failed tasks
async def cleanup_failed_tasks():
    failed_tasks = iter_tasks(events, state='FAILURE')

    for task in failed_tasks:
        if should_revoke_task(task):
            await task_revoke(task['uuid'])
```

### Task Analytics

```python
from collections import Counter
from datetime import datetime, timedelta

def analyze_task_performance(events):
    """Generate task performance analytics."""

    # Get tasks from last 24 hours
    yesterday = datetime.now() - timedelta(days=1)
    recent_tasks = list(iter_tasks(
        events,
        received_start=yesterday,
        sort_by='received'
    ))

    # Task counts by state
    state_counts = Counter(task['state'] for task in recent_tasks)

    # Task counts by name
    name_counts = Counter(task['name'] for task in recent_tasks)

    # Average runtime by task name
    runtime_by_name = {}
    for task_name in name_counts:
        runtimes = [
            task['runtime'] for task in recent_tasks
            if task['name'] == task_name and task.get('runtime')
        ]
        if runtimes:
            runtime_by_name[task_name] = sum(runtimes) / len(runtimes)

    return {
        'total_tasks': len(recent_tasks),
        'state_distribution': dict(state_counts),
        'task_distribution': dict(name_counts),
        'average_runtimes': runtime_by_name,
        'success_rate': state_counts['SUCCESS'] / len(recent_tasks) if recent_tasks else 0
    }
```

## Error Handling and Edge Cases

Task management includes error handling for common failure scenarios:

```python
# Handle task execution errors
try:
    result = await task_apply('my_task', args=[1, 2, 3])
except Exception as e:
    if 'timeout' in str(e).lower():
        print("Task execution timed out")
    elif 'not registered' in str(e).lower():
        print("Task not found on workers")
    else:
        print(f"Task execution failed: {e}")

# Handle missing task results
try:
    result = await get_task_result('task-id')
except Exception as e:
    if 'no such task' in str(e).lower():
        print("Task not found in result backend")
    else:
        print(f"Could not retrieve result: {e}")

# Handle search and filtering edge cases
def safe_task_search(events, **filters):
    try:
        return list(iter_tasks(events, **filters))
    except Exception as e:
        print(f"Task search failed: {e}")
        return []
```

## Performance Considerations

- Use pagination (`limit` and `offset`) for large task datasets
- Consider memory usage when processing many tasks
- Use specific filters to reduce dataset size before processing
- Cache frequent queries when possible (see the sketch after this list)
- Monitor search performance with complex queries
- Use appropriate sorting fields based on query patterns
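
For dashboards that repeat the same expensive query, a small time-to-live cache keeps pressure off the event store. A sketch under the assumption that slightly stale results are acceptable (`cached_tasks` and `_cache` are hypothetical helpers):

```python
import time

_cache = {}

def cached_tasks(events, ttl=30, **filters):
    """Run iter_tasks with the given filters, reusing results for `ttl` seconds."""
    key = tuple(sorted(filters.items()))
    now = time.monotonic()
    hit = _cache.get(key)
    if hit and now - hit[0] < ttl:
        return hit[1]
    tasks = list(iter_tasks(events, **filters))
    _cache[key] = (now, tasks)
    return tasks

# Repeated calls within 30 seconds reuse the first result.
failed = cached_tasks(events, state='FAILURE', limit=500)
```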