or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.mdconnection.mdentities.mdexceptions.mdindex.mdmessaging.mdmixins.mdpools.mdserialization.mdsimple.md

exceptions.mddocs/

0

# Exception Handling

1

2

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems. Kombu provides a structured set of exceptions that help identify and handle different types of failures in messaging applications.

3

4

## Capabilities

5

6

### Base Exception Classes

7

8

Foundation exception classes that provide the base hierarchy for all Kombu-specific errors.

9

10

```python { .api }

11

class KombuError(Exception):

12

"""

13

Common base class for all Kombu exceptions.

14

15

All Kombu-specific exceptions inherit from this class,

16

making it easy to catch any Kombu-related error.

17

"""

18

19

class OperationalError(KombuError):

20

"""

21

Recoverable message transport connection error.

22

23

Indicates a temporary error that may be resolved by retrying

24

the operation, such as network connectivity issues or

25

temporary broker unavailability.

26

"""

27

```

28

29

### Serialization Exceptions

30

31

Exceptions related to message serialization and deserialization failures.

32

33

```python { .api }

34

class SerializationError(KombuError):

35

"""

36

Failed to serialize or deserialize message content.

37

38

Base class for all serialization-related errors.

39

"""

40

41

class EncodeError(SerializationError):

42

"""

43

Cannot encode object for serialization.

44

45

Raised when an object cannot be serialized using the

46

specified serialization method.

47

"""

48

49

class DecodeError(SerializationError):

50

"""

51

Cannot decode serialized data.

52

53

Raised when serialized data cannot be deserialized,

54

either due to corruption or incompatible format.

55

"""

56

```

57

58

### Entity and Channel Exceptions

59

60

Exceptions related to AMQP entities, channels, and binding operations.

61

62

```python { .api }

63

class NotBoundError(KombuError):

64

"""

65

Trying to call channel method on unbound entity.

66

67

Raised when attempting to perform operations on exchanges,

68

queues, or other entities that haven't been bound to a channel.

69

"""

70

71

class MessageStateError(KombuError):

72

"""

73

Message already acknowledged or in invalid state.

74

75

Raised when attempting to acknowledge, reject, or requeue

76

a message that has already been processed.

77

"""

78

```

79

80

### Resource Limit Exceptions

81

82

Exceptions related to resource limits and capacity constraints.

83

84

```python { .api }

85

class LimitExceeded(KombuError):

86

"""

87

Generic limit exceeded error.

88

89

Base class for various limit-related exceptions.

90

"""

91

92

class ConnectionLimitExceeded(LimitExceeded):

93

"""

94

Maximum number of simultaneous connections exceeded.

95

96

Raised when attempting to create more connections than

97

allowed by broker or client configuration.

98

"""

99

100

class ChannelLimitExceeded(LimitExceeded):

101

"""

102

Maximum number of channels per connection exceeded.

103

104

Raised when attempting to create more channels than

105

allowed per connection.

106

"""

107

```

108

109

### Version and Compatibility Exceptions

110

111

Exceptions related to version mismatches and compatibility issues.

112

113

```python { .api }

114

class VersionMismatch(KombuError):

115

"""

116

Library dependency version mismatch.

117

118

Raised when required library versions are incompatible

119

with current Kombu version or each other.

120

"""

121

122

class SerializerNotInstalled(SerializationError):

123

"""

124

Required serialization library not installed.

125

126

Raised when attempting to use a serializer (like msgpack

127

or yaml) that requires additional packages not installed.

128

"""

129

```

130

131

### Content and Security Exceptions

132

133

Exceptions related to content filtering and security restrictions.

134

135

```python { .api }

136

class ContentDisallowed(KombuError):

137

"""

138

Consumer doesn't accept this content type.

139

140

Raised when attempting to deliver a message with content

141

type not in the consumer's accept list.

142

"""

143

144

class InconsistencyError(KombuError):

145

"""

146

Data or environment inconsistency detected.

147

148

Raised when internal state inconsistencies are detected

149

that may indicate configuration or data corruption issues.

150

"""

151

```

152

153

### HTTP and Network Exceptions

154

155

Exceptions related to HTTP transports and network operations.

156

157

```python { .api }

158

class HttpError(KombuError):

159

"""

160

HTTP client error.

161

162

Raised by HTTP-based transports when HTTP operations fail.

163

"""

164

```

165

166

### Utility Functions

167

168

Helper functions for exception handling and re-raising.

169

170

```python { .api }

171

def reraise(tp, value, tb=None):

172

"""

173

Reraise exception with preserved traceback.

174

175

Parameters:

176

- tp (type): Exception type

177

- value (Exception): Exception instance

178

- tb (traceback): Traceback object (optional)

179

180

Raises:

181

The provided exception with preserved traceback information.

182

"""

183

```

184

185

## Usage Examples

186

187

### Basic Exception Handling

188

189

```python

190

from kombu import Connection, Producer, Consumer, Queue

191

from kombu.exceptions import (

192

KombuError, OperationalError, SerializationError,

193

NotBoundError, MessageStateError

194

)

195

196

def robust_message_handling():

197

try:

198

with Connection('redis://localhost:6379/0') as conn:

199

queue = Queue('test_queue')

200

201

# This might raise NotBoundError if queue not bound

202

queue.declare(channel=conn.channel())

203

204

producer = Producer(conn.channel())

205

producer.publish({'message': 'hello'}, routing_key='test')

206

207

except OperationalError as e:

208

print(f"Connection/transport error: {e}")

209

# Could implement retry logic here

210

211

except SerializationError as e:

212

print(f"Serialization error: {e}")

213

# Handle serialization issues

214

215

except NotBoundError as e:

216

print(f"Entity not bound to channel: {e}")

217

# Fix binding issues

218

219

except KombuError as e:

220

print(f"Generic Kombu error: {e}")

221

# Handle any other Kombu-specific errors

222

223

except Exception as e:

224

print(f"Unexpected error: {e}")

225

# Handle non-Kombu errors

226

```

227

228

### Message Processing Error Handling

229

230

```python

231

from kombu import Connection, Consumer, Queue

232

from kombu.exceptions import MessageStateError, DecodeError

233

234

def safe_message_processor(body, message):

235

"""Process message with comprehensive error handling"""

236

try:

237

# Process the message

238

result = process_business_logic(body)

239

240

# Acknowledge successful processing

241

try:

242

message.ack()

243

except MessageStateError:

244

print("Message already acknowledged")

245

246

except DecodeError as e:

247

print(f"Failed to decode message: {e}")

248

# Reject malformed messages without requeue

249

try:

250

message.reject(requeue=False)

251

except MessageStateError:

252

pass # Already processed

253

254

except ValueError as e:

255

print(f"Business logic error: {e}")

256

# Requeue for retry

257

try:

258

message.reject(requeue=True)

259

except MessageStateError:

260

pass # Already processed

261

262

except Exception as e:

263

print(f"Unexpected processing error: {e}")

264

# Decide whether to requeue or reject

265

try:

266

message.reject(requeue=False) # Don't requeue unknown errors

267

except MessageStateError:

268

pass

269

270

def process_business_logic(data):

271

"""Business logic that might fail"""

272

if not isinstance(data, dict):

273

raise ValueError("Expected dict data")

274

275

if 'required_field' not in data:

276

raise ValueError("Missing required field")

277

278

return {'processed': True, 'result': data['required_field'] * 2}

279

280

# Usage

281

with Connection('redis://localhost:6379/0') as conn:

282

queue = Queue('error_handling_queue')

283

consumer = Consumer(conn.channel(), [queue], callbacks=[safe_message_processor])

284

consumer.consume()

285

286

# Process messages with error handling

287

conn.drain_events(timeout=1.0)

288

```

289

290

### Connection and Transport Error Handling

291

292

```python

293

from kombu import Connection

294

from kombu.exceptions import OperationalError, ConnectionLimitExceeded

295

import time

296

import random

297

298

def robust_connection_handler(broker_url, max_retries=5):

299

"""Handle connection with retry logic"""

300

retry_count = 0

301

backoff_base = 1

302

303

while retry_count < max_retries:

304

try:

305

conn = Connection(broker_url)

306

conn.connect() # Explicit connection

307

308

print("Connection established successfully")

309

return conn

310

311

except ConnectionLimitExceeded as e:

312

print(f"Connection limit exceeded: {e}")

313

# This might not be retryable

314

time.sleep(backoff_base * (2 ** retry_count))

315

retry_count += 1

316

317

except OperationalError as e:

318

print(f"Operational error (attempt {retry_count + 1}): {e}")

319

320

# Exponential backoff with jitter

321

sleep_time = backoff_base * (2 ** retry_count) + random.uniform(0, 1)

322

print(f"Retrying in {sleep_time:.2f} seconds...")

323

time.sleep(sleep_time)

324

retry_count += 1

325

326

except Exception as e:

327

print(f"Unexpected connection error: {e}")

328

break

329

330

print(f"Failed to establish connection after {max_retries} attempts")

331

return None

332

333

# Usage

334

conn = robust_connection_handler('redis://localhost:6379/0')

335

if conn:

336

try:

337

# Use connection

338

with conn:

339

# Perform operations

340

pass

341

finally:

342

conn.close()

343

```

344

345

### Serialization Error Handling

346

347

```python

348

from kombu.serialization import dumps, loads, enable_insecure_serializers

349

from kombu.exceptions import EncodeError, DecodeError, SerializerNotInstalled

350

351

def safe_serialization_test():

352

"""Test serialization with error handling"""

353

354

# Test data

355

serializable_data = {'message': 'hello', 'number': 42}

356

unserializable_data = {'function': lambda x: x} # Functions can't be serialized with JSON

357

358

# Test JSON serialization (safe)

359

try:

360

serialized, content_type, encoding = dumps(serializable_data, 'json')

361

deserialized = loads(serialized, content_type, encoding)

362

print(f"JSON serialization successful: {deserialized}")

363

except EncodeError as e:

364

print(f"JSON encode error: {e}")

365

except DecodeError as e:

366

print(f"JSON decode error: {e}")

367

368

# Test with unserializable data

369

try:

370

serialized, content_type, encoding = dumps(unserializable_data, 'json')

371

except EncodeError as e:

372

print(f"Expected JSON encode error: {e}")

373

374

# Test unavailable serializer

375

try:

376

enable_insecure_serializers(['nonexistent'])

377

serialized, content_type, encoding = dumps(serializable_data, 'nonexistent')

378

except SerializerNotInstalled as e:

379

print(f"Serializer not available: {e}")

380

except KeyError as e:

381

print(f"Unknown serializer: {e}")

382

383

# Test corrupted data

384

try:

385

corrupted_data = b'{"invalid": json'

386

deserialized = loads(corrupted_data, 'application/json')

387

except DecodeError as e:

388

print(f"Expected decode error: {e}")

389

390

safe_serialization_test()

391

```

392

393

### Exception Logging and Monitoring

394

395

```python

396

from kombu import Connection, Consumer, Queue

397

from kombu.exceptions import KombuError, OperationalError, SerializationError

398

import logging

399

import traceback

400

from datetime import datetime

401

402

# Setup logging

403

logging.basicConfig(level=logging.INFO)

404

logger = logging.getLogger(__name__)

405

406

class ErrorTrackingConsumer:

407

def __init__(self, connection, queues):

408

self.connection = connection

409

self.queues = queues

410

self.error_counts = {}

411

412

def process_message(self, body, message):

413

"""Process message with detailed error tracking"""

414

message_id = body.get('id', 'unknown')

415

416

try:

417

# Simulate processing

418

if body.get('should_fail'):

419

raise ValueError("Simulated processing failure")

420

421

logger.info(f"Successfully processed message {message_id}")

422

message.ack()

423

424

except Exception as exc:

425

self.handle_processing_error(exc, message, message_id)

426

427

def handle_processing_error(self, exc, message, message_id):

428

"""Handle and log processing errors"""

429

error_type = type(exc).__name__

430

431

# Track error counts

432

self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

433

434

# Log error details

435

logger.error(f"Processing error for message {message_id}: {exc}")

436

logger.error(f"Error type: {error_type}")

437

logger.error(f"Total {error_type} errors: {self.error_counts[error_type]}")

438

439

# Log stack trace for debugging

440

logger.debug(traceback.format_exc())

441

442

# Handle different error types

443

if isinstance(exc, SerializationError):

444

logger.error("Serialization error - rejecting message")

445

message.reject(requeue=False)

446

447

elif isinstance(exc, ValueError):

448

logger.warning("Business logic error - requeuing for retry")

449

message.reject(requeue=True)

450

451

elif isinstance(exc, KombuError):

452

logger.error("Kombu-specific error - investigating")

453

message.reject(requeue=False)

454

455

else:

456

logger.error("Unknown error type - rejecting without requeue")

457

message.reject(requeue=False)

458

459

def run(self):

460

"""Main consumer loop with connection error handling"""

461

while True:

462

try:

463

consumer = Consumer(

464

self.connection.channel(),

465

self.queues,

466

callbacks=[self.process_message]

467

)

468

469

consumer.consume()

470

471

while True:

472

self.connection.drain_events(timeout=1.0)

473

474

except OperationalError as e:

475

logger.error(f"Connection error: {e}")

476

logger.info("Attempting to reconnect...")

477

time.sleep(5)

478

continue

479

480

except KeyboardInterrupt:

481

logger.info("Shutting down consumer...")

482

break

483

484

except Exception as e:

485

logger.error(f"Unexpected error: {e}")

486

logger.error(traceback.format_exc())

487

break

488

489

# Print final error summary

490

if self.error_counts:

491

logger.info("Error summary:")

492

for error_type, count in self.error_counts.items():

493

logger.info(f" {error_type}: {count} occurrences")

494

495

# Usage

496

if __name__ == '__main__':

497

with Connection('redis://localhost:6379/0') as conn:

498

queue = Queue('error_tracking_queue')

499

consumer = ErrorTrackingConsumer(conn, [queue])

500

consumer.run()

501

```

502

503

### Custom Exception Handling

504

505

```python

506

from kombu.exceptions import KombuError

507

508

class CustomProcessingError(KombuError):

509

"""Custom error for application-specific failures"""

510

def __init__(self, message, error_code=None, retry_after=None):

511

super().__init__(message)

512

self.error_code = error_code

513

self.retry_after = retry_after

514

515

class DataValidationError(CustomProcessingError):

516

"""Error for data validation failures"""

517

pass

518

519

class ExternalServiceError(CustomProcessingError):

520

"""Error for external service failures"""

521

pass

522

523

def process_with_custom_errors(body, message):

524

"""Process message with custom error types"""

525

try:

526

# Validate data

527

if not body.get('user_id'):

528

raise DataValidationError(

529

"Missing user_id field",

530

error_code='MISSING_USER_ID'

531

)

532

533

# Call external service

534

if body.get('external_service_down'):

535

raise ExternalServiceError(

536

"External service unavailable",

537

error_code='SERVICE_DOWN',

538

retry_after=300 # Retry after 5 minutes

539

)

540

541

# Process successfully

542

message.ack()

543

544

except DataValidationError as e:

545

logger.error(f"Validation error: {e} (code: {e.error_code})")

546

message.reject(requeue=False) # Don't retry validation errors

547

548

except ExternalServiceError as e:

549

logger.warning(f"Service error: {e} (retry after: {e.retry_after}s)")

550

message.reject(requeue=True) # Retry service errors

551

552

except CustomProcessingError as e:

553

logger.error(f"Custom processing error: {e}")

554

message.reject(requeue=False)

555

556

except Exception as e:

557

logger.error(f"Unexpected error: {e}")

558

message.reject(requeue=False)

559

```

560

561

### Exception Context and Debugging

562

563

```python

564

from kombu import Connection

565

from kombu.exceptions import KombuError

566

import sys

567

568

def debug_kombu_exceptions():

569

"""Demonstrate exception information and debugging"""

570

571

try:

572

# Simulate various Kombu errors

573

conn = Connection('invalid://broker:9999')

574

conn.connect()

575

576

except KombuError as e:

577

print("Kombu Exception Details:")

578

print(f" Type: {type(e).__name__}")

579

print(f" Message: {str(e)}")

580

print(f" Module: {e.__class__.__module__}")

581

582

# Print exception hierarchy

583

print(" Exception hierarchy:")

584

for cls in type(e).__mro__:

585

if cls == object:

586

break

587

print(f" {cls.__name__}")

588

589

# Print traceback for debugging

590

import traceback

591

print(" Traceback:")

592

traceback.print_exc()

593

594

except Exception as e:

595

print(f"Non-Kombu exception: {type(e).__name__}: {e}")

596

597

debug_kombu_exceptions()

598

```