# Tasks and Results

Task execution system that handles the lifecycle of tasks from creation to result retrieval. This includes decorated task wrappers, result containers, execution context, and utilities for working with task outcomes.

## Capabilities

### Task Decoration and Execution

Tasks are created by decorating functions with the broker's `@task` decorator, which converts them into distributed task objects that can be executed asynchronously.

```python { .api }
class AsyncTaskiqDecoratedTask:
    """
    Decorated task wrapper that enables distributed execution.

    Created automatically when using @broker.task decorator.
    Provides methods for sending tasks to workers and calling locally.
    """

    task_name: str
    broker: AsyncBroker
    labels: Dict[str, Any]

    async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask[Any]:
        """
        Send task to broker for distributed execution.

        Args:
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            AsyncTaskiqTask handle used to check readiness and to
            retrieve the TaskiqResult once the task has finished
        """

    async def __call__(self, *args, **kwargs) -> Any:
        """
        Execute task locally in current process.

        Args:
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            Direct result of task function execution
        """

    def kicker(self) -> AsyncKicker:
        """
        Get kicker object for advanced task configuration.

        Kicker allows modifying task parameters before sending.

        Returns:
            AsyncKicker instance for this task
        """

    async def schedule_by_cron(
        self,
        source: ScheduleSource,
        cron: Union[str, CronSpec],
        *args,
        **kwargs,
    ) -> CreatedSchedule:
        """
        Schedule task to run on cron pattern.

        Args:
            source: Schedule source that supports dynamic scheduling
            cron: Cron string or CronSpec instance
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            CreatedSchedule object with schedule details
        """

    async def schedule_by_time(
        self,
        source: ScheduleSource,
        time: datetime,
        *args,
        **kwargs,
    ) -> CreatedSchedule:
        """
        Schedule task to run at specific time.

        Args:
            source: Schedule source that supports dynamic scheduling
            time: Specific datetime to run the task
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            CreatedSchedule object with schedule details
        """


class AsyncTaskiqTask:
    """
    Task execution wrapper for handling async task invocation.

    Exposes readiness checks, result retrieval (immediate or waiting)
    and progress lookup for a task.
    """

    def __init__(
        self,
        task_name: str,
        broker: AsyncBroker,
        labels: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    async def kiq(self, *args, **kwargs) -> TaskiqResult:
        """Send task for execution and return result handle."""

    async def is_ready(self) -> bool:
        """
        Check if task result is ready.

        Returns:
            True if task is completed, False otherwise

        Raises:
            ResultIsReadyError: If unable to check task readiness
        """

    async def get_result(self, with_logs: bool = False) -> TaskiqResult:
        """
        Get task result from result backend.

        Args:
            with_logs: Whether to fetch execution logs

        Returns:
            TaskiqResult with execution outcome

        Raises:
            ResultGetError: If unable to retrieve result
        """

    async def wait_result(
        self,
        check_interval: float = 0.2,
        timeout: float = -1.0,
        with_logs: bool = False,
    ) -> TaskiqResult:
        """
        Wait for task completion and return result.

        Args:
            check_interval: How often to check for completion (seconds)
            timeout: Maximum time to wait (-1 for no timeout)
            with_logs: Whether to fetch execution logs

        Returns:
            TaskiqResult with execution outcome

        Raises:
            TaskiqResultTimeoutError: If timeout is exceeded
        """

    async def get_progress(self) -> Optional[TaskProgress[Any]]:
        """
        Get current task execution progress.

        Returns:
            TaskProgress object or None if no progress tracking
        """
```

### Result Management

Result objects provide access to task execution outcomes, including return values, errors, execution metadata, and status checking.

```python { .api }
class TaskiqResult:
    """
    Container for task execution results and metadata.

    Supports both successful results and error conditions,
    along with execution timing and custom labels.
    """

    is_err: bool
    """Whether the task execution resulted in an error."""

    return_value: Any
    """The return value from successful task execution."""

    execution_time: float
    """Task execution time in seconds."""

    labels: Dict[str, Any]
    """Custom labels attached to the task result."""

    error: Optional[BaseException]
    """Exception object if task execution failed."""

    log: Optional[str]
    """Deprecated: Task execution logs (may be removed in future)."""

    async def wait_result(
        self,
        timeout: Optional[float] = None,
        check_interval: float = 0.5,
    ) -> Any:
        """
        Wait for task completion and return the result.

        Args:
            timeout: Maximum time to wait in seconds
            check_interval: How often to check for completion

        Returns:
            The task return value

        Raises:
            TaskiqResultTimeoutError: If timeout is exceeded
            Exception: Any exception raised by the task
        """

    async def is_ready(self) -> bool:
        """
        Check if task result is available.

        Returns:
            True if result is ready, False otherwise
        """

    def __await__(self):
        """Enable direct awaiting of TaskiqResult objects."""

    # The return annotation is a string forward reference: the class name
    # is not yet bound while the class body is being evaluated.
    def raise_for_error(self) -> "TaskiqResult":
        """
        Raise exception if task resulted in error.

        Returns:
            Self if no error occurred

        Raises:
            Exception: The original task exception if is_err is True
        """
```

### Execution Context

Context objects provide task execution environment information and control capabilities during task processing.

```python { .api }
class Context:
    """
    Task execution context providing access to message data,
    broker instance, and task control operations.
    """

    message: TaskiqMessage
    """The original task message with metadata."""

    broker: AsyncBroker
    """Broker instance executing this task."""

    state: TaskiqState
    """Shared state object for the broker."""

    def __init__(self, message: TaskiqMessage, broker: AsyncBroker) -> None: ...

    async def requeue(self) -> None:
        """
        Requeue the current task for later execution.

        Increments requeue counter and sends task back to broker.
        Always raises NoResultError to prevent result storage.

        Raises:
            NoResultError: Always raised to stop current execution
        """

    def reject(self) -> None:
        """
        Reject the current task and prevent reprocessing.

        Always raises TaskRejectedError to mark task as rejected.

        Raises:
            TaskRejectedError: Always raised to reject the task
        """
```

### Task Gathering

Utility functions for working with multiple task results concurrently.

```python { .api }
async def gather(
    *tasks: AsyncTaskiqTask[Any],
    timeout: float = -1,
    with_logs: bool = False,
    periodicity: float = 0.1,
) -> Tuple[TaskiqResult[Any], ...]:
    """
    Wait for multiple task results concurrently.

    Similar to asyncio.gather but works with AsyncTaskiqTask objects.

    Args:
        *tasks: AsyncTaskiqTask objects to wait for
        timeout: Maximum time to wait in seconds, -1 for no timeout
        with_logs: Whether to fetch logs from result backend
        periodicity: How often to check for task completion

    Returns:
        Tuple of TaskiqResult objects in the same order as input tasks

    Raises:
        TaskiqResultTimeoutError: If timeout is exceeded
    """
```

## Usage Examples

### Basic Task Definition and Execution

```python
import asyncio
from typing import List

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def calculate_sum(numbers: List[int]) -> int:
    """Calculate sum of numbers with artificial delay."""
    await asyncio.sleep(1)  # Simulate work
    return sum(numbers)


# Execute task
async def main():
    await broker.startup()

    # Send task for execution; kiq() returns a handle to the running task
    task = await calculate_sum.kiq([1, 2, 3, 4, 5])

    # Wait for result; the value returned by the task is in return_value
    result = await task.wait_result(timeout=10.0)
    print(f"Sum: {result.return_value}")  # Sum: 15

    await broker.shutdown()
```

### Error Handling and Result Inspection

```python
@broker.task
async def risky_operation(value: int) -> int:
    if value < 0:
        raise ValueError("Negative values not allowed")
    return value * 2


async def handle_results():
    task = await risky_operation.kiq(-5)

    # Check if result is ready (non-blocking)
    if not await task.is_ready():
        print("Task is still running")

    # Wait for completion; a task failure is stored in the result
    result = await task.wait_result()

    try:
        # Re-raise the task's exception if it failed
        value = result.raise_for_error().return_value
        print(f"Success: {value}")
    except ValueError as e:
        print(f"Task failed: {e}")

    # Inspect result metadata
    print(f"Execution time: {result.execution_time}s")
    print(f"Had error: {result.is_err}")
    if result.error:
        print(f"Error type: {type(result.error).__name__}")
```

### Context Usage in Tasks

```python
from taskiq import Context, TaskiqDepends


@broker.task
async def context_aware_task(
    data: str,
    context: Context = TaskiqDepends(),
) -> str:
    """Task that uses execution context."""

    # Access task metadata
    task_id = context.message.task_id
    # Convert the label to an int once so it can be compared and incremented
    requeue_count = int(context.message.labels.get("X-Taskiq-requeue", "0"))

    # Conditional requeue logic
    if data == "retry_me" and requeue_count < 2:
        print(f"Requeuing task {task_id} (attempt {requeue_count + 1})")
        await context.requeue()

    # Reject invalid data
    if data == "invalid":
        context.reject()

    return f"Processed: {data} (ID: {task_id})"
```

### Multiple Task Coordination

```python
import asyncio
import random

from taskiq import gather


@broker.task
async def fetch_data(url: str) -> dict:
    # Simulate API call
    await asyncio.sleep(random.uniform(0.5, 2.0))
    return {"url": url, "data": f"content from {url}"}


async def fetch_multiple_sources():
    # Start multiple tasks; kiq() is a coroutine, so each call must be
    # awaited to obtain the task handle that gather() expects
    tasks = [
        await fetch_data.kiq("https://api1.example.com"),
        await fetch_data.kiq("https://api2.example.com"),
        await fetch_data.kiq("https://api3.example.com"),
    ]

    # Wait for all results
    results = await gather(*tasks)

    # Process combined results; gather() yields TaskiqResult objects,
    # so the dict returned by the task is in return_value
    all_data = {}
    for result in results:
        payload = result.return_value
        all_data[payload["url"]] = payload["data"]

    return all_data
```

## Types

```python { .api }
class TaskiqMessage:
    """Message format for task data and metadata."""

    task_id: str
    task_name: str
    labels: Dict[str, Any]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]


class AsyncKicker:
    """Kicker object for advanced task parameter configuration."""

    def __init__(
        self,
        task_name: str,
        broker: AsyncBroker,
        labels: Dict[str, Any],
        return_type: Optional[Type[Any]] = None,
    ) -> None: ...

    async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask[Any]: ...

    async def schedule_by_cron(
        self,
        source: ScheduleSource,
        cron: Union[str, CronSpec],
        *args,
        **kwargs,
    ) -> CreatedSchedule[Any]: ...

    async def schedule_by_time(
        self,
        source: ScheduleSource,
        time: datetime,
        *args,
        **kwargs,
    ) -> CreatedSchedule[Any]: ...


class CreatedSchedule:
    """Container for created schedule information."""

    schedule_id: str
    source: ScheduleSource


# NOTE(review): the scheduling methods accept `Union[str, CronSpec]` and
# describe "Cron string or CronSpec instance", which suggests CronSpec is a
# distinct type rather than a plain alias of str - confirm against the library.
CronSpec = str  # Type alias for cron specification strings


class TaskProgress:
    """Progress tracking container for long-running tasks."""

    def __init__(self, current: int, total: int, message: str = "") -> None: ...

    current: int
    """Current progress value."""

    total: int
    """Total expected value."""

    message: str
    """Optional progress message."""
```