0
# Messages
1
2
Messages in Dramatiq represent tasks to be executed by workers. They contain all the information needed to invoke an actor, including arguments, metadata, and processing options. The message system also handles serialization, routing, and integration with composition features.
3
4
## Capabilities
5
6
### Message Class
7
8
The core message data structure that represents a task.
9
10
```python { .api }
11
class Message:
12
def __init__(
13
self,
14
queue_name: str,
15
actor_name: str,
16
args: tuple,
17
kwargs: Dict[str, Any],
18
options: Dict[str, Any],
19
message_id: str = None,
20
message_timestamp: int = None
21
):
22
"""
23
Create a message instance.
24
25
Parameters:
26
- queue_name: Name of the queue for this message
27
- actor_name: Name of the actor to execute
28
- args: Positional arguments for the actor
29
- kwargs: Keyword arguments for the actor
30
- options: Message options and metadata
31
- message_id: Unique message identifier (auto-generated if None)
32
- message_timestamp: Unix timestamp in milliseconds (auto-generated if None)
33
"""
34
35
def encode(self) -> bytes:
36
"""
37
Encode message to bytes for transmission.
38
39
Returns:
40
Serialized message data
41
"""
42
43
@classmethod
44
def decode(cls, data: bytes) -> 'Message':
45
"""
46
Decode message from bytes.
47
48
Parameters:
49
- data: Serialized message data
50
51
Returns:
52
Message instance
53
54
Raises:
55
DecodeError: If data cannot be decoded
56
"""
57
58
def copy(self, **attributes) -> 'Message':
59
"""
60
Create a copy of the message with modified attributes.
61
62
Parameters:
63
- **attributes: Attributes to modify in the copy
64
65
Returns:
66
New message instance with modifications
67
"""
68
69
def get_result(
70
self,
71
*,
72
backend: ResultBackend = None,
73
block: bool = False,
74
timeout: int = None
75
):
76
"""
77
Get result for this message (requires Results middleware).
78
79
Parameters:
80
- backend: Result backend to use (uses broker's backend if None)
81
- block: Whether to block waiting for result
82
- timeout: Timeout in milliseconds when blocking
83
84
Returns:
85
Task result
86
87
Raises:
88
ResultMissing: If result not available
89
ResultTimeout: If timeout exceeded while blocking
90
ResultFailure: If task failed with exception
91
"""
92
93
def asdict(self) -> Dict[str, Any]:
94
"""
95
Convert message to dictionary representation.
96
97
Returns:
98
Dictionary containing message data
99
"""
100
101
def __or__(self, other) -> 'pipeline':
102
"""
103
Create pipeline with this message using | operator.
104
105
Parameters:
106
- other: Message or pipeline to chain with
107
108
Returns:
109
Pipeline containing both messages
110
"""
111
112
# Properties
113
queue_name: str # Queue name
114
actor_name: str # Actor name
115
args: tuple # Positional arguments
116
kwargs: Dict[str, Any] # Keyword arguments
117
options: Dict[str, Any] # Message options
118
message_id: str # Unique identifier
119
message_timestamp: int # Creation timestamp
120
```
121
122
**Usage:**
123
124
```python
125
import dramatiq
126
127
@dramatiq.actor
128
def example_task(x, y, multiplier=1):
129
return (x + y) * multiplier
130
131
# Create message manually
132
message = dramatiq.Message(
133
queue_name="default",
134
actor_name="example_task",
135
args=(10, 20),
136
kwargs={"multiplier": 2},
137
options={"max_retries": 5}
138
)
139
140
# Send message
141
broker = dramatiq.get_broker()
142
broker.enqueue(message)
143
144
# Message created by actor
145
auto_message = example_task.message(10, 20, multiplier=2)
146
print(f"Message ID: {auto_message.message_id}")
147
print(f"Message data: {auto_message.asdict()}")
148
149
# Send the message
150
auto_message = example_task.send(10, 20, multiplier=2)
151
```
152
153
### Message Encoding
154
155
The encoding system handles serialization and deserialization of messages for transmission between brokers and workers.
156
157
#### Encoder Base Class
158
159
```python { .api }
160
class Encoder:
161
"""
162
Abstract base class for message encoders.
163
164
Encoders handle serialization of message data for storage
165
and transmission through brokers.
166
"""
167
168
def encode(self, data: MessageData) -> bytes:
169
"""
170
Encode message data to bytes.
171
172
Parameters:
173
- data: Message data to encode
174
175
Returns:
176
Encoded message as bytes
177
"""
178
raise NotImplementedError
179
180
def decode(self, data: bytes) -> MessageData:
181
"""
182
Decode bytes to message data.
183
184
Parameters:
185
- data: Encoded message bytes
186
187
Returns:
188
Decoded message data
189
190
Raises:
191
DecodeError: If data cannot be decoded
192
"""
193
raise NotImplementedError
194
```
195
196
#### JSON Encoder
197
198
Default encoder using JSON serialization.
199
200
```python { .api }
201
class JSONEncoder(Encoder):
202
def __init__(self):
203
"""
204
Create JSON encoder for message serialization.
205
206
Uses JSON for cross-language compatibility and human readability.
207
Handles standard Python types: str, int, float, bool, list, dict, None.
208
"""
209
210
def encode(self, data: MessageData) -> bytes:
211
"""
212
Encode message data to JSON bytes.
213
214
Returns:
215
UTF-8 encoded JSON data
216
"""
217
218
def decode(self, data: bytes) -> MessageData:
219
"""
220
Decode JSON bytes to message data.
221
222
Returns:
223
Decoded Python objects
224
"""
225
```
226
227
**Usage:**
228
229
```python
230
from dramatiq import JSONEncoder
231
232
# JSON encoder (default)
233
json_encoder = JSONEncoder()
234
235
@dramatiq.actor
236
def json_compatible_task(data):
237
# Data must be JSON-serializable
238
return {
239
"processed": data["input"],
240
"timestamp": time.time(),
241
"items": [1, 2, 3, 4, 5]
242
}
243
244
# Send JSON-compatible data
245
json_compatible_task.send({
246
"input": "test data",
247
"config": {"option1": True, "option2": "value"}
248
})
249
```
250
251
#### Pickle Encoder
252
253
Encoder using Python's pickle for complex object serialization.
254
255
```python { .api }
256
class PickleEncoder(Encoder):
257
def __init__(self):
258
"""
259
Create Pickle encoder for Python object serialization.
260
261
WARNING: Pickle can execute arbitrary code during deserialization.
262
Only use with trusted data sources.
263
"""
264
265
def encode(self, data: MessageData) -> bytes:
266
"""
267
Encode message data using pickle.
268
269
Returns:
270
Pickled data bytes
271
"""
272
273
def decode(self, data: bytes) -> MessageData:
274
"""
275
Decode pickled bytes to message data.
276
277
Returns:
278
Unpickled Python objects
279
"""
280
```
281
282
**Usage:**
283
284
```python
285
from dramatiq import PickleEncoder, set_encoder
286
import datetime
287
288
# Set pickle encoder globally (use with caution)
289
pickle_encoder = PickleEncoder()
290
dramatiq.set_encoder(pickle_encoder)
291
292
class CustomObject:
293
def __init__(self, value):
294
self.value = value
295
self.created = datetime.datetime.now()
296
297
@dramatiq.actor
298
def pickle_compatible_task(obj):
299
# Can handle complex Python objects
300
return {
301
"object_value": obj.value,
302
"created": obj.created,
303
"processed": datetime.datetime.now()
304
}
305
306
# Send complex objects
307
custom_obj = CustomObject("test value")
308
pickle_compatible_task.send(custom_obj)
309
```
310
311
### Global Encoder Management
312
313
Functions for managing the global message encoder.
314
315
```python { .api }
316
def get_encoder() -> Encoder:
317
"""
318
Get the current global message encoder.
319
320
Returns:
321
Current encoder instance (defaults to JSONEncoder)
322
"""
323
324
def set_encoder(encoder: Encoder):
325
"""
326
Set the global message encoder.
327
328
Parameters:
329
- encoder: Encoder instance to use globally
330
"""
331
```
332
333
**Usage:**
334
335
```python
336
# Check current encoder
337
current_encoder = dramatiq.get_encoder()
338
print(f"Current encoder: {type(current_encoder).__name__}")
339
340
# Switch to custom encoder
341
custom_encoder = MyCustomEncoder()
342
dramatiq.set_encoder(custom_encoder)
343
344
# All messages will now use the custom encoder
345
@dramatiq.actor
346
def task_with_custom_encoding(data):
347
return process_with_custom_format(data)
348
```
349
350
### Advanced Message Patterns
351
352
#### Message Metadata and Options
353
354
```python
355
@dramatiq.actor
356
def metadata_aware_task(data):
357
# Access current message in middleware
358
from dramatiq.middleware import get_current_message
359
360
try:
361
current_msg = get_current_message()
362
363
return {
364
"data": data,
365
"message_id": current_msg.message_id,
366
"retry_count": current_msg.options.get("retries", 0),
367
"queue": current_msg.queue_name,
368
"created_at": current_msg.message_timestamp
369
}
370
except:
371
# Fallback when no current message available
372
return {"data": data, "no_metadata": True}
373
374
# Send with custom options
375
message = metadata_aware_task.message_with_options(
376
args=("test_data",),
377
delay=5000, # 5 second delay
378
max_retries=3,
379
custom_option="custom_value"
380
)
381
message_id = broker.enqueue(message).message_id
382
```
383
384
#### Message Copying and Modification
385
386
```python
387
@dramatiq.actor
388
def original_task(data, multiplier=1):
389
return data * multiplier
390
391
# Create base message
392
base_message = original_task.message("hello", multiplier=2)
393
394
# Create variations
395
urgent_message = base_message.copy(
396
queue_name="urgent",
397
options={**base_message.options, "priority": 0}
398
)
399
400
delayed_message = base_message.copy(
401
args=("goodbye",),
402
options={**base_message.options, "delay": 30000}
403
)
404
405
# Send variations
406
broker.enqueue(urgent_message)
407
broker.enqueue(delayed_message)
408
```
409
410
#### Conditional Message Creation
411
412
```python
413
def create_conditional_message(data, priority="normal"):
414
"""Create message based on priority level"""
415
416
if priority == "urgent":
417
return urgent_task.message_with_options(
418
args=(data,),
419
queue_name="urgent",
420
priority=0,
421
max_retries=1
422
)
423
elif priority == "high":
424
return high_priority_task.message_with_options(
425
args=(data,),
426
queue_name="high_priority",
427
priority=1,
428
max_retries=3
429
)
430
else:
431
return normal_task.message_with_options(
432
args=(data,),
433
queue_name="normal",
434
priority=5,
435
max_retries=5
436
)
437
438
# Usage
439
urgent_msg = create_conditional_message("critical_data", "urgent")
440
normal_msg = create_conditional_message("regular_data", "normal")
441
442
broker.enqueue(urgent_msg)
443
broker.enqueue(normal_msg)
444
```
445
446
### Message Lifecycle and Tracking
447
448
#### Message State Tracking
449
450
```python
451
import time
452
from enum import Enum
453
454
class MessageState(Enum):
455
CREATED = "created"
456
ENQUEUED = "enqueued"
457
PROCESSING = "processing"
458
COMPLETED = "completed"
459
FAILED = "failed"
460
RETRYING = "retrying"
461
462
class TrackedMessage:
463
def __init__(self, message):
464
self.message = message
465
self.state = MessageState.CREATED
466
self.state_history = [(MessageState.CREATED, time.time())]
467
self.result = None
468
self.error = None
469
470
def update_state(self, new_state, error=None):
471
self.state = new_state
472
self.state_history.append((new_state, time.time()))
473
if error:
474
self.error = error
475
476
def get_duration(self):
477
if len(self.state_history) < 2:
478
return 0
479
start_time = self.state_history[0][1]
480
end_time = self.state_history[-1][1]
481
return end_time - start_time
482
483
# Middleware for message tracking
484
class MessageTrackingMiddleware(dramatiq.Middleware):
485
def __init__(self):
486
self.tracked_messages = {}
487
488
def before_enqueue(self, broker, message, delay):
489
tracked = TrackedMessage(message)
490
tracked.update_state(MessageState.ENQUEUED)
491
self.tracked_messages[message.message_id] = tracked
492
493
def before_process_message(self, broker, message):
494
if message.message_id in self.tracked_messages:
495
tracked = self.tracked_messages[message.message_id]
496
tracked.update_state(MessageState.PROCESSING)
497
498
def after_process_message(self, broker, message, *, result=None, exception=None):
499
if message.message_id in self.tracked_messages:
500
tracked = self.tracked_messages[message.message_id]
501
if exception:
502
tracked.update_state(MessageState.FAILED, error=str(exception))
503
else:
504
tracked.update_state(MessageState.COMPLETED)
505
tracked.result = result
506
507
# Usage
508
tracking_middleware = MessageTrackingMiddleware()
509
broker.add_middleware(tracking_middleware)
510
511
@dramatiq.actor
512
def tracked_task(data):
513
time.sleep(1) # Simulate work
514
return f"Processed: {data}"
515
516
# Send tracked message
517
message = tracked_task.send("test_data")
518
519
# Check tracking later
520
time.sleep(2)
521
tracked = tracking_middleware.tracked_messages.get(message.message_id)
522
if tracked:
523
print(f"Message state: {tracked.state}")
524
print(f"Duration: {tracked.get_duration():.2f}s")
525
```
526
527
#### Message Batching
528
529
```python
530
class MessageBatch:
531
def __init__(self, broker):
532
self.broker = broker
533
self.messages = []
534
535
def add(self, message):
536
"""Add message to batch"""
537
self.messages.append(message)
538
539
def send_all(self, delay=None):
540
"""Send all messages in the batch"""
541
sent_messages = []
542
543
for message in self.messages:
544
if delay:
545
message = message.copy(options={**message.options, "delay": delay})
546
547
sent_message = self.broker.enqueue(message)
548
sent_messages.append(sent_message)
549
550
self.messages.clear()
551
return sent_messages
552
553
def __len__(self):
554
return len(self.messages)
555
556
# Usage
557
batch = MessageBatch(broker)
558
559
# Add messages to batch
560
for i in range(10):
561
msg = process_item.message(f"item_{i}", {"config": "batch_mode"})
562
batch.add(msg)
563
564
# Send entire batch with delay
565
sent_messages = batch.send_all(delay=1000) # 1 second delay
566
print(f"Sent {len(sent_messages)} messages")
567
```
568
569
### Custom Message Serialization
570
571
#### Custom Encoder Implementation
572
573
```python
574
import msgpack
575
from dramatiq import Encoder, DecodeError
576
577
class MessagePackEncoder(Encoder):
578
"""Custom encoder using MessagePack for efficient serialization"""
579
580
def encode(self, data):
581
try:
582
return msgpack.packb(data, use_bin_type=True)
583
except Exception as e:
584
raise DecodeError(f"Failed to encode with MessagePack: {e}")
585
586
def decode(self, data):
587
try:
588
return msgpack.unpackb(data, raw=False, strict_map_key=False)
589
except Exception as e:
590
raise DecodeError(f"Failed to decode with MessagePack: {e}")
591
592
# Use custom encoder
593
msgpack_encoder = MessagePackEncoder()
594
dramatiq.set_encoder(msgpack_encoder)
595
596
@dramatiq.actor
597
def msgpack_task(data):
598
"""Task using MessagePack encoding"""
599
return {
600
"input_size": len(str(data)),
601
"processed": True,
602
"binary_data": b"binary content"
603
}
604
605
# Send binary-friendly data
606
msgpack_task.send({
607
"text": "Hello World",
608
"binary": b"\\x00\\x01\\x02\\x03",
609
"numbers": [1, 2, 3, 4, 5]
610
})
611
```
612
613
#### Compression Support
614
615
```python
616
import gzip
617
import json
618
619
class CompressedJSONEncoder(dramatiq.Encoder):
620
"""JSON encoder with gzip compression"""
621
622
def __init__(self, compression_level=6):
623
self.compression_level = compression_level
624
625
def encode(self, data):
626
try:
627
json_data = json.dumps(data).encode('utf-8')
628
return gzip.compress(json_data, compresslevel=self.compression_level)
629
except Exception as e:
630
raise dramatiq.DecodeError(f"Failed to encode/compress: {e}")
631
632
def decode(self, data):
633
try:
634
decompressed = gzip.decompress(data)
635
return json.loads(decompressed.decode('utf-8'))
636
except Exception as e:
637
raise dramatiq.DecodeError(f"Failed to decompress/decode: {e}")
638
639
# Use compressed encoder for large messages
640
compressed_encoder = CompressedJSONEncoder(compression_level=9)
641
dramatiq.set_encoder(compressed_encoder)
642
643
@dramatiq.actor
644
def large_data_task(large_data):
645
"""Task handling large data with compression"""
646
return {
647
"processed_items": len(large_data),
648
"sample": large_data[:10] if large_data else [],
649
"compression_effective": True
650
}
651
652
# Send large dataset
653
large_dataset = list(range(10000))
654
large_data_task.send(large_dataset)
655
```
656
657
### Message Debugging and Inspection
658
659
#### Message Inspector
660
661
```python
662
import pprint
663
664
class MessageInspector:
665
def __init__(self, broker):
666
self.broker = broker
667
668
def inspect_message(self, message):
669
"""Detailed message inspection"""
670
print(f"=== Message Inspection ===")
671
print(f"ID: {message.message_id}")
672
print(f"Actor: {message.actor_name}")
673
print(f"Queue: {message.queue_name}")
674
print(f"Timestamp: {message.message_timestamp}")
675
print(f"Args: {message.args}")
676
print(f"Kwargs: {message.kwargs}")
677
print(f"Options:")
678
pprint.pprint(message.options, indent=2)
679
680
# Size information
681
encoded = message.encode()
682
print(f"Encoded size: {len(encoded)} bytes")
683
684
# Validate encoding/decoding
685
try:
686
decoded = dramatiq.Message.decode(encoded)
687
print("✓ Encoding/decoding successful")
688
except Exception as e:
689
print(f"✗ Encoding/decoding failed: {e}")
690
691
def compare_messages(self, msg1, msg2):
692
"""Compare two messages"""
693
print(f"=== Message Comparison ===")
694
695
fields = ['message_id', 'actor_name', 'queue_name', 'args', 'kwargs', 'options']
696
for field in fields:
697
val1 = getattr(msg1, field)
698
val2 = getattr(msg2, field)
699
700
if val1 == val2:
701
print(f"✓ {field}: MATCH")
702
else:
703
print(f"✗ {field}: DIFFER")
704
print(f" Message 1: {val1}")
705
print(f" Message 2: {val2}")
706
707
# Usage
708
inspector = MessageInspector(broker)
709
710
@dramatiq.actor
711
def debug_task(data, option=None):
712
return f"Debug: {data} with {option}"
713
714
# Create and inspect message
715
message = debug_task.message("test_data", option="debug_mode")
716
inspector.inspect_message(message)
717
718
# Create modified copy and compare
719
modified = message.copy(queue_name="debug", options={"debug": True})
720
inspector.compare_messages(message, modified)
721
```
722
723
#### Message Queue Monitoring
724
725
```python
726
def monitor_message_queues(broker, interval=5):
727
"""Monitor message queues for debugging"""
728
729
def queue_stats():
730
while True:
731
try:
732
# Get queue information (broker-specific)
733
if hasattr(broker, 'client') and hasattr(broker.client, 'info'):
734
# Redis broker
735
for queue_name in broker.queues:
736
key = f"{broker.namespace}:{queue_name}.msgs"
737
length = broker.client.llen(key)
738
print(f"Queue '{queue_name}': {length} messages")
739
740
print("---")
741
time.sleep(interval)
742
743
except Exception as e:
744
print(f"Error monitoring queues: {e}")
745
time.sleep(interval)
746
747
monitor_thread = threading.Thread(target=queue_stats, daemon=True)
748
monitor_thread.start()
749
return monitor_thread
750
751
# Start queue monitoring
752
monitor_thread = monitor_message_queues(broker)
753
754
# Send some messages to observe
755
for i in range(20):
756
debug_task.send(f"message_{i}")
757
```
758
759
This comprehensive message documentation covers all aspects of working with messages in Dramatiq, from basic usage to advanced patterns for tracking, batching, custom serialization, and debugging.