
tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/dramatiq@1.18.x

To install, run `npx @tessl/cli install tessl/pypi-dramatiq@1.18.0`


# Dramatiq


A fast and reliable distributed task processing library for Python 3 that provides a simple API for defining background tasks (actors) and distributing them across workers. Dramatiq supports multiple message brokers including RabbitMQ and Redis, offers comprehensive middleware for rate limiting, retries, and result storage, and includes advanced features like task composition, gevent integration, and robust error handling.


## Package Information

- **Package Name**: dramatiq
- **Language**: Python
- **Installation**: `pip install dramatiq`
- **Extras**: `pip install dramatiq[redis]`, `pip install dramatiq[rabbitmq]`, `pip install dramatiq[all]`

## Core Imports

```python
import dramatiq
```

Common imports for actors and brokers:

```python
from dramatiq import actor, get_broker, set_broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.rabbitmq import RabbitmqBroker
```

## Basic Usage

```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Set up the broker
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
dramatiq.set_broker(redis_broker)

# Define actors (background tasks)
@dramatiq.actor
def send_email(to, subject, body):
    # Send email implementation
    print(f"Sending email to {to}: {subject}")
    # ... email sending logic ...

# Note: lower priority numbers are processed first, so priority=10
# yields to actors that use the default priority of 0.
@dramatiq.actor(queue_name="critical", priority=10)
def process_payment(user_id, amount):
    # Process payment implementation
    print(f"Processing payment: ${amount} for user {user_id}")
    # ... payment processing logic ...

# Send messages (enqueue tasks)
send_email.send("user@example.com", "Welcome!", "Thanks for signing up!")
process_payment.send(123, 50.00)

# Run workers to process tasks
# In terminal: dramatiq my_module
```

Advanced usage with composition:

```python
from dramatiq import pipeline, group

# Pipeline: sequential execution. Each actor's result is appended to the next
# message's arguments unless that message sets pipe_ignore=True.
pipe = (
    send_email.message("user@example.com", "Step 1", "First step")
    | process_payment.message_with_options(args=(123, 50.00), pipe_ignore=True)
    | send_email.message_with_options(
        args=("user@example.com", "Step 2", "Payment processed"),
        pipe_ignore=True,
    )
)
pipe.run()

# Group: parallel execution
tasks = group([
    send_email.message("user1@example.com", "Bulk", "Message 1"),
    send_email.message("user2@example.com", "Bulk", "Message 2"),
    send_email.message("user3@example.com", "Bulk", "Message 3")
])
tasks.run()
```

## Architecture

Dramatiq uses an Actor model where tasks are defined as actors and messages are sent to these actors for processing:

- **Actors**: Decorated functions or classes that define background tasks
- **Brokers**: Message brokers (Redis, RabbitMQ) that handle message routing and persistence
- **Workers**: Processes that consume messages from brokers and execute actors
- **Messages**: Serialized task data containing actor name, arguments, and metadata
- **Middleware**: Components that intercept and modify message processing (retries, time limits, etc.)
- **Composition**: Tools for chaining tasks in pipelines or grouping tasks for parallel execution
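
To make the flow between these components concrete, the following is a toy, single-process model built only on the standard library. It is not how Dramatiq is implemented: `ACTORS`, `BROKER`, `send`, and `worker_loop` are hypothetical names, and a `queue.Queue` stands in for a real Redis or RabbitMQ queue.

```python
import json
import queue
import threading

ACTORS = {}             # actor name -> callable (a minimal actor registry)
BROKER = queue.Queue()  # stands in for a Redis or RabbitMQ queue
RESULTS = []


def actor(fn):
    """Register a function so that a worker can look it up by name."""
    ACTORS[fn.__name__] = fn
    return fn


def send(actor_name, *args):
    """Serialize a message and hand it to the broker."""
    BROKER.put(json.dumps({"actor_name": actor_name, "args": args}))


def worker_loop():
    """Consume messages until a None sentinel arrives."""
    while True:
        raw = BROKER.get()
        if raw is None:
            break
        message = json.loads(raw)
        ACTORS[message["actor_name"]](*message["args"])


@actor
def greet(name):
    RESULTS.append(f"Hello, {name}!")


worker = threading.Thread(target=worker_loop)
worker.start()
send("greet", "Ada")
send("greet", "Grace")
BROKER.put(None)  # tell the worker to stop
worker.join()
print(RESULTS)  # ['Hello, Ada!', 'Hello, Grace!']
```

The sender only needs the actor's name and arguments, which is what allows producers and workers to run in separate processes when a real broker sits between them.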

## Capabilities


### Actor System


Define and manage background tasks using decorators or classes, with support for queues, priorities, and custom options.

```python { .api }
@actor(queue_name: str = "default", priority: int = 0, **options)
def my_task(arg1, arg2): ...

class Actor:
    def __init__(fn, *, broker, actor_name, queue_name, priority, options): ...
    def send(*args, **kwargs) -> Message: ...
    def send_with_options(*, args=(), kwargs=None, delay=None, **options) -> Message: ...

class GenericActor:
    def perform(*args, **kwargs): ...  # Abstract method
```

[Actors](./actors.md)
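
The signature above implies a decorator that works both bare (`@actor`) and with keyword options (`@actor(queue_name=...)`). The sketch below shows one common way to write such a decorator in plain Python. It is an illustration of the pattern only, not Dramatiq's source: this `actor` is a hypothetical stand-in that stores the options as attributes and does no enqueueing.

```python
import functools


def actor(fn=None, *, queue_name="default", priority=0, **options):
    """Hypothetical decorator usable as @actor or @actor(queue_name=...)."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)  # a direct call runs synchronously

        wrapper.queue_name = queue_name
        wrapper.priority = priority
        wrapper.options = options
        return wrapper

    # Bare usage passes the function itself; parameterized usage passes None.
    return decorate if fn is None else decorate(fn)


@actor
def add(x, y):
    return x + y


@actor(queue_name="critical", max_retries=3)
def charge(user_id, amount):
    return (user_id, amount)


print(add.queue_name, add(1, 2))          # default 3
print(charge.queue_name, charge.options)  # critical {'max_retries': 3}
```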


### Message Brokers


Connect to Redis, RabbitMQ, or use in-memory brokers for development and testing.

```python { .api }
class RedisBroker(Broker):
    def __init__(*, url=None, namespace="dramatiq", heartbeat_timeout=60000, **params): ...

class RabbitmqBroker(Broker):
    def __init__(*, url=None, confirm_delivery=False, max_priority=None, **kwargs): ...

class StubBroker(Broker):
    def __init__(middleware=None): ...

def get_broker() -> Broker: ...
def set_broker(broker: Broker): ...
```

[Brokers](./brokers.md)


### Task Composition


Chain tasks sequentially with pipelines or execute multiple tasks in parallel with groups.

```python { .api }
class pipeline:
    def __init__(children: Iterable[Message], *, broker=None): ...
    def run(*, delay=None) -> pipeline: ...
    def get_result(*, block=False, timeout=None): ...

class group:
    def __init__(children, *, broker=None): ...
    def run(*, delay=None) -> group: ...
    def get_results(*, block=False, timeout=None): ...
    def wait(*, timeout=None): ...
```

[Composition](./composition.md)
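
The difference between the two composition styles can be sketched locally without a broker. In a Dramatiq pipeline, each actor's result is passed to the next message as an extra positional argument unless that message opts out with `pipe_ignore`; a group simply runs its members independently. The helpers `run_pipeline` and `run_group` below are hypothetical in-process stand-ins for those semantics, not library functions.

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(steps):
    """Run (func, args) steps in order, appending each result to the next call."""
    result = None
    for index, (func, args) in enumerate(steps):
        extra = () if index == 0 else (result,)
        result = func(*args, *extra)
    return result


def run_group(calls):
    """Run independent (func, args) calls concurrently; results keep input order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        return [future.result() for future in futures]


def add(x, y):
    return x + y


def double(x):
    return 2 * x


print(run_pipeline([(add, (1, 2)), (add, (10,)), (double, ())]))  # 26
print(run_group([(add, (1, 2)), (add, (3, 4)), (double, (5,))]))  # [3, 7, 10]
```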


### Middleware System


Extend functionality with built-in middleware for retries, time limits, rate limiting, and custom processing.

```python { .api }
class Middleware:
    def before_process_message(broker, message): ...
    def after_process_message(broker, message, *, result=None, exception=None): ...

class Retries(Middleware):
    def __init__(*, max_retries=20, min_backoff=15000, max_backoff=604800000): ...

class TimeLimit(Middleware):
    def __init__(*, time_limit=600000, interval=1000): ...

class AgeLimit(Middleware):
    def __init__(*, max_age=None): ...
```

[Middleware](./middleware.md)
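
The `min_backoff` and `max_backoff` values on `Retries` are in milliseconds and bound the delay between attempts. As a rough illustration of how such bounds typically interact, here is a sketch of exponential backoff with jitter. The function `backoff_delay` is hypothetical and the exact formula Dramatiq uses may differ; only the default bounds are taken from the signature above.

```python
import random


def backoff_delay(attempts, *, min_backoff=15_000, max_backoff=604_800_000, jitter=True):
    """Return a delay in milliseconds that doubles per attempt, capped at max_backoff."""
    backoff = min(min_backoff * 2 ** attempts, max_backoff)
    if jitter:
        # Spread retries out: pick a delay between half and the full backoff.
        backoff = random.uniform(backoff / 2, backoff)
    return int(backoff)


for attempts in range(5):
    print(attempts, backoff_delay(attempts, jitter=False))
# 0 15000
# 1 30000
# 2 60000
# 3 120000
# 4 240000
```

With these defaults the cap of 604800000 ms (7 days) is reached after 16 doublings, so later attempts all wait the maximum.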


### Rate Limiting


Control task execution rates and implement synchronization barriers using various rate limiting strategies.

```python { .api }
class BucketRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, bucket): ...

class ConcurrentRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, ttl=900000): ...

class WindowRateLimiter(RateLimiter):
    def __init__(backend, key, *, limit, window): ...

class Barrier:
    def __init__(backend, key, *, ttl=900000): ...
    def create(size): ...
    def wait(timeout=None): ...
```

[Rate Limiting](./rate-limiting.md)
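
To illustrate the general idea behind a window-based limit, the sketch below counts acquisitions per fixed time window in process memory. It is a simplified, hypothetical class (`InMemoryWindowLimiter`) and not Dramatiq's `WindowRateLimiter`, which relies on a shared backend so that the limit holds across worker processes.

```python
import time


class InMemoryWindowLimiter:
    """Toy limiter: allow at most `limit` acquisitions per `window` seconds."""

    def __init__(self, *, limit, window, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.counts = {}  # window index -> number of acquisitions

    def acquire(self):
        bucket = int(self.clock() // self.window)
        # Forget counters from earlier windows.
        self.counts = {bucket: self.counts.get(bucket, 0)}
        if self.counts[bucket] >= self.limit:
            return False
        self.counts[bucket] += 1
        return True


now = [0.0]  # fake clock so the example is deterministic
limiter = InMemoryWindowLimiter(limit=2, window=10, clock=lambda: now[0])
first_window = [limiter.acquire() for _ in range(3)]
print(first_window)  # [True, True, False]
now[0] = 10.0        # the next window starts
next_window = limiter.acquire()
print(next_window)   # True
```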


### Result Storage


Store and retrieve task results using Redis, Memcached, or in-memory backends.

```python { .api }
class Results(Middleware):
    def __init__(*, backend=None, store_results=False): ...

class ResultBackend:
    def get_result(message, *, block=False, timeout=10000): ...
    def store_result(message, result, ttl): ...

class Message:
    def get_result(*, backend=None, block=False, timeout=None): ...
```

[Results](./results.md)
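
The `block` and `timeout` parameters describe two ways of asking for a result: fail immediately if it is not there yet, or wait up to a deadline. The sketch below models that behavior with a small in-memory store. `InMemoryResults` and the two exception classes defined here are hypothetical local stand-ins (keyed by a plain message id, with no TTL handling), not Dramatiq's backends.

```python
import threading


class ResultMissing(Exception):
    """Raised when a non-blocking lookup finds no stored result."""


class ResultTimeout(Exception):
    """Raised when a blocking lookup waits longer than the timeout."""


class InMemoryResults:
    def __init__(self):
        self._results = {}
        self._changed = threading.Condition()

    def store_result(self, message_id, result):
        with self._changed:
            self._results[message_id] = result
            self._changed.notify_all()

    def get_result(self, message_id, *, block=False, timeout=10_000):
        """Return the stored result; timeout is in milliseconds."""
        with self._changed:
            if not block:
                if message_id not in self._results:
                    raise ResultMissing(message_id)
                return self._results[message_id]
            ready = self._changed.wait_for(
                lambda: message_id in self._results, timeout=timeout / 1000
            )
            if not ready:
                raise ResultTimeout(message_id)
            return self._results[message_id]


store = InMemoryResults()
# Simulate a worker that finishes 50 ms after the caller starts waiting.
threading.Timer(0.05, store.store_result, args=("msg-1", 42)).start()
print(store.get_result("msg-1", block=True, timeout=1_000))  # 42
```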


### Worker Management


Configure and run workers to process messages from brokers with customizable threading and timeout settings.

```python { .api }
class Worker:
    def __init__(broker, *, queues=None, worker_timeout=1000, worker_threads=8): ...
    def start(): ...
    def stop(): ...
    def join(): ...
```

[Workers](./workers.md)


### Message Handling


Work with message objects and customize encoding for different serialization needs.

```python { .api }
class Message:
    def __init__(queue_name, actor_name, args, kwargs, options, message_id, message_timestamp): ...
    def encode() -> bytes: ...
    def copy(**attributes) -> Message: ...

class JSONEncoder(Encoder):
    def encode(data) -> bytes: ...
    def decode(data: bytes): ...

def get_encoder() -> Encoder: ...
def set_encoder(encoder: Encoder): ...
```

[Messages](./messages.md)
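
A message is essentially a small record that must survive a round trip through bytes. The sketch below shows that round trip with the standard `json` module, using the field names from the `Message` constructor above. The dictionary layout and the sample values are assumptions for illustration, not a guaranteed wire format.

```python
import json
import time
import uuid

message = {
    "queue_name": "default",
    "actor_name": "send_email",
    "args": ("user@example.com", "Welcome!"),
    "kwargs": {"body": "Thanks for signing up!"},
    "options": {},
    "message_id": str(uuid.uuid4()),
    "message_timestamp": int(time.time() * 1000),  # milliseconds
}

# Encode to compact UTF-8 JSON bytes, then decode back to a dictionary.
encoded = json.dumps(message, separators=(",", ":")).encode("utf-8")
decoded = json.loads(encoded.decode("utf-8"))

print(type(encoded).__name__)  # bytes
print(decoded["actor_name"])   # send_email
print(decoded["args"])         # ['user@example.com', 'Welcome!']
```

Note that JSON has no tuple type, so positional arguments come back as a list. Arguments that are not JSON-serializable (for example `datetime` objects) need to be converted before sending or handled by a custom encoder.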


## Error Handling


Dramatiq provides a comprehensive error hierarchy for handling various failure scenarios:

```python { .api }
# Base errors
class DramatiqError(Exception): ...
class BrokerError(DramatiqError): ...
class ActorNotFound(DramatiqError): ...
class QueueNotFound(DramatiqError): ...
class RateLimitExceeded(DramatiqError): ...

# Connection errors
class ConnectionError(BrokerError): ...
class ConnectionFailed(ConnectionError): ...
class ConnectionClosed(ConnectionError): ...

# Processing errors
class Retry(Exception):  # Signals intentional retry
    def __init__(delay=None): ...

class TimeLimitExceeded(Exception): ...

# Result errors
class ResultError(Exception): ...
class ResultMissing(ResultError): ...
class ResultTimeout(ResultError): ...
class ResultFailure(ResultError): ...
```

Common error handling patterns:

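```python
import logging

import dramatiq

logger = logging.getLogger(__name__)

# TemporaryError, PermanentError, and process_data are placeholders for
# application-defined exceptions and logic.

# Exceptions listed in `throws` are not retried.
@dramatiq.actor(max_retries=5, throws=(PermanentError,))
def reliable_task(data):
    try:
        # Task implementation
        process_data(data)
    except TemporaryError:
        # Retry with custom delay
        raise dramatiq.Retry(delay=30000)  # 30 seconds
    except PermanentError as e:
        # Log and re-raise; `throws` keeps this from being retried
        logger.error(f"Permanent failure: {e}")
        raise
```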

## Constants and Configuration

```python { .api }
# Default values
DEFAULT_QUEUE_NAME = "default"
DEFAULT_PRIORITY = 0
DEFAULT_WORKER_THREADS = 8
DEFAULT_WORKER_TIMEOUT = 1000  # milliseconds
DEFAULT_TIME_LIMIT = 600000  # 10 minutes
DEFAULT_MAX_RETRIES = 20

# Queue name validation pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"
```
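
The queue name pattern can be checked ahead of time with the standard `re` module. The sketch below assumes the whole name must match the pattern (hence `re.fullmatch`); `is_valid_queue_name` is a hypothetical helper, not a library function.

```python
import re

QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"


def is_valid_queue_name(name):
    """Return True when the whole name matches the pattern."""
    return re.fullmatch(QUEUE_NAME_PATTERN, name) is not None


for name in ["default", "critical.payments", "_internal-1", "1st", "bad name", ""]:
    print(f"{name!r}: {is_valid_queue_name(name)}")
# 'default': True
# 'critical.payments': True
# '_internal-1': True
# '1st': False
# 'bad name': False
# '': False
```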