or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.mdconnection.mdentities.mdexceptions.mdindex.mdmessaging.mdmixins.mdpools.mdserialization.mdsimple.md

index.mddocs/

0

# Kombu

1

2

A comprehensive messaging library for Python that provides an idiomatic high-level interface for the Advanced Message Queuing Protocol (AMQP) and other message brokers. Kombu enables developers to build robust messaging systems with support for multiple message servers including RabbitMQ, Redis, MongoDB, Amazon SQS, and others.

3

4

## Package Information

5

6

- **Package Name**: kombu

7

- **Language**: Python

8

- **Installation**: `pip install kombu`

9

- **Version**: 5.5.4

10

- **License**: BSD-3-Clause

11

12

## Core Imports

13

14

```python

15

import kombu

16

```

17

18

Common patterns for working with messaging:

19

20

```python

21

from kombu import Connection, BrokerConnection, Exchange, Queue, Producer, Consumer

22

```

23

24

For simple queue operations:

25

26

```python

27

from kombu.simple import SimpleQueue

28

```

29

30

For consumer programs:

31

32

```python

33

from kombu.mixins import ConsumerMixin

34

```

35

36

For connection and producer pooling:

37

38

```python

39

from kombu import pools

40

from kombu.pools import connections, producers

41

```

42

43

For compression:

44

45

```python

46

from kombu import compression

47

```

48

49

## Basic Usage

50

51

```python

52

from kombu import Connection, Exchange, Queue, Producer, Consumer

53

54

# Define message exchange and queue

55

task_exchange = Exchange('tasks', type='direct')

56

task_queue = Queue('task_queue', task_exchange, routing_key='task')

57

58

# Connect to broker

59

with Connection('redis://localhost:6379/0') as conn:

60

# Publish a message

61

with conn.Producer() as producer:

62

producer.publish(

63

{'hello': 'world'},

64

exchange=task_exchange,

65

routing_key='task',

66

declare=[task_queue]

67

)

68

69

# Consume messages

70

def process_message(body, message):

71

print('Received message:', body)

72

message.ack()

73

74

with conn.Consumer(task_queue, callbacks=[process_message]) as consumer:

75

# Process one message

76

conn.drain_events()

77

```

78

79

## Architecture

80

81

Kombu provides a comprehensive messaging abstraction built around these core concepts:

82

83

- **Connection**: Manages broker connections with automatic reconnection and pooling

84

- **Producer/Consumer**: High-level interfaces for publishing and consuming messages

85

- **Exchange/Queue**: AMQP entity declarations for message routing

86

- **Message**: Unified message handling with acknowledgment and error management

87

- **Transport**: Pluggable backend support for different message brokers

88

89

The library abstracts transport-specific details while exposing rich AMQP features when available, making it suitable for both simple messaging scenarios and complex distributed systems. It serves as the foundation for distributed task processing systems like Celery.

90

91

## Capabilities

92

93

### Connection Management

94

95

Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends.

96

97

```python { .api }

98

class Connection:

99

def __init__(self, hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, transport=None, **kwargs): ...

100

def connect(self): ...

101

def channel(self): ...

102

def drain_events(self, **kwargs): ...

103

def ensure_connection(self, errback=None, max_retries=None, **retry_policy): ...

104

105

def parse_url(url: str) -> dict: ...

106

```

107

108

[Connection Management](./connection.md)

109

110

### Message Entities

111

112

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior.

113

114

```python { .api }

115

class Exchange:

116

def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, **kwargs): ...

117

def declare(self, nowait=False, passive=None, channel=None): ...

118

def publish(self, message, routing_key=None, **kwargs): ...

119

120

class Queue:

121

def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, **kwargs): ...

122

def declare(self, nowait=False, channel=None): ...

123

def bind_to(self, exchange=None, routing_key=None, **kwargs): ...

124

def get(self, no_ack=None, accept=None): ...

125

126

class binding:

127

def __init__(self, exchange=None, routing_key='', arguments=None, **kwargs): ...

128

```

129

130

[Message Entities](./entities.md)

131

132

### Messaging

133

134

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support.

135

136

```python { .api }

137

class Producer:

138

def __init__(self, channel, exchange=None, routing_key='', serializer=None, **kwargs): ...

139

def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, **kwargs): ...

140

def declare(self): ...

141

142

class Consumer:

143

def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, **kwargs): ...

144

def consume(self, no_ack=None): ...

145

def register_callback(self, callback): ...

146

def add_queue(self, queue): ...

147

148

class Message:

149

def ack(self, multiple=False): ...

150

def reject(self, requeue=False): ...

151

def decode(self): ...

152

```

153

154

[Messaging](./messaging.md)

155

156

### Simple Interface

157

158

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module.

159

160

```python { .api }

161

class SimpleQueue:

162

def __init__(self, channel, name, no_ack=False, **kwargs): ...

163

def get(self, block=True, timeout=None): ...

164

def put(self, message, serializer=None, headers=None, **kwargs): ...

165

def clear(self): ...

166

def qsize(self): ...

167

168

class SimpleBuffer:

169

def __init__(self, channel, name, no_ack=True, **kwargs): ...

170

```

171

172

[Simple Interface](./simple.md)

173

174

### Consumer Mixins

175

176

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling.

177

178

```python { .api }

179

class ConsumerMixin:

180

def get_consumers(self, Consumer, channel): ...

181

def run(self, _tokens=1, **kwargs): ...

182

def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): ...

183

def on_connection_error(self, exc, interval): ...

184

185

class ConsumerProducerMixin(ConsumerMixin):

186

@property

187

def producer(self): ...

188

@property

189

def producer_connection(self): ...

190

```

191

192

[Consumer Mixins](./mixins.md)

193

194

### Serialization

195

196

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats.

197

198

```python { .api }

199

def dumps(data, serializer=None): ...

200

def loads(data, content_type, content_encoding='utf-8', accept=None, **kwargs): ...

201

def register(name, encoder, decoder, content_type, content_encoding='utf-8'): ...

202

def enable_insecure_serializers(choices=None): ...

203

def disable_insecure_serializers(allowed=None): ...

204

```

205

206

[Serialization](./serialization.md)

207

208

### Exception Handling

209

210

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems.

211

212

```python { .api }

213

class KombuError(Exception): ...

214

class OperationalError(KombuError): ...

215

class SerializationError(KombuError): ...

216

class NotBoundError(KombuError): ...

217

class MessageStateError(KombuError): ...

218

class LimitExceeded(KombuError): ...

219

```

220

221

[Exception Handling](./exceptions.md)

222

223

### Connection and Producer Pooling

224

225

Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications.

226

227

```python { .api }

228

class ProducerPool:

229

def __init__(self, connections, *args, **kwargs): ...

230

def acquire(self, block=False, timeout=None): ...

231

def release(self, resource): ...

232

233

class PoolGroup:

234

def __init__(self, limit=None, close_after_fork=True): ...

235

def create(self, resource, limit): ...

236

237

class Connections(PoolGroup): ...

238

class Producers(PoolGroup): ...

239

240

connections: Connections # Global connection pool group

241

producers: Producers # Global producer pool group

242

243

def get_limit() -> int: ...

244

def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int: ...

245

def reset(*args, **kwargs): ...

246

```

247

248

[Connection and Producer Pooling](./pools.md)

249

250

### Compression

251

252

Message payload compression utilities supporting multiple compression algorithms for reducing message size.

253

254

```python { .api }

255

def register(encoder, decoder, content_type, aliases=None): ...

256

def encoders() -> list: ...

257

def get_encoder(content_type): ...

258

def get_decoder(content_type): ...

259

def compress(body, content_type): ...

260

def decompress(body, content_type): ...

261

```

262

263

[Compression](./compression.md)

264

265

## Transport Support

266

267

Kombu supports multiple message brokers through pluggable transports:

268

269

- **AMQP**: RabbitMQ, Apache Qpid (pyamqp, librabbitmq)

270

- **Redis**: Redis server with pub/sub support

271

- **Memory**: In-memory transport for testing

272

- **Amazon SQS**: Amazon Simple Queue Service

273

- **MongoDB**: MongoDB as message broker

274

- **SQLAlchemy**: Database-backed transport

275

- **Filesystem**: File-based transport for development

276

- **Plus many others**: Google Cloud Pub/Sub, Azure Service Bus, Kafka, etc.

277

278

Transport selection is automatic based on connection URL or can be specified explicitly.

279

280

## Common Patterns

281

282

### Connection Pooling

283

284

```python

285

from kombu import pools

286

287

# Global connection pool

288

with pools.connections['redis://localhost:6379/0'].acquire() as conn:

289

# Use connection

290

pass

291

292

# Producer pool

293

with pools.producers['redis://localhost:6379/0'].acquire() as producer:

294

producer.publish({'msg': 'data'}, routing_key='key')

295

```

296

297

### Event Loop Processing

298

299

```python

300

from kombu.common import eventloop

301

302

# Process events with timeout

303

for _ in eventloop(connection, limit=None, timeout=1.0):

304

pass # Events processed via consumers

305

```

306

307

### URL-based Configuration

308

309

```python

310

from kombu.utils.url import parse_url

311

312

# Parse broker URL

313

parsed = parse_url('redis://user:pass@localhost:6379/1')

314

# Returns: {'transport': 'redis', 'hostname': 'localhost', 'port': 6379, ...}

315

```