0
# Message Entities
1
2
AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior. These entities provide the foundation for message routing, persistence, and delivery guarantees in messaging systems.
3
4
## Capabilities
5
6
### Exchange
7
8
Represents AMQP exchange declarations that route messages to queues based on routing rules and exchange types.
9
10
```python { .api }
11
class Exchange:
12
def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, delivery_mode=None, arguments=None, **kwargs):
13
"""
14
Create exchange declaration.
15
16
Parameters:
17
- name (str): Exchange name
18
- type (str): Exchange type ('direct', 'topic', 'fanout', 'headers')
19
- channel: AMQP channel to bind to
20
- durable (bool): Survive broker restart
21
- auto_delete (bool): Delete when no queues bound
22
- delivery_mode (int): Default delivery mode (1=transient, 2=persistent)
23
- arguments (dict): Additional exchange arguments
24
"""
25
26
def declare(self, nowait=False, passive=None, channel=None):
27
"""
28
Declare exchange on the broker.
29
30
Parameters:
31
- nowait (bool): Don't wait for confirmation
32
- passive (bool): Only check if exchange exists
33
- channel: Channel to use (uses bound channel if None)
34
35
Returns:
36
Exchange instance for chaining
37
"""
38
39
def bind_to(self, exchange, routing_key='', arguments=None, nowait=False, channel=None):
40
"""
41
Bind this exchange to another exchange.
42
43
Parameters:
44
- exchange (Exchange|str): Source exchange
45
- routing_key (str): Binding routing key
46
- arguments (dict): Binding arguments
47
- nowait (bool): Don't wait for confirmation
48
- channel: Channel to use
49
50
Returns:
51
Exchange instance for chaining
52
"""
53
54
def unbind_from(self, source, routing_key='', arguments=None, nowait=False, channel=None):
55
"""
56
Unbind exchange from source exchange.
57
58
Parameters:
59
- source (Exchange|str): Source exchange to unbind from
60
- routing_key (str): Binding routing key that was used
61
- arguments (dict): Binding arguments that were used
62
- nowait (bool): Don't wait for confirmation
63
- channel: Channel to use
64
65
Returns:
66
Exchange instance for chaining
67
"""
68
69
def publish(self, message, routing_key=None, mandatory=False, immediate=False, **kwargs):
70
"""
71
Publish message to this exchange.
72
73
Parameters:
74
- message: Message body (will be serialized)
75
- routing_key (str): Message routing key
76
- mandatory (bool): Return message if no route found
77
- immediate (bool): Return message if no consumer ready
78
- **kwargs: Additional publish arguments
79
80
Returns:
81
Message instance
82
"""
83
84
def Message(self, body, delivery_mode=None, properties=None, **kwargs):
85
"""
86
Create message bound to this exchange.
87
88
Parameters:
89
- body: Message body
90
- delivery_mode (int): Delivery mode (1=transient, 2=persistent)
91
- properties (dict): Message properties
92
- **kwargs: Additional message arguments
93
94
Returns:
95
Message instance
96
"""
97
98
def delete(self, if_unused=False, nowait=False, channel=None):
99
"""
100
Delete exchange from broker.
101
102
Parameters:
103
- if_unused (bool): Only delete if no queues bound
104
- nowait (bool): Don't wait for confirmation
105
- channel: Channel to use
106
"""
107
108
def binding(self, routing_key='', arguments=None, unbind_arguments=None):
109
"""
110
Create binding object for this exchange.
111
112
Parameters:
113
- routing_key (str): Binding routing key
114
- arguments (dict): Binding arguments
115
- unbind_arguments (dict): Arguments for unbinding
116
117
Returns:
118
binding instance
119
"""
120
121
# Properties
122
@property
123
def name(self):
124
"""str: Exchange name"""
125
126
@property
127
def type(self):
128
"""str: Exchange type"""
129
130
@property
131
def durable(self):
132
"""bool: Durability flag"""
133
134
@property
135
def auto_delete(self):
136
"""bool: Auto-delete flag"""
137
138
@property
139
def delivery_mode(self):
140
"""int: Default delivery mode"""
141
142
@property
143
def arguments(self):
144
"""dict: Additional arguments"""
145
```
146
147
### Queue
148
149
Represents AMQP queue declarations that store messages and define consumption parameters.
150
151
```python { .api }
152
class Queue:
153
def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, exclusive=False, auto_delete=False, no_ack=None, alias=None, bindings=None, on_declared=None, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None, queue_arguments=None, binding_arguments=None, consumer_arguments=None, **kwargs):
154
"""
155
Create queue declaration.
156
157
Parameters:
158
- name (str): Queue name
159
- exchange (Exchange): Exchange to bind to
160
- routing_key (str): Routing key for binding
161
- channel: AMQP channel to bind to
162
- durable (bool): Survive broker restart
163
- exclusive (bool): Only allow one connection
164
- auto_delete (bool): Delete when no consumers
165
- no_ack (bool): Disable acknowledgments
166
- alias (str): Alias name for queue
167
- bindings (list): Additional bindings
168
- on_declared (callable): Callback when declared
169
- expires (int): Queue expiry time in ms (RabbitMQ)
170
- message_ttl (int): Message TTL in ms (RabbitMQ)
171
- max_length (int): Max queue length (RabbitMQ)
172
- max_length_bytes (int): Max queue size in bytes (RabbitMQ)
173
- max_priority (int): Max message priority (RabbitMQ)
174
- queue_arguments (dict): Queue-specific arguments
175
- binding_arguments (dict): Binding-specific arguments
176
- consumer_arguments (dict): Consumer-specific arguments
177
"""
178
179
def declare(self, nowait=False, channel=None):
180
"""
181
Declare queue and bindings on broker.
182
183
Parameters:
184
- nowait (bool): Don't wait for confirmation
185
- channel: Channel to use
186
187
Returns:
188
Queue instance for chaining
189
"""
190
191
def bind_to(self, exchange=None, routing_key=None, arguments=None, nowait=False, channel=None):
192
"""
193
Bind queue to exchange.
194
195
Parameters:
196
- exchange (Exchange|str): Exchange to bind to
197
- routing_key (str): Routing key for binding
198
- arguments (dict): Binding arguments
199
- nowait (bool): Don't wait for confirmation
200
- channel: Channel to use
201
202
Returns:
203
Queue instance for chaining
204
"""
205
206
def unbind_from(self, exchange, routing_key=None, arguments=None, nowait=False, channel=None):
207
"""
208
Unbind queue from exchange.
209
210
Parameters:
211
- exchange (Exchange|str): Exchange to unbind from
212
- routing_key (str): Routing key that was used
213
- arguments (dict): Binding arguments that were used
214
- nowait (bool): Don't wait for confirmation
215
- channel: Channel to use
216
217
Returns:
218
Queue instance for chaining
219
"""
220
221
def get(self, no_ack=None, accept=None):
222
"""
223
Poll for single message from queue.
224
225
Parameters:
226
- no_ack (bool): Disable acknowledgment
227
- accept (list): Accepted content types
228
229
Returns:
230
Message instance or None if queue empty
231
"""
232
233
def purge(self, nowait=False):
234
"""
235
Remove all ready messages from queue.
236
237
Parameters:
238
- nowait (bool): Don't wait for confirmation
239
240
Returns:
241
Number of messages purged
242
"""
243
244
def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False):
245
"""
246
Start consuming messages from queue.
247
248
Parameters:
249
- consumer_tag (str): Consumer identifier
250
- callback (callable): Message callback function
251
- no_ack (bool): Disable acknowledgments
252
- nowait (bool): Don't wait for confirmation
253
254
Returns:
255
Consumer tag
256
"""
257
258
def cancel(self, consumer_tag, nowait=False):
259
"""
260
Cancel queue consumer.
261
262
Parameters:
263
- consumer_tag (str): Consumer to cancel
264
- nowait (bool): Don't wait for confirmation
265
"""
266
267
def delete(self, if_unused=False, if_empty=False, nowait=False):
268
"""
269
Delete queue from broker.
270
271
Parameters:
272
- if_unused (bool): Only delete if no consumers
273
- if_empty (bool): Only delete if no messages
274
- nowait (bool): Don't wait for confirmation
275
276
Returns:
277
Number of messages deleted
278
"""
279
280
# Properties
281
@property
282
def name(self):
283
"""str: Queue name"""
284
285
@property
286
def exchange(self):
287
"""Exchange: Associated exchange"""
288
289
@property
290
def routing_key(self):
291
"""str: Routing/binding key"""
292
293
@property
294
def durable(self):
295
"""bool: Durability flag"""
296
297
@property
298
def exclusive(self):
299
"""bool: Exclusivity flag"""
300
301
@property
302
def auto_delete(self):
303
"""bool: Auto-delete flag"""
304
305
@property
306
def expires(self):
307
"""int: Queue expiry time"""
308
309
@property
310
def message_ttl(self):
311
"""int: Message TTL"""
312
313
@property
314
def max_length(self):
315
"""int: Maximum queue length"""
316
317
@property
318
def max_priority(self):
319
"""int: Maximum message priority"""
320
```
321
322
### Binding
323
324
Represents queue or exchange binding declarations that define routing relationships.
325
326
```python { .api }
327
class binding:
328
def __init__(self, exchange=None, routing_key='', arguments=None, unbind_arguments=None):
329
"""
330
Create binding declaration.
331
332
Parameters:
333
- exchange (Exchange): Exchange to bind to
334
- routing_key (str): Routing key for binding
335
- arguments (dict): Binding arguments
336
- unbind_arguments (dict): Arguments for unbinding
337
"""
338
339
def declare(self, channel, nowait=False):
340
"""
341
Declare the destination exchange.
342
343
Parameters:
344
- channel: AMQP channel to use
345
- nowait (bool): Don't wait for confirmation
346
347
Returns:
348
binding instance for chaining
349
"""
350
351
def bind(self, entity, nowait=False, channel=None):
352
"""
353
Bind entity (queue/exchange) to this binding.
354
355
Parameters:
356
- entity (Queue|Exchange): Entity to bind
357
- nowait (bool): Don't wait for confirmation
358
- channel: Channel to use
359
360
Returns:
361
binding instance for chaining
362
"""
363
364
def unbind(self, entity, nowait=False, channel=None):
365
"""
366
Unbind entity from this binding.
367
368
Parameters:
369
- entity (Queue|Exchange): Entity to unbind
370
- nowait (bool): Don't wait for confirmation
371
- channel: Channel to use
372
373
Returns:
374
binding instance for chaining
375
"""
376
377
# Properties
378
@property
379
def exchange(self):
380
"""Exchange: Target exchange"""
381
382
@property
383
def routing_key(self):
384
"""str: Binding routing key"""
385
386
@property
387
def arguments(self):
388
"""dict: Binding arguments"""
389
```
390
391
## Usage Examples
392
393
### Exchange Types and Routing
394
395
```python
396
from kombu import Exchange, Queue
397
398
# Direct exchange - exact routing key match
399
direct_exchange = Exchange('logs', type='direct', durable=True)
400
401
# Topic exchange - pattern matching with wildcards
402
topic_exchange = Exchange('events', type='topic', durable=True)
403
404
# Fanout exchange - broadcast to all bound queues
405
fanout_exchange = Exchange('notifications', type='fanout', durable=True)
406
407
# Headers exchange - route by message headers
408
headers_exchange = Exchange('priority', type='headers', durable=True)
409
```
410
411
### Queue Declaration and Binding
412
413
```python
414
from kombu import Connection, Exchange, Queue
415
416
# Define entities
417
task_exchange = Exchange('tasks', type='direct', durable=True)
418
task_queue = Queue(
419
'high_priority_tasks',
420
exchange=task_exchange,
421
routing_key='high',
422
durable=True,
423
message_ttl=300000, # 5 minutes
424
max_length=1000
425
)
426
427
with Connection('redis://localhost:6379/0') as conn:
428
channel = conn.channel()
429
430
# Declare exchange and queue
431
task_exchange.declare(channel=channel)
432
task_queue.declare(channel=channel)
433
```
434
435
### Multiple Bindings
436
437
```python
438
from kombu import Exchange, Queue, binding
439
440
# Create exchange and queue
441
log_exchange = Exchange('logs', type='topic', durable=True)
442
error_queue = Queue('error_logs', durable=True)
443
444
# Create multiple bindings for the queue
445
error_bindings = [
446
binding(log_exchange, 'app.*.error'),
447
binding(log_exchange, 'system.critical'),
448
binding(log_exchange, 'database.failure')
449
]
450
451
# Apply bindings to queue
452
error_queue.bindings = error_bindings
453
454
with Connection('amqp://localhost') as conn:
455
# Declare everything
456
error_queue.declare(channel=conn.channel())
457
```
458
459
### Queue Operations
460
461
```python
462
from kombu import Connection, Queue, Exchange
463
464
task_exchange = Exchange('tasks', type='direct')
465
task_queue = Queue('task_queue', task_exchange, routing_key='task')
466
467
with Connection('redis://localhost:6379/0') as conn:
468
channel = conn.channel()
469
task_queue = task_queue.bind(channel)
470
471
# Get single message
472
message = task_queue.get()
473
if message:
474
print(f"Received: {message.payload}")
475
message.ack()
476
477
# Purge queue
478
purged_count = task_queue.purge()
479
print(f"Purged {purged_count} messages")
480
481
# Get queue info (if supported by transport)
482
try:
483
# Some transports support queue inspection
484
info = channel.queue_declare(task_queue.name, passive=True)
485
print(f"Queue has {info.message_count} messages")
486
except Exception:
487
pass
488
```
489
490
### Exchange-to-Exchange Binding
491
492
```python
493
from kombu import Exchange
494
495
# Create exchanges
496
source_exchange = Exchange('source', type='topic', durable=True)
497
destination_exchange = Exchange('destination', type='direct', durable=True)
498
499
with Connection('amqp://localhost') as conn:
500
channel = conn.channel()
501
502
# Declare both exchanges
503
source_exchange.declare(channel=channel)
504
destination_exchange.declare(channel=channel)
505
506
# Bind destination to source
507
destination_exchange.bind_to(
508
source_exchange,
509
routing_key='important.*',
510
channel=channel
511
)
512
```
513
514
### Dynamic Queue Management
515
516
```python
517
from kombu import Connection, Exchange, Queue
518
519
def create_user_queue(user_id):
520
"""Create dedicated queue for user"""
521
user_exchange = Exchange(f'user_{user_id}', type='direct', durable=True)
522
user_queue = Queue(
523
f'user_{user_id}_messages',
524
exchange=user_exchange,
525
routing_key='message',
526
durable=True,
527
auto_delete=True, # Clean up when user disconnects
528
expires=3600000 # Expire after 1 hour of inactivity
529
)
530
return user_exchange, user_queue
531
532
# Usage
533
with Connection('redis://localhost:6379/0') as conn:
534
exchange, queue = create_user_queue('12345')
535
536
# Declare entities
537
exchange.declare(channel=conn.channel())
538
queue.declare(channel=conn.channel())
539
540
# Later, when user is done
541
queue.delete(if_unused=True)
542
exchange.delete(if_unused=True)
543
```