Distributed task queue with full async support for Python applications
npx @tessl/cli install tessl/pypi-taskiq@0.11.00
# Taskiq
1
2
Taskiq is an asynchronous distributed task queue for Python that enables sending and running both synchronous and asynchronous functions across distributed systems. It provides full async support with integration for popular frameworks like FastAPI and AioHTTP, comprehensive type safety with PEP-612 support, and various broker backends including NATS, Redis, RabbitMQ, and Kafka.
3
4
## Package Information
5
6
- **Package Name**: taskiq
7
- **Language**: Python
8
- **Installation**: `pip install taskiq`
9
- **Python Versions**: 3.8+
10
11
## Core Imports
12
13
```python
14
import taskiq
15
```
16
17
Common imports for basic usage:
18
19
```python
20
from taskiq import InMemoryBroker, TaskiqResult, Context
21
```
22
23
For production usage with external brokers:
24
25
```python
26
from taskiq_nats import JetStreamBroker # External package
27
from taskiq_redis import RedisBroker # External package
28
```
29
30
## Basic Usage
31
32
```python
33
import asyncio
34
from taskiq import InMemoryBroker
35
36
# Create a broker
37
broker = InMemoryBroker()
38
39
# Define a task
40
@broker.task
41
async def add_numbers(a: int, b: int) -> int:
42
return a + b
43
44
# Alternative: sync task
45
@broker.task
46
def multiply_numbers(a: int, b: int) -> int:
47
return a * b
48
49
async def main():
50
# Startup broker
51
await broker.startup()
52
53
# Send tasks for execution
54
result1 = await add_numbers.kiq(5, 3)
55
result2 = await multiply_numbers.kiq(4, 7)
56
57
# Get results
58
value1 = await result1.wait_result() # 8
59
value2 = await result2.wait_result() # 28
60
61
print(f"Addition result: {value1}")
62
print(f"Multiplication result: {value2}")
63
64
# Shutdown broker
65
await broker.shutdown()
66
67
if __name__ == "__main__":
68
asyncio.run(main())
69
```
70
71
## Architecture
72
73
Taskiq follows a distributed architecture with these core components:
74
75
- **Broker**: Message queue interface that handles task distribution
76
- **Worker**: Process that executes tasks received from the broker
77
- **Result Backend**: Storage system for task results
78
- **Scheduler**: Component for handling periodic and scheduled tasks
79
- **Middleware**: Pipeline for cross-cutting concerns like retries and monitoring
80
81
The library supports both in-memory brokers for development and external broker systems (NATS, Redis, etc.) for production distributed environments.
82
83
## Capabilities
84
85
### Broker Management
86
87
Core broker functionality for creating, configuring, and managing task distribution. Includes abstract base classes and concrete implementations for different message queue backends.
88
89
```python { .api }
90
class AsyncBroker:
91
def task(self, task_name: Optional[str] = None, **labels: Any) -> Callable
92
def register_task(self, func: Callable, task_name: Optional[str] = None, **labels: Any) -> AsyncTaskiqDecoratedTask
93
def with_result_backend(self, result_backend: AsyncResultBackend) -> Self
94
def with_middlewares(self, *middlewares: TaskiqMiddleware) -> Self
95
async def startup(self) -> None
96
async def shutdown(self) -> None
97
98
class InMemoryBroker(AsyncBroker): ...
99
class ZeroMQBroker(AsyncBroker): ...
100
101
async_shared_broker: AsyncBroker
102
"""Global shared broker instance for cross-module usage."""
103
```
104
105
[Brokers](./brokers.md)
106
107
### Task Execution and Results
108
109
Task execution system including decorated task wrappers, result containers, and context management for task execution environments.
110
111
```python { .api }
112
class AsyncTaskiqDecoratedTask:
113
async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask
114
115
class AsyncTaskiqTask:
116
async def wait_result(
117
self,
118
check_interval: float = 0.2,
119
timeout: float = -1.0,
120
with_logs: bool = False,
121
) -> TaskiqResult
122
123
class TaskiqResult:
124
is_err: bool
125
return_value: Any
126
execution_time: float
127
labels: Dict[str, Any]
128
error: Optional[BaseException]
129
130
class Context:
131
message: TaskiqMessage
132
broker: AsyncBroker
133
state: TaskiqState
134
async def requeue(self) -> None
135
def reject(self) -> None
136
137
async def gather(
138
*tasks: AsyncTaskiqTask[Any],
139
timeout: float = -1,
140
with_logs: bool = False,
141
periodicity: float = 0.1,
142
) -> Tuple[TaskiqResult[Any], ...]:
143
"""Wait for multiple task results concurrently."""
144
```
145
146
[Tasks and Results](./tasks-results.md)
147
148
### Middleware System
149
150
Extensible middleware pipeline for implementing cross-cutting concerns like retries, monitoring, and custom processing logic.
151
152
```python { .api }
153
class TaskiqMiddleware:
154
async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage
155
async def post_send(self, message: TaskiqMessage) -> None
156
async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage
157
async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None
158
159
class SimpleRetryMiddleware(TaskiqMiddleware): ...
160
class SmartRetryMiddleware(TaskiqMiddleware): ...
161
class PrometheusMiddleware(TaskiqMiddleware): ...
162
```
163
164
[Middleware](./middleware.md)
165
166
### Scheduling
167
168
Task scheduling capabilities for periodic execution, cron-like scheduling, and delayed task execution.
169
170
```python { .api }
171
class TaskiqScheduler:
172
def __init__(self, broker: AsyncBroker, sources: List[ScheduleSource]) -> None
173
async def startup(self) -> None
174
async def shutdown(self) -> None
175
176
class ScheduledTask:
177
task_name: str
178
cron: Optional[str]
179
time: Optional[datetime]
180
labels: Dict[str, Any]
181
args: Tuple[Any, ...]
182
kwargs: Dict[str, Any]
183
```
184
185
[Scheduling](./scheduling.md)
186
187
### Result Backends
188
189
Storage systems for persisting task results and progress tracking across distributed environments.
190
191
```python { .api }
192
class AsyncResultBackend:
193
async def set_result(self, task_id: str, result: TaskiqResult) -> None
194
async def get_result(self, task_id: str, with_logs: bool = True) -> TaskiqResult
195
async def is_result_ready(self, task_id: str) -> bool
196
async def startup(self) -> None
197
async def shutdown(self) -> None
198
```
199
200
[Result Backends](./result-backends.md)
201
202
### Events and State
203
204
Event system for lifecycle management and global state container for broker and task coordination.
205
206
```python { .api }
207
class TaskiqEvents:
208
CLIENT_STARTUP: str
209
CLIENT_SHUTDOWN: str
210
WORKER_STARTUP: str
211
WORKER_SHUTDOWN: str
212
213
class TaskiqState:
214
def __init__(self) -> None
215
def set_value(self, key: str, value: Any) -> None
216
def get_value(self, key: str, default: Any = None) -> Any
217
```
218
219
[Events and State](./events-state.md)
220
221
### Exception Handling
222
223
Comprehensive exception hierarchy for handling various error conditions in distributed task processing.
224
225
```python { .api }
226
class TaskiqError(Exception): ...
227
class NoResultError(TaskiqError): ...
228
class ResultGetError(TaskiqError): ...
229
class SendTaskError(TaskiqError): ...
230
class SecurityError(TaskiqError): ...
231
class TaskiqResultTimeoutError(TaskiqError): ...
232
```
233
234
[Exceptions](./exceptions.md)
235
236
## Types
237
238
```python { .api }
239
class BrokerMessage:
240
task_id: str
241
task_name: str
242
message: bytes
243
labels: Dict[str, str]
244
245
class TaskiqMessage:
246
task_id: str
247
task_name: str
248
labels: Dict[str, Any]
249
args: Tuple[Any, ...]
250
kwargs: Dict[str, Any]
251
252
class AckableMessage:
253
data: Union[bytes, str]
254
ack: Callable[[], Awaitable[None]]
255
256
class TaskiqFormatter:
257
"""Abstract base class for message formatting."""
258
def dumps(self, message: TaskiqMessage) -> BrokerMessage
259
def loads(self, message: bytes) -> TaskiqMessage
260
261
class ScheduleSource:
262
"""Abstract base class for schedule sources."""
263
async def get_schedules(self) -> List[ScheduledTask]
264
async def pre_send(self, task: ScheduledTask) -> None
265
async def post_send(self, task: ScheduledTask) -> None
266
267
TaskiqDepends = Depends # From taskiq_dependencies package
268
269
__version__: str
270
"""Package version string."""
271
```