Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
npx @tessl/cli install tessl/pypi-kombu@5.5.00
# Kombu

A comprehensive messaging library for Python that provides an idiomatic high-level interface for the Advanced Message Queuing Protocol (AMQP) and other message brokers. Kombu enables developers to build robust messaging systems with support for multiple message servers including RabbitMQ, Redis, MongoDB, Amazon SQS, and others.

## Package Information

- **Package Name**: kombu
- **Language**: Python
- **Installation**: `pip install kombu`
- **Version**: 5.5.4
- **License**: BSD-3-Clause

## Core Imports

```python
import kombu
```

Common patterns for working with messaging:

```python
from kombu import Connection, BrokerConnection, Exchange, Queue, Producer, Consumer
```

For simple queue operations:

```python
from kombu.simple import SimpleQueue
```

For consumer programs:

```python
from kombu.mixins import ConsumerMixin
```

For connection and producer pooling:

```python
from kombu import pools
from kombu.pools import connections, producers
```

For compression:

```python
from kombu import compression
```

## Basic Usage

```python
from kombu import Connection, Exchange, Queue

# Define message exchange and queue
task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('task_queue', task_exchange, routing_key='task')

# Connect to broker
with Connection('redis://localhost:6379/0') as conn:
    # Publish a message; declare=[task_queue] creates the queue if needed
    with conn.Producer() as producer:
        producer.publish(
            {'hello': 'world'},
            exchange=task_exchange,
            routing_key='task',
            declare=[task_queue]
        )

    # Consume messages
    def process_message(body, message):
        print('Received message:', body)
        message.ack()

    with conn.Consumer(task_queue, callbacks=[process_message]) as consumer:
        # Process one message; raises socket.timeout if none arrives in time
        conn.drain_events(timeout=1)
```

## Architecture

Kombu provides a comprehensive messaging abstraction built around these core concepts:

- **Connection**: Manages broker connections with automatic reconnection and pooling
- **Producer/Consumer**: High-level interfaces for publishing and consuming messages
- **Exchange/Queue**: AMQP entity declarations for message routing
- **Message**: Unified message handling with acknowledgment and error management
- **Transport**: Pluggable backend support for different message brokers

The library abstracts transport-specific details while exposing rich AMQP features when available, making it suitable for both simple messaging scenarios and complex distributed systems. It serves as the foundation for distributed task processing systems like Celery.

## Capabilities

### Connection Management

Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends.

```python { .api }
class Connection:
    def __init__(self, hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, transport=None, **kwargs): ...
    def connect(self): ...
    def channel(self): ...
    def drain_events(self, **kwargs): ...
    def ensure_connection(self, errback=None, max_retries=None, **retry_policy): ...

def parse_url(url: str) -> dict: ...
```

[Connection Management](./connection.md)

### Message Entities

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior.

```python { .api }
class Exchange:
    def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, **kwargs): ...
    def declare(self, nowait=False, passive=None, channel=None): ...
    def publish(self, message, routing_key=None, **kwargs): ...

class Queue:
    def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, **kwargs): ...
    def declare(self, nowait=False, channel=None): ...
    def bind_to(self, exchange=None, routing_key=None, **kwargs): ...
    def get(self, no_ack=None, accept=None): ...

class binding:
    def __init__(self, exchange=None, routing_key='', arguments=None, **kwargs): ...
```

[Message Entities](./entities.md)

### Messaging

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support.

```python { .api }
class Producer:
    def __init__(self, channel, exchange=None, routing_key='', serializer=None, **kwargs): ...
    def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, **kwargs): ...
    def declare(self): ...

class Consumer:
    def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, **kwargs): ...
    def consume(self, no_ack=None): ...
    def register_callback(self, callback): ...
    def add_queue(self, queue): ...

class Message:
    def ack(self, multiple=False): ...
    def reject(self, requeue=False): ...
    def decode(self): ...
```

[Messaging](./messaging.md)

### Simple Interface

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module.

```python { .api }
class SimpleQueue:
    def __init__(self, channel, name, no_ack=False, **kwargs): ...
    def get(self, block=True, timeout=None): ...
    def put(self, message, serializer=None, headers=None, **kwargs): ...
    def clear(self): ...
    def qsize(self): ...

class SimpleBuffer:
    def __init__(self, channel, name, no_ack=True, **kwargs): ...
```

[Simple Interface](./simple.md)

### Consumer Mixins

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling.

```python { .api }
class ConsumerMixin:
    def get_consumers(self, Consumer, channel): ...
    def run(self, _tokens=1, **kwargs): ...
    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): ...
    def on_connection_error(self, exc, interval): ...

class ConsumerProducerMixin(ConsumerMixin):
    @property
    def producer(self): ...
    @property
    def producer_connection(self): ...
```

[Consumer Mixins](./mixins.md)

### Serialization

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats.

```python { .api }
def dumps(data, serializer=None): ...
def loads(data, content_type, content_encoding='utf-8', accept=None, **kwargs): ...
def register(name, encoder, decoder, content_type, content_encoding='utf-8'): ...
def enable_insecure_serializers(choices=None): ...
def disable_insecure_serializers(allowed=None): ...
```

[Serialization](./serialization.md)

### Exception Handling

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems.

```python { .api }
class KombuError(Exception): ...
class OperationalError(KombuError): ...
class SerializationError(KombuError): ...
class NotBoundError(KombuError): ...
class MessageStateError(KombuError): ...
class LimitExceeded(KombuError): ...
```

[Exception Handling](./exceptions.md)

### Connection and Producer Pooling

Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications.

```python { .api }
class ProducerPool:
    def __init__(self, connections, *args, **kwargs): ...
    def acquire(self, block=False, timeout=None): ...
    def release(self, resource): ...

class PoolGroup:
    def __init__(self, limit=None, close_after_fork=True): ...
    def create(self, resource, limit): ...

class Connections(PoolGroup): ...
class Producers(PoolGroup): ...

connections: Connections  # Global connection pool group
producers: Producers  # Global producer pool group

def get_limit() -> int: ...
def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int: ...
def reset(*args, **kwargs): ...
```

[Connection and Producer Pooling](./pools.md)

### Compression

Message payload compression utilities supporting multiple compression algorithms for reducing message size.

```python { .api }
def register(encoder, decoder, content_type, aliases=None): ...
def encoders() -> list: ...
def get_encoder(content_type): ...
def get_decoder(content_type): ...
def compress(body, content_type): ...
def decompress(body, content_type): ...
```

[Compression](./compression.md)

## Transport Support

Kombu supports multiple message brokers through pluggable transports:

- **AMQP**: RabbitMQ, Apache Qpid (pyamqp, librabbitmq)
- **Redis**: Redis server with pub/sub support
- **Memory**: In-memory transport for testing
- **Amazon SQS**: Amazon Simple Queue Service
- **MongoDB**: MongoDB as message broker
- **SQLAlchemy**: Database-backed transport
- **Filesystem**: File-based transport for development
- **Plus many others**: Google Cloud Pub/Sub, Azure Service Bus, Kafka, etc.

Transport selection is automatic based on connection URL or can be specified explicitly.

## Common Patterns

### Connection Pooling

```python
from kombu import Connection, pools

# Pool groups are keyed by Connection instances, not by URL strings
connection = Connection('redis://localhost:6379/0')

# Global connection pool
with pools.connections[connection].acquire(block=True) as conn:
    # Use connection
    pass

# Producer pool
with pools.producers[connection].acquire(block=True) as producer:
    producer.publish({'msg': 'data'}, routing_key='key')
```

### Event Loop Processing

```python
from kombu import Connection
from kombu.common import eventloop

with Connection('redis://localhost:6379/0') as connection:
    # Process events with timeout; socket.timeout is raised when no event
    # arrives in time unless ignore_timeouts=True is passed
    for _ in eventloop(connection, limit=None, timeout=1.0):
        pass  # Events processed via consumers
```

### URL-based Configuration

```python
from kombu.utils.url import parse_url

# Parse broker URL
parsed = parse_url('redis://user:pass@localhost:6379/1')
# Returns: {'transport': 'redis', 'hostname': 'localhost', 'port': 6379, ...}
```