or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.md connection.md entities.md exceptions.md index.md messaging.md mixins.md pools.md serialization.md simple.md

messaging.mddocs/

0

# Messaging

1

2

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support. The messaging API provides the primary interface for sending and receiving messages in Kombu applications.

3

4

## Capabilities

5

6

### Producer

7

8

Message producer for publishing messages to exchanges with serialization, compression, and delivery options.

9

10

```python { .api }

11

class Producer:

12

def __init__(self, channel, exchange=None, routing_key='', serializer=None, compression=None, auto_declare=True, on_return=None, **kwargs):

13

"""

14

Create message producer.

15

16

Parameters:

17

- channel: AMQP channel to use

18

- exchange (Exchange): Default exchange for publishing

19

- routing_key (str): Default routing key

20

- serializer (str): Default serialization method

21

- compression (str): Default compression method

22

- auto_declare (bool): Automatically declare entities

23

- on_return (callable): Callback for returned messages

24

"""

25

26

def declare(self):

27

"""

28

Declare the default exchange (done automatically at instantiation when auto_declare is enabled).

29

30

Returns:

31

Producer instance for chaining

32

"""

33

34

def maybe_declare(self, entity, retry=False, **retry_policy):

35

"""

36

Declare entity if not already declared (cached).

37

38

Parameters:

39

- entity (Exchange|Queue): Entity to declare

40

- retry (bool): Enable retry on failure

41

- retry_policy: Retry policy parameters

42

43

Returns:

44

bool: True if entity was declared

45

"""

46

47

def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, confirm_timeout=None, **properties):

48

"""

49

Publish message to exchange.

50

51

Parameters:

52

- body: Message body (will be serialized)

53

- routing_key (str): Message routing key

54

- delivery_mode (int): Delivery mode (1=transient, 2=persistent)

55

- mandatory (bool): Return message if no route found

56

- immediate (bool): Return message if no consumer ready

57

- priority (int): Message priority (0-255)

58

- content_type (str): Content type override

59

- content_encoding (str): Content encoding override

60

- serializer (str): Serializer override

61

- headers (dict): Message headers

62

- compression (str): Compression method override

63

- exchange (Exchange): Exchange override

64

- retry (bool): Enable retry on failure

65

- retry_policy (dict): Retry policy parameters

66

- declare (list): Entities to declare before publishing

67

- expiration (float): Per-message TTL in seconds (default: no expiration)

68

- timeout (float): Operation timeout in seconds

69

- confirm_timeout (float): Publisher confirmation timeout

70

- **properties: Additional message properties

71

72

Returns:

73

None

74

"""

75

76

def revive(self, channel):

77

"""

78

Revive producer after connection re-establishment.

79

80

Parameters:

81

- channel: New channel to use

82

83

Returns:

84

Producer instance for chaining

85

"""

86

87

def close(self):

88

"""Close producer and cleanup resources."""

89

90

def release(self):

91

"""Release producer resources (alias for close)."""

92

93

# Properties

94

@property

95

def channel(self):

96

"""Channel: AMQP channel"""

97

98

@property

99

def exchange(self):

100

"""Exchange: Default exchange"""

101

102

@property

103

def routing_key(self):

104

"""str: Default routing key"""

105

106

@property

107

def serializer(self):

108

"""str: Default serializer"""

109

110

@property

111

def compression(self):

112

"""str: Default compression method"""

113

114

@property

115

def auto_declare(self):

116

"""bool: Auto-declare flag"""

117

118

@property

119

def on_return(self):

120

"""callable: Basic return callback"""

121

```

122

123

### Consumer

124

125

Message consumer for receiving messages from queues with callback handling, acknowledgment control, and quality of service management.

126

127

```python { .api }

128

class Consumer:

129

def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None, **kwargs):

130

"""

131

Create message consumer.

132

133

Parameters:

134

- channel: AMQP channel to use

135

- queues (list): Queues to consume from

136

- no_ack (bool): Disable message acknowledgments

137

- auto_declare (bool): Automatically declare entities

138

- callbacks (list): Message callback functions

139

- on_decode_error (callable): Decode error callback

140

- on_message (callable): Alternative message handler

141

- accept (list): Accepted content types

142

- prefetch_count (int): QoS prefetch count

143

- tag_prefix (str): Consumer tag prefix

144

"""

145

146

def revive(self, channel):

147

"""

148

Revive consumer after connection re-establishment.

149

150

Parameters:

151

- channel: New channel to use

152

153

Returns:

154

Consumer instance for chaining

155

"""

156

157

def declare(self):

158

"""

159

Declare queues, exchanges and bindings.

160

161

Returns:

162

Consumer instance for chaining

163

"""

164

165

def register_callback(self, callback):

166

"""

167

Register new callback function.

168

169

Parameters:

170

- callback (callable): Function to call for each message

171

172

Returns:

173

Consumer instance for chaining

174

"""

175

176

def add_queue(self, queue):

177

"""

178

Add queue to consume from.

179

180

Parameters:

181

- queue (Queue): Queue to add

182

183

Returns:

184

Consumer instance for chaining

185

"""

186

187

def consume(self, no_ack=None):

188

"""

189

Start consuming messages from queues.

190

191

Parameters:

192

- no_ack (bool): Per-call override of the consumer's no_ack setting (True disables acknowledgments)

193

194

Returns:

195

Consumer instance for chaining

196

"""

197

198

def cancel(self):

199

"""

200

End all active queue consumers.

201

202

Returns:

203

Consumer instance for chaining

204

"""

205

206

def cancel_by_queue(self, queue):

207

"""

208

Cancel consumer for specific queue.

209

210

Parameters:

211

- queue (str|Queue): Queue to stop consuming

212

213

Returns:

214

Consumer instance for chaining

215

"""

216

217

def consuming_from(self, queue):

218

"""

219

Check if currently consuming from queue.

220

221

Parameters:

222

- queue (str|Queue): Queue to check

223

224

Returns:

225

bool: True if consuming from queue

226

"""

227

228

def purge(self):

229

"""

230

Purge messages from all queues.

231

232

Returns:

233

int: Total number of messages purged

234

"""

235

236

def flow(self, active):

237

"""

238

Enable/disable flow from peer.

239

240

Parameters:

241

- active (bool): Enable or disable flow

242

243

Returns:

244

Consumer instance for chaining

245

"""

246

247

def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):

248

"""

249

Set quality of service limits.

250

251

Parameters:

252

- prefetch_size (int): Prefetch window size

253

- prefetch_count (int): Prefetch message count

254

- apply_global (bool): Apply globally or per-consumer

255

256

Returns:

257

Consumer instance for chaining

258

"""

259

260

def recover(self, requeue=False):

261

"""

262

Redeliver unacknowledged messages.

263

264

Parameters:

265

- requeue (bool): If True, requeue the messages (they may then go to a different consumer); if False, redeliver to the original recipient

266

267

Returns:

268

Consumer instance for chaining

269

"""

270

271

def receive(self, body, message):

272

"""

273

Handle received message by calling callbacks.

274

275

Parameters:

276

- body: Decoded message body

277

- message (Message): Message instance

278

279

Returns:

280

None

281

"""

282

283

# Properties

284

@property

285

def channel(self):

286

"""Channel: AMQP channel"""

287

288

@property

289

def queues(self):

290

"""list: Queues being consumed"""

291

292

@property

293

def no_ack(self):

294

"""bool: Automatic acknowledgment flag"""

295

296

@property

297

def auto_declare(self):

298

"""bool: Auto-declare entities flag"""

299

300

@property

301

def callbacks(self):

302

"""list: Message callback functions"""

303

304

@property

305

def on_message(self):

306

"""callable: Alternative message handler"""

307

308

@property

309

def on_decode_error(self):

310

"""callable: Decode error callback"""

311

312

@property

313

def accept(self):

314

"""list: Accepted content types"""

315

316

@property

317

def prefetch_count(self):

318

"""int: QoS prefetch count"""

319

```

320

321

### Message

322

323

Base class for received messages with acknowledgment, rejection, and decoding capabilities.

324

325

```python { .api }

326

class Message:

327

def __init__(self, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info=None, properties=None, headers=None, **kwargs):

328

"""

329

Create message instance.

330

331

Parameters:

332

- body: Raw message body

333

- delivery_tag: Unique delivery identifier

334

- content_type (str): Message content type

335

- content_encoding (str): Content encoding

336

- delivery_info (dict): Delivery information

337

- properties (dict): Message properties

338

- headers (dict): Message headers

339

"""

340

341

def ack(self, multiple=False):

342

"""

343

Acknowledge message processing.

344

345

Parameters:

346

- multiple (bool): Acknowledge all messages up to this one

347

348

Raises:

349

MessageStateError: If message already acknowledged

350

"""

351

352

def ack_log_error(self, logger, errors, multiple=False):

353

"""

354

Acknowledge message with error logging.

355

356

Parameters:

357

- logger: Logger instance

358

- errors (tuple): Error types to catch and log

359

- multiple (bool): Acknowledge multiple messages

360

361

Returns:

362

bool: True if acknowledgment succeeded

363

"""

364

365

def reject(self, requeue=False):

366

"""

367

Reject message.

368

369

Parameters:

370

- requeue (bool): Requeue message for redelivery

371

372

Raises:

373

MessageStateError: If message already acknowledged

374

"""

375

376

def reject_log_error(self, logger, errors, requeue=False):

377

"""

378

Reject message with error logging.

379

380

Parameters:

381

- logger: Logger instance

382

- errors (tuple): Error types to catch and log

383

- requeue (bool): Requeue message

384

385

Returns:

386

bool: True if rejection succeeded

387

"""

388

389

def requeue(self):

390

"""

391

Reject and requeue message (shortcut for reject(requeue=True)).

392

393

Raises:

394

MessageStateError: If message already acknowledged

395

"""

396

397

def decode(self):

398

"""

399

Deserialize message body (cached).

400

401

Returns:

402

Decoded message body

403

"""

404

405

def _decode(self):

406

"""

407

Force re-decode message body.

408

409

Returns:

410

Decoded message body

411

"""

412

413

# Properties

414

@property

415

def acknowledged(self):

416

"""bool: True if message has been acknowledged"""

417

418

@property

419

def payload(self):

420

"""Decoded message body (cached)"""

421

422

@property

423

def body(self):

424

"""Raw message body"""

425

426

@property

427

def content_type(self):

428

"""str: Message content type"""

429

430

@property

431

def content_encoding(self):

432

"""str: Message content encoding"""

433

434

@property

435

def delivery_info(self):

436

"""dict: Delivery information"""

437

438

@property

439

def headers(self):

440

"""dict: Message headers"""

441

442

@property

443

def properties(self):

444

"""dict: Message properties"""

445

```

446

447

## Usage Examples

448

449

### Basic Producer Usage

450

451

```python

452

from kombu import Connection, Exchange, Producer

453

454

# Define exchange

455

task_exchange = Exchange('tasks', type='direct', durable=True)

456

457

with Connection('redis://localhost:6379/0') as conn:

458

# Create producer

459

producer = Producer(

460

conn.channel(),

461

exchange=task_exchange,

462

routing_key='default',

463

serializer='json'

464

)

465

466

# Publish messages

467

producer.publish(

468

{'task': 'process_data', 'args': [1, 2, 3]},

469

routing_key='high_priority',

470

headers={'origin': 'web_app'},

471

priority=5

472

)

473

474

# Publish with different serializer

475

producer.publish(

476

b'binary data',

477

routing_key='binary_task',

478

serializer='pickle',

479

content_type='application/x-python-serialize'

480

)

481

```

482

483

### Basic Consumer Usage

484

485

```python

486

from kombu import Connection, Queue, Consumer

487

488

def process_message(body, message):

489

"""Message processing callback"""

490

try:

491

print(f"Processing: {body}")

492

# Simulate work

493

result = body['args'][0] + body['args'][1]

494

print(f"Result: {result}")

495

496

# Acknowledge successful processing

497

message.ack()

498

except Exception as exc:

499

print(f"Processing failed: {exc}")

500

# Reject and requeue for retry

501

message.reject(requeue=True)

502

503

# Define queue

504

task_queue = Queue('task_queue', durable=True)

505

506

with Connection('redis://localhost:6379/0') as conn:

507

# Create consumer

508

consumer = Consumer(

509

conn.channel(),

510

queues=[task_queue],

511

callbacks=[process_message],

512

prefetch_count=10

513

)

514

515

# Start consuming

516

consumer.consume()

517

518

# Process messages

519

while True:

520

try:

521

conn.drain_events(timeout=1.0)

522

except socket.timeout:

523

break

524

```

525

526

### Advanced Producer Features

527

528

```python

529

from kombu import Connection, Exchange, Queue, Producer

530

531

# Setup entities

532

exchange = Exchange('notifications', type='topic', durable=True)

533

queue = Queue('email_notifications', exchange, routing_key='email.*')

534

535

with Connection('amqp://localhost') as conn:

536

producer = Producer(conn.channel(), exchange=exchange)

537

538

# Publish with automatic declaration

539

producer.publish(

540

{

541

'to': 'user@example.com',

542

'subject': 'Welcome!',

543

'body': 'Welcome to our service'

544

},

545

routing_key='email.welcome',

546

declare=[queue], # Declare queue before publishing

547

mandatory=True, # Return if no route

548

expiration='300000', # 5 minute TTL

549

headers={'priority': 'high'}

550

)

551

552

# Publish compressed message

553

producer.publish(

554

{'large': 'data' * 1000},

555

routing_key='email.report',

556

compression='gzip',

557

serializer='pickle'

558

)

559

```

560

561

### Advanced Consumer Features

562

563

```python

564

from kombu import Connection, Queue, Consumer

565

import logging

566

567

logger = logging.getLogger(__name__)

568

569

def handle_decode_error(message, exc):

570

"""Handle message decode errors"""

571

logger.error(f"Failed to decode message: {exc}")

572

# Log the raw message for debugging

573

logger.error(f"Raw message body: {message.body}")

574

# Reject without requeue to avoid infinite loop

575

message.reject(requeue=False)

576

577

def process_message(body, message):

578

"""Process message with comprehensive error handling"""

579

try:

580

print(f"Processing message: {body}")

581

582

# Simulate processing that might fail

583

if body.get('fail'):

584

raise ValueError("Simulated processing error")

585

586

# Acknowledge successful processing

587

message.ack_log_error(logger, (Exception,))

588

589

except ValueError as exc:

590

logger.error(f"Processing error: {exc}")

591

# Reject and requeue for retry

592

message.reject_log_error(logger, (Exception,), requeue=True)

593

594

# Setup queues with different priorities

595

high_priority_queue = Queue('high_priority', routing_key='high')

596

low_priority_queue = Queue('low_priority', routing_key='low')

597

598

with Connection('redis://localhost:6379/0') as conn:

599

consumer = Consumer(

600

conn.channel(),

601

queues=[high_priority_queue, low_priority_queue],

602

callbacks=[process_message],

603

on_decode_error=handle_decode_error,

604

accept=['json', 'pickle'], # Only accept these content types

605

prefetch_count=5

606

)

607

608

# Set QoS limits

609

consumer.qos(prefetch_count=10, apply_global=True)

610

611

# Start consuming

612

consumer.consume()

613

614

# Process with graceful shutdown

615

try:

616

while True:

617

conn.drain_events(timeout=1.0)

618

except KeyboardInterrupt:

619

print("Shutting down...")

620

consumer.cancel()

621

```

622

623

### Message Inspection and Handling

624

625

```python

626

from kombu import Connection, Queue, Consumer

627

628

def inspect_message(body, message):

629

"""Inspect message properties and handle accordingly"""

630

631

# Check message properties

632

print(f"Content type: {message.content_type}")

633

print(f"Delivery info: {message.delivery_info}")

634

print(f"Headers: {message.headers}")

635

print(f"Properties: {message.properties}")

636

637

# Handle based on message properties

638

if message.headers and message.headers.get('priority') == 'urgent':

639

print("Processing urgent message immediately")

640

# Process immediately

641

process_urgent(body)

642

message.ack()

643

elif message.properties.get('redelivered'):

644

print("Message was redelivered - handling carefully")

645

# Special handling for redelivered messages

646

if handle_redelivered(body):

647

message.ack()

648

else:

649

# Dead letter or discard

650

message.reject(requeue=False)

651

else:

652

# Normal processing

653

if process_normal(body):

654

message.ack()

655

else:

656

message.requeue()

657

658

def process_urgent(body):

659

# Urgent processing logic

660

return True

661

662

def handle_redelivered(body):

663

# Redelivered message logic

664

return True

665

666

def process_normal(body):

667

# Normal processing logic

668

return True

669

670

queue = Queue('inspection_queue')

671

672

with Connection('redis://localhost:6379/0') as conn:

673

consumer = Consumer(conn.channel(), [queue], callbacks=[inspect_message])

674

consumer.consume()

675

676

# Process messages

677

conn.drain_events()

678

```

679

680

### Producer with Return Handling

681

682

```python

683

from kombu import Connection, Exchange, Producer

684

685

def handle_returned_message(exception, exchange, routing_key, message):

686

"""Handle messages returned by broker"""

687

print(f"Message returned: {exception}")

688

print(f"Exchange: {exchange}, Routing key: {routing_key}")

689

print(f"Message: {message}")

690

691

# Could implement retry logic, logging, etc.

692

693

exchange = Exchange('optional_routing', type='direct')

694

695

with Connection('amqp://localhost') as conn:

696

producer = Producer(

697

conn.channel(),

698

exchange=exchange,

699

on_return=handle_returned_message

700

)

701

702

# Publish with mandatory flag - will be returned if no route exists

703

producer.publish(

704

{'data': 'test'},

705

routing_key='nonexistent_route',

706

mandatory=True # Return message if no queue bound

707

)

708

```