0
# Messaging
1
2
High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support. The messaging API provides the primary interface for sending and receiving messages in Kombu applications.
3
4
## Capabilities
5
6
### Producer
7
8
Message producer for publishing messages to exchanges with serialization, compression, and delivery options.
9
10
```python { .api }
11
class Producer:
12
def __init__(self, channel, exchange=None, routing_key='', serializer=None, compression=None, auto_declare=True, on_return=None, **kwargs):
13
"""
14
Create message producer.
15
16
Parameters:
17
- channel: AMQP channel to use
18
- exchange (Exchange): Default exchange for publishing
19
- routing_key (str): Default routing key
20
- serializer (str): Default serialization method
21
- compression (str): Default compression method
22
- auto_declare (bool): Automatically declare entities
23
- on_return (callable): Callback for returned messages
24
"""
25
26
def declare(self):
27
"""
28
Declare the default exchange and any entities in auto_declare list.
29
30
Returns:
31
Producer instance for chaining
32
"""
33
34
def maybe_declare(self, entity, retry=False, **retry_policy):
35
"""
36
Declare entity if not already declared (cached).
37
38
Parameters:
39
- entity (Exchange|Queue): Entity to declare
40
- retry (bool): Enable retry on failure
41
- retry_policy: Retry policy parameters
42
43
Returns:
44
bool: True if entity was declared
45
"""
46
47
def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, confirm_timeout=None, **properties):
48
"""
49
Publish message to exchange.
50
51
Parameters:
52
- body: Message body (will be serialized)
53
- routing_key (str): Message routing key
54
- delivery_mode (int): Delivery mode (1=transient, 2=persistent)
55
- mandatory (bool): Return message if no route found
56
- immediate (bool): Return message if no consumer ready
57
- priority (int): Message priority (0-9)
58
- content_type (str): Content type override
59
- content_encoding (str): Content encoding override
60
- serializer (str): Serializer override
61
- headers (dict): Message headers
62
- compression (str): Compression method override
63
- exchange (Exchange): Exchange override
64
- retry (bool): Enable retry on failure
65
- retry_policy (dict): Retry policy parameters
66
- declare (list): Entities to declare before publishing
67
- expiration (float): Message time-to-live in seconds (converted to a millisecond string in the message properties)
68
- timeout (float): Operation timeout in seconds
69
- confirm_timeout (float): Publisher confirmation timeout
70
- **properties: Additional message properties
71
72
Returns:
73
None
74
"""
75
76
def revive(self, channel):
77
"""
78
Revive producer after connection re-establishment.
79
80
Parameters:
81
- channel: New channel to use
82
83
Returns:
84
Producer instance for chaining
85
"""
86
87
def close(self):
88
"""Close producer and cleanup resources."""
89
90
def release(self):
91
"""Release producer resources (alias for close)."""
92
93
# Properties
94
@property
95
def channel(self):
96
"""Channel: AMQP channel"""
97
98
@property
99
def exchange(self):
100
"""Exchange: Default exchange"""
101
102
@property
103
def routing_key(self):
104
"""str: Default routing key"""
105
106
@property
107
def serializer(self):
108
"""str: Default serializer"""
109
110
@property
111
def compression(self):
112
"""str: Default compression method"""
113
114
@property
115
def auto_declare(self):
116
"""bool: Auto-declare flag"""
117
118
@property
119
def on_return(self):
120
"""callable: Basic return callback"""
121
```
122
123
### Consumer
124
125
Message consumer for receiving messages from queues with callback handling, acknowledgment control, and quality of service management.
126
127
```python { .api }
128
class Consumer:
129
def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None, **kwargs):
130
"""
131
Create message consumer.
132
133
Parameters:
134
- channel: AMQP channel to use
135
- queues (list): Queues to consume from
136
- no_ack (bool): Automatically acknowledge messages (no explicit ack required)
137
- auto_declare (bool): Automatically declare entities
138
- callbacks (list): Message callback functions
139
- on_decode_error (callable): Decode error callback
140
- on_message (callable): Alternative message handler
141
- accept (list): Accepted content types
142
- prefetch_count (int): QoS prefetch count
143
- tag_prefix (str): Consumer tag prefix
144
"""
145
146
def revive(self, channel):
147
"""
148
Revive consumer after connection re-establishment.
149
150
Parameters:
151
- channel: New channel to use
152
153
Returns:
154
Consumer instance for chaining
155
"""
156
157
def declare(self):
158
"""
159
Declare queues, exchanges and bindings.
160
161
Returns:
162
Consumer instance for chaining
163
"""
164
165
def register_callback(self, callback):
166
"""
167
Register new callback function.
168
169
Parameters:
170
- callback (callable): Function to call for each message
171
172
Returns:
173
Consumer instance for chaining
174
"""
175
176
def add_queue(self, queue):
177
"""
178
Add queue to consume from.
179
180
Parameters:
181
- queue (Queue): Queue to add
182
183
Returns:
184
Consumer instance for chaining
185
"""
186
187
def consume(self, no_ack=None):
188
"""
189
Start consuming messages from queues.
190
191
Parameters:
192
- no_ack (bool): Override the consumer's automatic-acknowledgment setting
193
194
Returns:
195
Consumer instance for chaining
196
"""
197
198
def cancel(self):
199
"""
200
End all active queue consumers.
201
202
Returns:
203
Consumer instance for chaining
204
"""
205
206
def cancel_by_queue(self, queue):
207
"""
208
Cancel consumer for specific queue.
209
210
Parameters:
211
- queue (str|Queue): Queue to stop consuming
212
213
Returns:
214
Consumer instance for chaining
215
"""
216
217
def consuming_from(self, queue):
218
"""
219
Check if currently consuming from queue.
220
221
Parameters:
222
- queue (str|Queue): Queue to check
223
224
Returns:
225
bool: True if consuming from queue
226
"""
227
228
def purge(self):
229
"""
230
Purge messages from all queues.
231
232
Returns:
233
int: Total number of messages purged
234
"""
235
236
def flow(self, active):
237
"""
238
Enable/disable flow from peer.
239
240
Parameters:
241
- active (bool): Enable or disable flow
242
243
Returns:
244
Consumer instance for chaining
245
"""
246
247
def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
248
"""
249
Set quality of service limits.
250
251
Parameters:
252
- prefetch_size (int): Prefetch window size
253
- prefetch_count (int): Prefetch message count
254
- apply_global (bool): Apply globally or per-consumer
255
256
Returns:
257
Consumer instance for chaining
258
"""
259
260
def recover(self, requeue=False):
261
"""
262
Redeliver unacknowledged messages.
263
264
Parameters:
265
- requeue (bool): Requeue messages to original position
266
267
Returns:
268
Consumer instance for chaining
269
"""
270
271
def receive(self, body, message):
272
"""
273
Handle received message by calling callbacks.
274
275
Parameters:
276
- body: Decoded message body
277
- message (Message): Message instance
278
279
Returns:
280
None
281
"""
282
283
# Properties
284
@property
285
def channel(self):
286
"""Channel: AMQP channel"""
287
288
@property
289
def queues(self):
290
"""list: Queues being consumed"""
291
292
@property
293
def no_ack(self):
294
"""bool: Automatic acknowledgment flag"""
295
296
@property
297
def auto_declare(self):
298
"""bool: Auto-declare entities flag"""
299
300
@property
301
def callbacks(self):
302
"""list: Message callback functions"""
303
304
@property
305
def on_message(self):
306
"""callable: Alternative message handler"""
307
308
@property
309
def on_decode_error(self):
310
"""callable: Decode error callback"""
311
312
@property
313
def accept(self):
314
"""list: Accepted content types"""
315
316
@property
317
def prefetch_count(self):
318
"""int: QoS prefetch count"""
319
```
320
321
### Message
322
323
Base class for received messages with acknowledgment, rejection, and decoding capabilities.
324
325
```python { .api }
326
class Message:
327
def __init__(self, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info=None, properties=None, headers=None, **kwargs):
328
"""
329
Create message instance.
330
331
Parameters:
332
- body: Raw message body
333
- delivery_tag: Unique delivery identifier
334
- content_type (str): Message content type
335
- content_encoding (str): Content encoding
336
- delivery_info (dict): Delivery information
337
- properties (dict): Message properties
338
- headers (dict): Message headers
339
"""
340
341
def ack(self, multiple=False):
342
"""
343
Acknowledge message processing.
344
345
Parameters:
346
- multiple (bool): Acknowledge all messages up to this one
347
348
Raises:
349
MessageStateError: If message already acknowledged
350
"""
351
352
def ack_log_error(self, logger, errors, multiple=False):
353
"""
354
Acknowledge message with error logging.
355
356
Parameters:
357
- logger: Logger instance
358
- errors (tuple): Error types to catch and log
359
- multiple (bool): Acknowledge multiple messages
360
361
Returns:
362
bool: True if acknowledgment succeeded
363
"""
364
365
def reject(self, requeue=False):
366
"""
367
Reject message.
368
369
Parameters:
370
- requeue (bool): Requeue message for redelivery
371
372
Raises:
373
MessageStateError: If message already acknowledged
374
"""
375
376
def reject_log_error(self, logger, errors, requeue=False):
377
"""
378
Reject message with error logging.
379
380
Parameters:
381
- logger: Logger instance
382
- errors (tuple): Error types to catch and log
383
- requeue (bool): Requeue message
384
385
Returns:
386
bool: True if rejection succeeded
387
"""
388
389
def requeue(self):
390
"""
391
Reject and requeue message (shortcut for reject(requeue=True)).
392
393
Raises:
394
MessageStateError: If message already acknowledged
395
"""
396
397
def decode(self):
398
"""
399
Deserialize message body (cached).
400
401
Returns:
402
Decoded message body
403
"""
404
405
def _decode(self):
406
"""
407
Force re-decode message body.
408
409
Returns:
410
Decoded message body
411
"""
412
413
# Properties
414
@property
415
def acknowledged(self):
416
"""bool: True if message has been acknowledged"""
417
418
@property
419
def payload(self):
420
"""Decoded message body (cached)"""
421
422
@property
423
def body(self):
424
"""Raw message body"""
425
426
@property
427
def content_type(self):
428
"""str: Message content type"""
429
430
@property
431
def content_encoding(self):
432
"""str: Message content encoding"""
433
434
@property
435
def delivery_info(self):
436
"""dict: Delivery information"""
437
438
@property
439
def headers(self):
440
"""dict: Message headers"""
441
442
@property
443
def properties(self):
444
"""dict: Message properties"""
445
```
446
447
## Usage Examples
448
449
### Basic Producer Usage
450
451
```python
452
from kombu import Connection, Exchange, Producer
453
454
# Define exchange
455
task_exchange = Exchange('tasks', type='direct', durable=True)
456
457
with Connection('redis://localhost:6379/0') as conn:
458
# Create producer
459
producer = Producer(
460
conn.channel(),
461
exchange=task_exchange,
462
routing_key='default',
463
serializer='json'
464
)
465
466
# Publish messages
467
producer.publish(
468
{'task': 'process_data', 'args': [1, 2, 3]},
469
routing_key='high_priority',
470
headers={'origin': 'web_app'},
471
priority=5
472
)
473
474
# Publish with different serializer
475
producer.publish(
476
b'binary data',
477
routing_key='binary_task',
478
serializer='pickle',
479
content_type='application/x-python-serialize'
480
)
481
```
482
483
### Basic Consumer Usage
484
485
```python
486
import socket

from kombu import Connection, Queue, Consumer
487
488
def process_message(body, message):
489
"""Message processing callback"""
490
try:
491
print(f"Processing: {body}")
492
# Simulate work
493
result = body['args'][0] + body['args'][1]
494
print(f"Result: {result}")
495
496
# Acknowledge successful processing
497
message.ack()
498
except Exception as exc:
499
print(f"Processing failed: {exc}")
500
# Reject and requeue for retry
501
message.reject(requeue=True)
502
503
# Define queue
504
task_queue = Queue('task_queue', durable=True)
505
506
with Connection('redis://localhost:6379/0') as conn:
507
# Create consumer
508
consumer = Consumer(
509
conn.channel(),
510
queues=[task_queue],
511
callbacks=[process_message],
512
prefetch_count=10
513
)
514
515
# Start consuming
516
consumer.consume()
517
518
# Process messages
519
while True:
520
try:
521
conn.drain_events(timeout=1.0)
522
except socket.timeout:
523
break
524
```
525
526
### Advanced Producer Features
527
528
```python
529
from kombu import Connection, Exchange, Queue, Producer
530
531
# Setup entities
532
exchange = Exchange('notifications', type='topic', durable=True)
533
queue = Queue('email_notifications', exchange, routing_key='email.*')
534
535
with Connection('amqp://localhost') as conn:
536
producer = Producer(conn.channel(), exchange=exchange)
537
538
# Publish with automatic declaration
539
producer.publish(
540
{
541
'to': 'user@example.com',
542
'subject': 'Welcome!',
543
'body': 'Welcome to our service'
544
},
545
routing_key='email.welcome',
546
declare=[queue], # Declare queue before publishing
547
mandatory=True, # Return if no route
548
        expiration=300,      # 5 minute TTL (kombu expects seconds)
549
headers={'priority': 'high'}
550
)
551
552
# Publish compressed message
553
producer.publish(
554
{'large': 'data' * 1000},
555
routing_key='email.report',
556
compression='gzip',
557
serializer='pickle'
558
)
559
```
560
561
### Advanced Consumer Features
562
563
```python
564
from kombu import Connection, Queue, Consumer
565
import logging
566
567
logger = logging.getLogger(__name__)
568
569
def handle_decode_error(message, exc):
570
"""Handle message decode errors"""
571
logger.error(f"Failed to decode message: {exc}")
572
# Log the raw message for debugging
573
logger.error(f"Raw message body: {message.body}")
574
# Reject without requeue to avoid infinite loop
575
message.reject(requeue=False)
576
577
def process_message(body, message):
578
"""Process message with comprehensive error handling"""
579
try:
580
print(f"Processing message: {body}")
581
582
# Simulate processing that might fail
583
if body.get('fail'):
584
raise ValueError("Simulated processing error")
585
586
# Acknowledge successful processing
587
message.ack_log_error(logger, (Exception,))
588
589
except ValueError as exc:
590
logger.error(f"Processing error: {exc}")
591
# Reject and requeue for retry
592
message.reject_log_error(logger, (Exception,), requeue=True)
593
594
# Setup queues with different priorities
595
high_priority_queue = Queue('high_priority', routing_key='high')
596
low_priority_queue = Queue('low_priority', routing_key='low')
597
598
with Connection('redis://localhost:6379/0') as conn:
599
consumer = Consumer(
600
conn.channel(),
601
queues=[high_priority_queue, low_priority_queue],
602
callbacks=[process_message],
603
on_decode_error=handle_decode_error,
604
accept=['json', 'pickle'], # Only accept these content types
605
prefetch_count=5
606
)
607
608
# Set QoS limits
609
consumer.qos(prefetch_count=10, apply_global=True)
610
611
# Start consuming
612
consumer.consume()
613
614
# Process with graceful shutdown
615
try:
616
while True:
617
conn.drain_events(timeout=1.0)
618
except KeyboardInterrupt:
619
print("Shutting down...")
620
consumer.cancel()
621
```
622
623
### Message Inspection and Handling
624
625
```python
626
from kombu import Connection, Queue, Consumer
627
628
def inspect_message(body, message):
629
"""Inspect message properties and handle accordingly"""
630
631
# Check message properties
632
print(f"Content type: {message.content_type}")
633
print(f"Delivery info: {message.delivery_info}")
634
print(f"Headers: {message.headers}")
635
print(f"Properties: {message.properties}")
636
637
# Handle based on message properties
638
if message.headers and message.headers.get('priority') == 'urgent':
639
print("Processing urgent message immediately")
640
# Process immediately
641
process_urgent(body)
642
message.ack()
643
    elif message.delivery_info.get('redelivered'):
644
print("Message was redelivered - handling carefully")
645
# Special handling for redelivered messages
646
if handle_redelivered(body):
647
message.ack()
648
else:
649
# Dead letter or discard
650
message.reject(requeue=False)
651
else:
652
# Normal processing
653
if process_normal(body):
654
message.ack()
655
else:
656
message.requeue()
657
658
def process_urgent(body):
659
# Urgent processing logic
660
return True
661
662
def handle_redelivered(body):
663
# Redelivered message logic
664
return True
665
666
def process_normal(body):
667
# Normal processing logic
668
return True
669
670
queue = Queue('inspection_queue')
671
672
with Connection('redis://localhost:6379/0') as conn:
673
consumer = Consumer(conn.channel(), [queue], callbacks=[inspect_message])
674
consumer.consume()
675
676
# Process messages
677
conn.drain_events()
678
```
679
680
### Producer with Return Handling
681
682
```python
683
from kombu import Connection, Exchange, Producer
684
685
def handle_returned_message(exception, exchange, routing_key, message):
686
"""Handle messages returned by broker"""
687
print(f"Message returned: {exception}")
688
print(f"Exchange: {exchange}, Routing key: {routing_key}")
689
print(f"Message: {message}")
690
691
# Could implement retry logic, logging, etc.
692
693
exchange = Exchange('optional_routing', type='direct')
694
695
with Connection('amqp://localhost') as conn:
696
producer = Producer(
697
conn.channel(),
698
exchange=exchange,
699
on_return=handle_returned_message
700
)
701
702
# Publish with mandatory flag - will be returned if no route exists
703
producer.publish(
704
{'data': 'test'},
705
routing_key='nonexistent_route',
706
mandatory=True # Return message if no queue bound
707
)
708
```