Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
npx @tessl/cli install tessl/pypi-dramatiq@1.18.00
# Dramatiq
1
2
A fast and reliable distributed task processing library for Python 3 that provides a simple API for defining background tasks (actors) and distributing them across workers. Dramatiq supports multiple message brokers including RabbitMQ and Redis, offers comprehensive middleware for rate limiting, retries, and result storage, and includes advanced features like task composition, gevent integration, and robust error handling.
3
4
## Package Information
5
6
- **Package Name**: dramatiq
7
- **Language**: Python
8
- **Installation**: `pip install dramatiq`
9
- **Extras**: `pip install dramatiq[redis]`, `pip install dramatiq[rabbitmq]`, `pip install dramatiq[all]`
10
11
## Core Imports
12
13
```python
14
import dramatiq
15
```
16
17
Common imports for actors and brokers:
18
19
```python
20
from dramatiq import actor, get_broker, set_broker
21
from dramatiq.brokers.redis import RedisBroker
22
from dramatiq.brokers.rabbitmq import RabbitmqBroker
23
```
24
25
## Basic Usage
26
27
```python
28
import dramatiq
29
from dramatiq.brokers.redis import RedisBroker
30
31
# Set up the broker
32
redis_broker = RedisBroker(host="localhost", port=6379, db=0)
33
dramatiq.set_broker(redis_broker)
34
35
# Define actors (background tasks)
36
@dramatiq.actor
37
def send_email(to, subject, body):
38
# Send email implementation
39
print(f"Sending email to {to}: {subject}")
40
# ... email sending logic ...
41
42
@dramatiq.actor(queue_name="critical", priority=10)
43
def process_payment(user_id, amount):
44
# Process payment implementation
45
print(f"Processing payment: ${amount} for user {user_id}")
46
# ... payment processing logic ...
47
48
# Send messages (enqueue tasks)
49
send_email.send("user@example.com", "Welcome!", "Thanks for signing up!")
50
process_payment.send(123, 50.00)
51
52
# Run workers to process tasks
53
# In terminal: dramatiq my_module
54
```
55
56
Advanced usage with composition:
57
58
```python
59
from dramatiq import pipeline, group
60
61
# Pipeline: sequential execution
62
pipe = send_email.message("user@example.com", "Step 1", "First step") | \\
63
process_payment.message(123, 50.00) | \\
64
send_email.message("user@example.com", "Step 2", "Payment processed")
65
pipe.run()
66
67
# Group: parallel execution
68
tasks = group([
69
send_email.message("user1@example.com", "Bulk", "Message 1"),
70
send_email.message("user2@example.com", "Bulk", "Message 2"),
71
send_email.message("user3@example.com", "Bulk", "Message 3")
72
])
73
tasks.run()
74
```
75
76
## Architecture
77
78
Dramatiq uses an Actor model where tasks are defined as actors and messages are sent to these actors for processing:
79
80
- **Actors**: Decorated functions or classes that define background tasks
81
- **Brokers**: Message brokers (Redis, RabbitMQ) that handle message routing and persistence
82
- **Workers**: Processes that consume messages from brokers and execute actors
83
- **Messages**: Serialized task data containing actor name, arguments, and metadata
84
- **Middleware**: Components that intercept and modify message processing (retries, time limits, etc.)
85
- **Composition**: Tools for chaining tasks in pipelines or grouping tasks for parallel execution
86
87
## Capabilities
88
89
### Actor System
90
91
Define and manage background tasks using decorators or classes, with support for queues, priorities, and custom options.
92
93
```python { .api }
94
@actor(queue_name: str = "default", priority: int = 0, **options)
95
def my_task(arg1, arg2): ...
96
97
class Actor:
98
def __init__(fn, *, broker, actor_name, queue_name, priority, options): ...
99
def send(*args, **kwargs) -> Message: ...
100
def send_with_options(*, args=(), kwargs=None, delay=None, **options) -> Message: ...
101
102
class GenericActor:
103
def perform(*args, **kwargs): ... # Abstract method
104
```
105
106
[Actors](./actors.md)
107
108
### Message Brokers
109
110
Connect to Redis, RabbitMQ, or use in-memory brokers for development and testing.
111
112
```python { .api }
113
class RedisBroker(Broker):
114
def __init__(*, url=None, namespace="dramatiq", heartbeat_timeout=60000, **params): ...
115
116
class RabbitmqBroker(Broker):
117
def __init__(*, url=None, confirm_delivery=False, max_priority=None, **kwargs): ...
118
119
class StubBroker(Broker):
120
def __init__(middleware=None): ...
121
122
def get_broker() -> Broker: ...
123
def set_broker(broker: Broker): ...
124
```
125
126
[Brokers](./brokers.md)
127
128
### Task Composition
129
130
Chain tasks sequentially with pipelines or execute multiple tasks in parallel with groups.
131
132
```python { .api }
133
class pipeline:
134
def __init__(children: Iterable[Message], *, broker=None): ...
135
def run(*, delay=None) -> pipeline: ...
136
def get_result(*, block=False, timeout=None): ...
137
138
class group:
139
def __init__(children, *, broker=None): ...
140
def run(*, delay=None) -> group: ...
141
def get_results(*, block=False, timeout=None): ...
142
def wait(*, timeout=None): ...
143
```
144
145
[Composition](./composition.md)
146
147
### Middleware System
148
149
Extend functionality with built-in middleware for retries, time limits, rate limiting, and custom processing.
150
151
```python { .api }
152
class Middleware:
153
def before_process_message(broker, message): ...
154
def after_process_message(broker, message, *, result=None, exception=None): ...
155
156
class Retries(Middleware):
157
def __init__(*, max_retries=20, min_backoff=15000, max_backoff=604800000): ...
158
159
class TimeLimit(Middleware):
160
def __init__(*, time_limit=600000, interval=1000): ...
161
162
class AgeLimit(Middleware):
163
def __init__(*, max_age=None): ...
164
```
165
166
[Middleware](./middleware.md)
167
168
### Rate Limiting
169
170
Control task execution rates and implement synchronization barriers using various rate limiting strategies.
171
172
```python { .api }
173
class BucketRateLimiter(RateLimiter):
174
def __init__(backend, key, *, limit, bucket): ...
175
176
class ConcurrentRateLimiter(RateLimiter):
177
def __init__(backend, key, *, limit, ttl=900000): ...
178
179
class WindowRateLimiter(RateLimiter):
180
def __init__(backend, key, *, limit, window): ...
181
182
class Barrier:
183
def __init__(backend, key, *, ttl=900000): ...
184
def create(size): ...
185
def wait(timeout=None): ...
186
```
187
188
[Rate Limiting](./rate-limiting.md)
189
190
### Result Storage
191
192
Store and retrieve task results using Redis, Memcached, or in-memory backends.
193
194
```python { .api }
195
class Results(Middleware):
196
def __init__(*, backend=None, store_results=False): ...
197
198
class ResultBackend:
199
def get_result(message, *, block=False, timeout=10000): ...
200
def store_result(message, result, ttl): ...
201
202
class Message:
203
def get_result(*, backend=None, block=False, timeout=None): ...
204
```
205
206
[Results](./results.md)
207
208
### Worker Management
209
210
Configure and run workers to process messages from brokers with customizable threading and timeout settings.
211
212
```python { .api }
213
class Worker:
214
def __init__(broker, *, queues=None, worker_timeout=1000, worker_threads=8): ...
215
def start(): ...
216
def stop(): ...
217
def join(): ...
218
```
219
220
[Workers](./workers.md)
221
222
### Message Handling
223
224
Work with message objects and customize encoding for different serialization needs.
225
226
```python { .api }
227
class Message:
228
def __init__(queue_name, actor_name, args, kwargs, options, message_id, message_timestamp): ...
229
def encode() -> bytes: ...
230
def copy(**attributes) -> Message: ...
231
232
class JSONEncoder(Encoder):
233
def encode(data) -> bytes: ...
234
def decode(data: bytes): ...
235
236
def get_encoder() -> Encoder: ...
237
def set_encoder(encoder: Encoder): ...
238
```
239
240
[Messages](./messages.md)
241
242
## Error Handling
243
244
Dramatiq provides a comprehensive error hierarchy for handling various failure scenarios:
245
246
```python { .api }
247
# Base errors
248
class DramatiqError(Exception): ...
249
class BrokerError(DramatiqError): ...
250
class ActorNotFound(DramatiqError): ...
251
class QueueNotFound(DramatiqError): ...
252
class RateLimitExceeded(DramatiqError): ...
253
254
# Connection errors
255
class ConnectionError(BrokerError): ...
256
class ConnectionFailed(ConnectionError): ...
257
class ConnectionClosed(ConnectionError): ...
258
259
# Processing errors
260
class Retry(Exception): # Signals intentional retry
261
def __init__(delay=None): ...
262
263
class TimeLimitExceeded(Exception): ...
264
265
# Result errors
266
class ResultError(Exception): ...
267
class ResultMissing(ResultError): ...
268
class ResultTimeout(ResultError): ...
269
class ResultFailure(ResultError): ...
270
```
271
272
Common error handling patterns:
273
274
```python
275
@dramatiq.actor(max_retries=5)
276
def reliable_task(data):
277
try:
278
# Task implementation
279
process_data(data)
280
except TemporaryError as e:
281
# Retry with custom delay
282
raise dramatiq.Retry(delay=30000) # 30 seconds
283
except PermanentError as e:
284
# Log and don't retry
285
logger.error(f"Permanent failure: {e}")
286
raise
287
```
288
289
## Constants and Configuration
290
291
```python { .api }
292
# Default values
293
DEFAULT_QUEUE_NAME = "default"
294
DEFAULT_PRIORITY = 0
295
DEFAULT_WORKER_THREADS = 8
296
DEFAULT_WORKER_TIMEOUT = 1000 # milliseconds
297
DEFAULT_TIME_LIMIT = 600000 # 10 minutes
298
DEFAULT_MAX_RETRIES = 20
299
300
# Queue name validation pattern
301
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"
302
```