
# Taskiq


Taskiq is an asynchronous distributed task queue for Python that sends and runs both synchronous and asynchronous functions across distributed systems. It provides full async support, integrations for popular frameworks such as FastAPI and AioHTTP, type-safe task signatures based on PEP 612, and broker backends including NATS, Redis, RabbitMQ, and Kafka.

## Package Information

- **Package Name**: taskiq
- **Language**: Python
- **Installation**: `pip install taskiq`
- **Python Versions**: 3.8+

## Core Imports

```python
import taskiq
```

Common imports for basic usage:

```python
from taskiq import InMemoryBroker, TaskiqResult, Context
```

For production usage with external brokers:

```python
from taskiq_nats import JetStreamBroker  # External package
from taskiq_redis import RedisBroker  # External package
```

## Basic Usage

```python
import asyncio

from taskiq import InMemoryBroker

# Create a broker
broker = InMemoryBroker()


# Define a task
@broker.task
async def add_numbers(a: int, b: int) -> int:
    return a + b


# Alternative: sync task
@broker.task
def multiply_numbers(a: int, b: int) -> int:
    return a * b


async def main() -> None:
    # Start the broker
    await broker.startup()

    # Send tasks for execution; kiq() returns a handle to the queued task
    task1 = await add_numbers.kiq(5, 3)
    task2 = await multiply_numbers.kiq(4, 7)

    # wait_result() returns a TaskiqResult; the value is in return_value
    result1 = await task1.wait_result()
    result2 = await task2.wait_result()

    print(f"Addition result: {result1.return_value}")  # 8
    print(f"Multiplication result: {result2.return_value}")  # 28

    # Shut down the broker
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

## Architecture


Taskiq follows a distributed architecture with these core components:

- **Broker**: Message queue interface that handles task distribution
- **Worker**: Process that executes tasks received from the broker
- **Result Backend**: Storage system for task results
- **Scheduler**: Component for handling periodic and scheduled tasks
- **Middleware**: Pipeline for cross-cutting concerns like retries and monitoring

The library supports both in-memory brokers for development and external broker systems (NATS, Redis, etc.) for production distributed environments.
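
To make the flow between these components concrete, here is a minimal toy model built only on the standard library. It is not Taskiq code: the queue stands in for a broker, the `worker` coroutine for a worker process, and the `results` dict for a result backend. All names in it (`worker`, `registry`, `run_demo`) are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

# Toy stand-ins for the components listed above; none of this is Taskiq code.
TaskFunc = Callable[..., Awaitable[Any]]


async def add(a: int, b: int) -> int:
    return a + b


async def worker(
    queue: "asyncio.Queue[Tuple[str, str, tuple]]",
    registry: Dict[str, TaskFunc],
    results: Dict[str, Any],
) -> None:
    """Pull messages from the queue, run the named task, store its result."""
    while True:
        task_id, task_name, args = await queue.get()
        results[task_id] = await registry[task_name](*args)
        queue.task_done()


async def run_demo() -> Dict[str, Any]:
    queue: "asyncio.Queue[Tuple[str, str, tuple]]" = asyncio.Queue()  # "broker"
    registry: Dict[str, TaskFunc] = {"add": add}  # tasks known to the worker
    results: Dict[str, Any] = {}  # "result backend"

    worker_task = asyncio.create_task(worker(queue, registry, results))

    # The client only enqueues a message; it never calls add() directly.
    await queue.put(("task-1", "add", (5, 3)))
    await queue.join()  # block until the worker has processed the message

    worker_task.cancel()
    return results


demo_results = asyncio.run(run_demo())
print(demo_results)
```

The point of the sketch is the separation of roles: the sender and the executor share nothing except the queue and the result store, which is what allows them to live in different processes once a real broker replaces the in-process queue.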

## Capabilities

### Broker Management

Core broker functionality for creating, configuring, and managing task distribution. Includes abstract base classes and concrete implementations for different message queue backends.

```python { .api }
class AsyncBroker:
    def task(self, task_name: Optional[str] = None, **labels: Any) -> Callable
    def register_task(self, func: Callable, task_name: Optional[str] = None, **labels: Any) -> AsyncTaskiqDecoratedTask
    def with_result_backend(self, result_backend: AsyncResultBackend) -> Self
    def with_middlewares(self, *middlewares: TaskiqMiddleware) -> Self
    async def startup(self) -> None
    async def shutdown(self) -> None

class InMemoryBroker(AsyncBroker): ...
class ZeroMQBroker(AsyncBroker): ...

async_shared_broker: AsyncBroker
"""Global shared broker instance for cross-module usage."""
```

[Brokers](./brokers.md)

### Task Execution and Results

Task execution system including decorated task wrappers, result containers, and context management for task execution environments.

```python { .api }
class AsyncTaskiqDecoratedTask:
    async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask

class AsyncTaskiqTask:
    async def wait_result(
        self,
        check_interval: float = 0.2,
        timeout: float = -1.0,
        with_logs: bool = False,
    ) -> TaskiqResult

class TaskiqResult:
    is_err: bool
    return_value: Any
    execution_time: float
    labels: Dict[str, Any]
    error: Optional[BaseException]

class Context:
    message: TaskiqMessage
    broker: AsyncBroker
    state: TaskiqState
    async def requeue(self) -> None
    def reject(self) -> None

async def gather(
    *tasks: AsyncTaskiqTask[Any],
    timeout: float = -1,
    with_logs: bool = False,
    periodicity: float = 0.1,
) -> Tuple[TaskiqResult[Any], ...]:
    """Wait for multiple task results concurrently."""
```

[Tasks and Results](./tasks-results.md)
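
The `check_interval` and `timeout` parameters of `wait_result` suggest a polling loop. The following standard-library sketch shows one way such a loop can behave; it is an illustration, not Taskiq's implementation. `wait_for_result`, `ResultTimeout`, and the dict-based `store` are hypothetical, and treating a non-positive `timeout` as "wait forever" is an assumption.

```python
import asyncio
import time
from typing import Any, Dict, Tuple


class ResultTimeout(Exception):
    """Hypothetical stand-in for TaskiqResultTimeoutError."""


async def wait_for_result(
    store: Dict[str, Any],
    task_id: str,
    check_interval: float = 0.2,
    timeout: float = -1.0,
) -> Any:
    """Poll `store` until `task_id` appears or `timeout` seconds elapse.

    Assumption: a non-positive timeout means "wait forever".
    """
    started = time.monotonic()
    while task_id not in store:
        if timeout > 0 and time.monotonic() - started > timeout:
            raise ResultTimeout(task_id)
        await asyncio.sleep(check_interval)
    return store[task_id]


async def run_demo() -> Tuple[Any, bool]:
    store: Dict[str, Any] = {}

    async def finish_later() -> None:
        # Simulates a worker that stores its result after a short delay.
        await asyncio.sleep(0.05)
        store["done"] = 8

    producer = asyncio.create_task(finish_later())
    value = await wait_for_result(store, "done", check_interval=0.01, timeout=1.0)
    await producer

    # A task id that never shows up triggers the timeout path.
    try:
        await wait_for_result(store, "missing", check_interval=0.01, timeout=0.05)
        timed_out = False
    except ResultTimeout:
        timed_out = True
    return value, timed_out


polled_value, polled_timeout = asyncio.run(run_demo())
```

A shorter `check_interval` lowers latency at the cost of more lookups against the result store, which matters more when the store is a network service than in this in-memory sketch.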

### Middleware System

Extensible middleware pipeline for implementing cross-cutting concerns like retries, monitoring, and custom processing logic.

```python { .api }
class TaskiqMiddleware:
    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage
    async def post_send(self, message: TaskiqMessage) -> None
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage
    async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None

class SimpleRetryMiddleware(TaskiqMiddleware): ...
class SmartRetryMiddleware(TaskiqMiddleware): ...
class PrometheusMiddleware(TaskiqMiddleware): ...
```

[Middleware](./middleware.md)
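
As a rough picture of how a hook pipeline like this wraps task execution, the sketch below chains `pre_execute` and `post_execute` hooks around a coroutine using only the standard library. It borrows the hook names from the listing above, but `Message`, `Middleware`, `LabelMiddleware`, `RecordingMiddleware`, and `run_pipeline` are hypothetical, and running both hook phases in registration order is an assumption rather than documented Taskiq behavior.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Message:
    """Hypothetical, simplified stand-in for TaskiqMessage."""

    task_name: str
    args: tuple
    labels: Dict[str, Any] = field(default_factory=dict)


class Middleware:
    """Base class with pass-through hooks."""

    async def pre_execute(self, message: Message) -> Message:
        return message

    async def post_execute(self, message: Message, result: Any) -> None:
        return None


class LabelMiddleware(Middleware):
    """Adds a label before execution; later middlewares can see it."""

    async def pre_execute(self, message: Message) -> Message:
        message.labels["seen_by"] = "LabelMiddleware"
        return message


class RecordingMiddleware(Middleware):
    """Records which hooks ran and what they observed."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def pre_execute(self, message: Message) -> Message:
        self.calls.append(f"pre:{message.task_name}:{message.labels.get('seen_by')}")
        return message

    async def post_execute(self, message: Message, result: Any) -> None:
        self.calls.append(f"post:{message.task_name}:{result}")


async def run_pipeline(
    message: Message,
    func: Callable[..., Awaitable[Any]],
    middlewares: List[Middleware],
) -> Any:
    # Each pre_execute hook may replace the message seen by the next one.
    for middleware in middlewares:
        message = await middleware.pre_execute(message)
    result = await func(*message.args)
    for middleware in middlewares:
        await middleware.post_execute(message, result)
    return result


async def add(a: int, b: int) -> int:
    return a + b


recorder = RecordingMiddleware()
pipeline_result = asyncio.run(
    run_pipeline(Message("add", (5, 3)), add, [LabelMiddleware(), recorder])
)
print(recorder.calls)
```

Because every hook receives and returns the message, concerns such as retry counters or metrics can be layered without the task function knowing about them.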

### Scheduling

Task scheduling capabilities for periodic execution, cron-like scheduling, and delayed task execution.

```python { .api }
class TaskiqScheduler:
    def __init__(self, broker: AsyncBroker, sources: List[ScheduleSource]) -> None
    async def startup(self) -> None
    async def shutdown(self) -> None

class ScheduledTask:
    task_name: str
    cron: Optional[str]
    time: Optional[datetime]
    labels: Dict[str, Any]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]
```

[Scheduling](./scheduling.md)

### Result Backends

Storage systems for persisting task results and progress tracking across distributed environments.

```python { .api }
class AsyncResultBackend:
    async def set_result(self, task_id: str, result: TaskiqResult) -> None
    async def get_result(self, task_id: str, with_logs: bool = True) -> TaskiqResult
    async def is_result_ready(self, task_id: str) -> bool
    async def startup(self) -> None
    async def shutdown(self) -> None
```

[Result Backends](./result-backends.md)
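
To show the shape of the interface above, here is a minimal dict-backed store written against the same method names. It uses only the standard library and does not subclass anything from Taskiq; `DictResultBackend` and the trimmed-down `Result` dataclass are hypothetical, and a real backend would also need serialization and expiry.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class Result:
    """Hypothetical, trimmed-down stand-in for TaskiqResult."""

    is_err: bool
    return_value: Any
    execution_time: float


class DictResultBackend:
    """Keeps results in a plain dict; they vanish when the process exits."""

    def __init__(self) -> None:
        self._results: Dict[str, Result] = {}

    async def startup(self) -> None:
        # Nothing to connect to for an in-memory store.
        return None

    async def shutdown(self) -> None:
        self._results.clear()

    async def set_result(self, task_id: str, result: Result) -> None:
        self._results[task_id] = result

    async def is_result_ready(self, task_id: str) -> bool:
        return task_id in self._results

    async def get_result(self, task_id: str, with_logs: bool = True) -> Result:
        # with_logs is accepted for signature parity and ignored here.
        # Raises KeyError when the result has not been stored yet.
        return self._results[task_id]


async def run_demo() -> Tuple[bool, bool, Any]:
    backend = DictResultBackend()
    await backend.startup()
    ready_before = await backend.is_result_ready("task-1")
    await backend.set_result(
        "task-1", Result(is_err=False, return_value=8, execution_time=0.01)
    )
    ready_after = await backend.is_result_ready("task-1")
    stored = await backend.get_result("task-1")
    await backend.shutdown()
    return ready_before, ready_after, stored.return_value


backend_demo = asyncio.run(run_demo())
```

Checking `is_result_ready` before `get_result` avoids relying on an exception for the common "not finished yet" case.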

### Events and State

Event system for lifecycle management and global state container for broker and task coordination.

```python { .api }
class TaskiqEvents:
    CLIENT_STARTUP: str
    CLIENT_SHUTDOWN: str
    WORKER_STARTUP: str
    WORKER_SHUTDOWN: str

class TaskiqState:
    def __init__(self) -> None
    def set_value(self, key: str, value: Any) -> None
    def get_value(self, key: str, default: Any = None) -> Any
```

[Events and State](./events-state.md)
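
The pairing of lifecycle events with a shared state object can be sketched as follows. This is a standard-library toy, not Taskiq's event system: `EventRegistry`, `State`, and the string values of the two event constants are hypothetical, and only the `set_value`/`get_value` method names are taken from the listing above.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List, Tuple

# Placeholder event names; the real constant values may differ.
WORKER_STARTUP = "WORKER_STARTUP"
WORKER_SHUTDOWN = "WORKER_SHUTDOWN"


class State:
    """Key-value container shared by all handlers."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def set_value(self, key: str, value: Any) -> None:
        self._values[key] = value

    def get_value(self, key: str, default: Any = None) -> Any:
        return self._values.get(key, default)


Handler = Callable[[State], Awaitable[None]]


class EventRegistry:
    """Maps event names to handlers and passes each one the shared state."""

    def __init__(self) -> None:
        self.state = State()
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def on_event(self, event: str) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            self._handlers[event].append(handler)
            return handler

        return decorator

    async def emit(self, event: str) -> None:
        for handler in self._handlers[event]:
            await handler(self.state)


registry = EventRegistry()


@registry.on_event(WORKER_STARTUP)
async def open_pool(state: State) -> None:
    # A real handler might open a database or HTTP connection pool here.
    state.set_value("pool", ["conn-1", "conn-2"])


@registry.on_event(WORKER_SHUTDOWN)
async def close_pool(state: State) -> None:
    state.set_value("pool", None)


async def lifecycle() -> Tuple[Any, Any]:
    await registry.emit(WORKER_STARTUP)
    during = registry.state.get_value("pool")
    await registry.emit(WORKER_SHUTDOWN)
    after = registry.state.get_value("pool")
    return during, after


pool_during, pool_after = asyncio.run(lifecycle())
```

Keeping long-lived resources in the state object, rather than in module globals, lets startup and shutdown handlers own their full lifetime.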

### Exception Handling

Comprehensive exception hierarchy for handling various error conditions in distributed task processing.

```python { .api }
class TaskiqError(Exception): ...
class NoResultError(TaskiqError): ...
class ResultGetError(TaskiqError): ...
class SendTaskError(TaskiqError): ...
class SecurityError(TaskiqError): ...
class TaskiqResultTimeoutError(TaskiqError): ...
```

[Exceptions](./exceptions.md)

## Types

```python { .api }
class BrokerMessage:
    task_id: str
    task_name: str
    message: bytes
    labels: Dict[str, str]

class TaskiqMessage:
    task_id: str
    task_name: str
    labels: Dict[str, Any]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]

class AckableMessage:
    data: Union[bytes, str]
    ack: Callable[[], Awaitable[None]]

class TaskiqFormatter:
    """Abstract base class for message formatting."""
    def dumps(self, message: TaskiqMessage) -> BrokerMessage
    def loads(self, message: bytes) -> TaskiqMessage

class ScheduleSource:
    """Abstract base class for schedule sources."""
    async def get_schedules(self) -> List[ScheduledTask]
    async def pre_send(self, task: ScheduledTask) -> None
    async def post_send(self, task: ScheduledTask) -> None

TaskiqDepends = Depends  # From taskiq_dependencies package

__version__: str
"""Package version string."""
```
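
The `dumps`/`loads` pair on the formatter describes a round trip between a structured message and its wire form. The sketch below illustrates that round trip with JSON and the standard library only. `Message`, `WireMessage`, and `JsonFormatter` are hypothetical mirrors of the types above, not Taskiq classes, and `args` is modelled as a list because JSON has no tuple type.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class Message:
    """Hypothetical mirror of TaskiqMessage (args is a list for JSON's sake)."""

    task_id: str
    task_name: str
    labels: Dict[str, Any] = field(default_factory=dict)
    args: List[Any] = field(default_factory=list)
    kwargs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class WireMessage:
    """Hypothetical mirror of BrokerMessage: opaque payload plus string labels."""

    task_id: str
    task_name: str
    message: bytes
    labels: Dict[str, str]


class JsonFormatter:
    """Serializes the whole message to UTF-8 JSON bytes and back."""

    def dumps(self, message: Message) -> WireMessage:
        return WireMessage(
            task_id=message.task_id,
            task_name=message.task_name,
            message=json.dumps(asdict(message)).encode("utf-8"),
            # Labels are stringified so a transport can carry them as headers.
            labels={key: str(value) for key, value in message.labels.items()},
        )

    def loads(self, message: bytes) -> Message:
        return Message(**json.loads(message))


formatter = JsonFormatter()
original = Message(
    task_id="task-1",
    task_name="add_numbers",
    labels={"retries": 3},
    args=[5, 3],
    kwargs={},
)
wire = formatter.dumps(original)
restored = formatter.loads(wire.message)
print(restored == original)
```

Note the asymmetry the type listing implies: `dumps` produces a full broker message with routing metadata, while `loads` only needs the payload bytes to rebuild the task message.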