# Actors

The actor system in Dramatiq provides the foundation for defining and managing background tasks. Actors are decorated functions or `GenericActor` subclasses whose invocations are sent as messages to a broker and run asynchronously by workers. They support queues, priorities, retries, and custom processing options.

## Capabilities

### Actor Decorator

Transform regular functions into actors that can be executed asynchronously by workers.

```python { .api }
@actor(
    fn=None, *,
    actor_class=Actor,
    actor_name: str = None,
    queue_name: str = "default",
    priority: int = 0,
    broker: Broker = None,
    **options
)
def decorated_function(*args, **kwargs): ...
```

**Parameters:**
- `fn`: Function to decorate (optional for parametrized decorator)
- `actor_class`: Actor class to use (default: Actor)
- `actor_name`: Actor name (defaults to function name)
- `queue_name`: Queue name (default: "default")
- `priority`: Priority level (default: 0, lower values = higher priority)
- `broker`: Broker instance (uses global broker if None)
- `**options`: Additional actor options for middleware/broker

**Usage:**

```python
import dramatiq

# Simple actor
@dramatiq.actor
def send_email(to, subject, body):
    print(f"Sending email to {to}: {subject}")

# Actor with options
@dramatiq.actor(queue_name="high_priority", priority=1, max_retries=5)
def process_payment(user_id, amount):
    print(f"Processing ${amount} for user {user_id}")

# Send messages
send_email.send("user@example.com", "Hello", "Welcome!")
process_payment.send(123, 99.99)
```
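
To make the "lower values = higher priority" rule concrete, here is a small standalone sketch built on Python's `heapq`. It does not use Dramatiq at all: the `pending` list and the task names are hypothetical, and a real worker's scheduling involves more than this ordering.

```python
import heapq

# Each entry is (priority, sequence, actor_name).
# The sequence number keeps insertion order stable when priorities tie.
pending = []
tasks = [(10, "send_email"), (1, "process_payment"), (5, "resize_image")]
for seq, (priority, name) in enumerate(tasks):
    heapq.heappush(pending, (priority, seq, name))

# Pop everything: the smallest priority value comes out first.
order = [heapq.heappop(pending)[2] for _ in range(len(pending))]
print(order)
```

The entry with `priority=1` is handled before the entries with `5` and `10`, which mirrors how a lower number marks a more urgent actor.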

### Actor Class

The core actor implementation that wraps functions for asynchronous execution.

```python { .api }
class Actor:
    def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):
        """
        Create an actor instance.

        Parameters:
        - fn: Callable function (supports sync/async)
        - broker: Broker instance
        - actor_name: str - Name of the actor
        - queue_name: str - Queue name
        - priority: int - Actor priority (lower = higher priority)
        - options: Dict[str, Any] - Arbitrary options for broker/middleware
        """

    def message(self, *args, **kwargs) -> Message:
        """
        Build a message without sending it.

        Returns:
        Message object that can be sent later or used in composition
        """

    def message_with_options(self, *, args=(), kwargs=None, **options) -> Message:
        """
        Build message with custom options.

        Parameters:
        - args: tuple - Arguments for the actor
        - kwargs: dict - Keyword arguments for the actor
        - **options: Additional message options (delay, etc.)

        Returns:
        Message object with specified options
        """

    def send(self, *args, **kwargs) -> Message:
        """
        Send message asynchronously to broker.

        Returns:
        Message object representing the enqueued task
        """

    def send_with_options(self, *, args=(), kwargs=None, delay=None, **options) -> Message:
        """
        Send message with custom options.

        Parameters:
        - args: tuple - Arguments for the actor
        - kwargs: dict - Keyword arguments for the actor
        - delay: int - Delay in milliseconds before processing
        - **options: Additional message options

        Returns:
        Message object representing the enqueued task
        """

    def __call__(self, *args, **kwargs):
        """
        Execute the actor synchronously (for testing/development).

        Returns:
        Result of the wrapped function
        """

    # Properties
    logger: Logger            # Actor's logger instance
    fn: Callable              # Underlying callable function
    broker: Broker            # Associated broker
    actor_name: str           # Actor name
    queue_name: str           # Queue name
    priority: int             # Priority level
    options: Dict[str, Any]   # Actor options
```

**Usage:**

```python
import dramatiq

# Create actor manually
def my_function(x, y):
    return x + y

my_actor = dramatiq.Actor(
    my_function,
    broker=dramatiq.get_broker(),
    actor_name="adder",
    queue_name="math",
    priority=5,
    options={"max_retries": 3}
)

# Use the actor
result_msg = my_actor.send(10, 20)      # Enqueue; returns the Message
direct_result = my_actor(10, 20)        # Synchronous execution; returns 30
```
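
Because `delay` in `send_with_options` is expressed in milliseconds, a small conversion helper can reduce unit mistakes. The sketch below uses only the standard library; `to_ms` is a hypothetical helper name and is not part of Dramatiq.

```python
from datetime import timedelta

def to_ms(delta: timedelta) -> int:
    """Convert a timedelta to whole milliseconds."""
    return int(delta.total_seconds() * 1000)

five_minutes = to_ms(timedelta(minutes=5))
half_second = to_ms(timedelta(milliseconds=500))
print(five_minutes, half_second)

# Possible use with the actor defined earlier (assumes a configured broker):
# my_actor.send_with_options(args=(10, 20), delay=five_minutes)
```

The same helper would apply to other millisecond-valued options such as `time_limit` or `max_age`.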

### Generic Actor Class

Base class for class-based actors. A metaclass registers each concrete subclass as an actor, and an inner `Meta` class supplies its configuration.

```python { .api }
class GenericActor:
    """
    Base class for class-based actors.

    Subclasses must implement the perform() method and can define
    configuration through a Meta inner class.
    """

    class Meta:
        # Configuration options (all optional)
        actor_name: str = None
        queue_name: str = "default"
        priority: int = 0
        broker: Broker = None
        # Any additional options...

    def perform(self, *args, **kwargs):
        """
        Abstract method that subclasses must implement.
        This method contains the actual task logic.

        Parameters:
        - *args: Variable positional arguments
        - **kwargs: Variable keyword arguments

        Returns:
        Task result (any type)
        """
        raise NotImplementedError
```

**Usage:**

```python
import dramatiq

class EmailActor(dramatiq.GenericActor):
    class Meta:
        queue_name = "emails"
        priority = 10
        max_retries = 5

    def perform(self, to, subject, body):
        # Email sending logic
        print(f"Sending email to {to}: {subject}")
        # ... actual email sending ...
        return f"Email sent to {to}"

class PaymentActor(dramatiq.GenericActor):
    class Meta:
        queue_name = "payments"
        priority = 1  # Higher priority
        time_limit = 30000  # 30 seconds

    def perform(self, user_id, amount, payment_method):
        # Payment processing logic
        print(f"Processing ${amount} payment for user {user_id}")
        # ... payment processing ...
        return {"status": "success", "transaction_id": "12345"}

# Send messages to class-based actors
EmailActor.send("user@example.com", "Welcome", "Thanks for signing up!")
PaymentActor.send(123, 99.99, "credit_card")
```

### Actor Options

Actors support various options that control their behavior and integration with middleware:

```python { .api }
# Common actor options
ACTOR_OPTIONS = {
    # Retry configuration
    "max_retries": int,      # Maximum retry attempts (default: 20)
    "min_backoff": int,      # Minimum backoff in ms (default: 15000)
    "max_backoff": int,      # Maximum backoff in ms (default: 604800000)
    "retry_when": Callable,  # Function to determine if retry should occur

    # Time limits
    "time_limit": int,       # Maximum execution time in ms (default: 600000)

    # Age limits
    "max_age": int,          # Maximum message age in ms before rejection

    # Results storage
    "store_results": bool,   # Whether to store results (default: False)

    # Callbacks
    "on_success": str,       # Actor name to call on success
    "on_failure": str,       # Actor name to call on failure

    # Custom options for specific middleware/brokers
    # (any additional key-value pairs)
}
```

**Usage:**

```python
import dramatiq

def process_critical_data(data):
    # Hypothetical placeholder for the real processing logic
    return data

# Note: store_results is only accepted as an option when the Results
# middleware has been added to the broker.
@dramatiq.actor(
    queue_name="critical",
    priority=1,
    max_retries=3,
    time_limit=60000,  # 1 minute
    store_results=True,
    on_success="log_success",
    on_failure="handle_failure"
)
def critical_task(data):
    # Critical processing logic
    return process_critical_data(data)

# Callbacks receive the original message serialized as a dict,
# so its fields are read with key access.
@dramatiq.actor
def log_success(message_data, result):
    print(f"Task {message_data['message_id']} succeeded: {result}")

@dramatiq.actor
def handle_failure(message_data, exception_data):
    print(f"Task {message_data['message_id']} failed: {exception_data}")
```
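
The `min_backoff` and `max_backoff` options bound the delay between retries. The sketch below is a simplified model of an exponential schedule clamped to those bounds, using the defaults listed above. It is not Dramatiq's actual formula (the library also applies jitter, so real delays vary), and `bounded_backoff` is a hypothetical name.

```python
def bounded_backoff(retries: int,
                    min_backoff: int = 15_000,
                    max_backoff: int = 604_800_000) -> int:
    """Illustrative delay in ms: min_backoff doubled per retry, capped at max_backoff."""
    return min(min_backoff * 2 ** retries, max_backoff)

# Delays for the first four retries, then one far enough out to hit the cap.
delays = [bounded_backoff(n) for n in range(4)]
capped = bounded_backoff(20)
print(delays, capped)
```

Under these assumptions the delay starts at 15 seconds, doubles with each retry, and stops growing once it reaches the 7-day ceiling.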

### Advanced Actor Patterns

#### Conditional Retries

```python
import random

import dramatiq

class TemporaryError(Exception):
    """Example exception type for transient failures."""

def should_retry(retries_so_far, exception):
    """Custom retry logic"""
    if isinstance(exception, TemporaryError):
        return retries_so_far < 5
    return False

@dramatiq.actor(retry_when=should_retry)
def smart_retry_task(data):
    # Task that uses custom retry logic
    if random.random() < 0.3:
        raise TemporaryError("Temporary failure")
    return "Success"
```
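
Since a `retry_when` predicate is an ordinary function of `(retries_so_far, exception)`, it can be exercised directly without a broker or worker. The following sketch restates the predicate so it is self-contained; `TemporaryError` is a hypothetical exception type defined only for illustration.

```python
class TemporaryError(Exception):
    """Hypothetical transient failure used for illustration."""

def should_retry(retries_so_far, exception):
    """Retry only transient errors, and at most five times."""
    if isinstance(exception, TemporaryError):
        return retries_so_far < 5
    return False

# Exercise the predicate the way a test suite might.
first_attempt = should_retry(0, TemporaryError("flaky"))
exhausted = should_retry(5, TemporaryError("flaky"))
permanent = should_retry(0, ValueError("bad input"))
print(first_attempt, exhausted, permanent)
```

Keeping the decision logic in a plain function like this makes the retry policy easy to cover with unit tests.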

#### Dynamic Actor Creation

```python
import dramatiq

def create_actor(name, queue, priority=0):
    """Factory function for creating actors dynamically"""

    @dramatiq.actor(
        actor_name=name,
        queue_name=queue,
        priority=priority
    )
    def dynamic_task(*args, **kwargs):
        print(f"Actor {name} processing: {args}, {kwargs}")
        return f"Processed by {name}"

    return dynamic_task

# Create actors dynamically
email_actor = create_actor("email_sender", "emails", priority=5)
sms_actor = create_actor("sms_sender", "sms", priority=3)

email_actor.send("user@example.com", "Hello")
sms_actor.send("+1234567890", "Hello")
```
318
319
#### Actor Composition with Message Building
320
321
```python
import dramatiq

@dramatiq.actor
def step_one(data):
    return {"processed": data, "step": 1}

@dramatiq.actor
def step_two(data):
    return {"processed": data, "step": 2}

@dramatiq.actor
def final_step(data):
    return {"final": data}

# Build messages for composition. In a pipeline, each step's result is
# appended as the last positional argument of the next message, so the
# later steps are built without arguments of their own.
msg1 = step_one.message({"input": "data"})
msg2 = step_two.message()
msg3 = final_step.message()

# Create pipeline
pipeline = msg1 | msg2 | msg3
pipeline.run()
```
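
To show how data flows through such a pipeline, here is a plain-Python model in which each step's result is passed as the last positional argument of the next step. It is only an illustration of that rule, not Dramatiq's implementation: `run_pipeline` is a hypothetical helper, and the steps are ordinary functions run in-process rather than actors run by workers.

```python
def run_pipeline(steps):
    """Run (func, args) pairs in order, appending the previous result to each later call."""
    result = None
    for index, (func, args) in enumerate(steps):
        call_args = args if index == 0 else args + (result,)
        result = func(*call_args)
    return result

def step_one(data):
    return {"processed": data, "step": 1}

def step_two(data):
    return {"processed": data, "step": 2}

def final_step(data):
    return {"final": data}

# Only the first step is given explicit arguments.
final = run_pipeline([
    (step_one, ({"input": "data"},)),
    (step_two, ()),
    (final_step, ()),
])
print(final)
```

In this model, giving a later single-argument step its own positional argument would make it receive two arguments and fail, which is why only the first step carries input.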

## Queue Name Validation

Queue names must start with a letter or underscore, followed by any number of letters, digits, periods, underscores, or hyphens:

```python { .api }
# Valid queue name pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"

# Valid examples:
"default"        # ✓
"high_priority"  # ✓
"user.emails"    # ✓
"queue-1"        # ✓
"_internal"      # ✓

# Invalid examples:
"123-queue"      # ✗ (starts with number)
"my queue"       # ✗ (contains space)
"queue@domain"   # ✗ (contains @)
```
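
A name can be checked against this pattern before any actor is declared. The sketch below uses only the standard `re` module; `is_valid_queue_name` is a hypothetical helper rather than a Dramatiq function.

```python
import re

# Same pattern as above, compiled once for reuse.
QUEUE_NAME_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9._-]*")

def is_valid_queue_name(name: str) -> bool:
    """Return True only if the entire name matches the pattern."""
    return QUEUE_NAME_RE.fullmatch(name) is not None

names = ["default", "user.emails", "_internal", "123-queue", "my queue", "queue@domain"]
results = {name: is_valid_queue_name(name) for name in names}
print(results)
```

`fullmatch` matters here: `match` would accept `"my queue"` because the prefix `"my"` satisfies the pattern on its own.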