or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-client.mdcore-producer-consumer.mderror-handling.mdindex.mdschema-registry.mdserialization.md

error-handling.mddocs/

0

# Error Handling

1

2

Comprehensive error handling system with specific exception types for different failure modes, detailed error information, and patterns for robust Kafka applications.

3

4

## Capabilities

5

6

### Core Error Classes

7

8

#### KafkaException

9

10

Main exception class for Kafka-related errors.

11

12

```python { .api }

13

class KafkaException(Exception):

14

def __init__(self, kafka_error):

15

"""

16

Create KafkaException.

17

18

Args:

19

kafka_error (KafkaError): Underlying Kafka error

20

"""

21

22

def args(self):

23

"""

24

Exception arguments.

25

26

Returns:

27

tuple: (kafka_error,)

28

"""

29

```

30

31

#### KafkaError

32

33

Represents a Kafka error with detailed information.

34

35

```python { .api }

36

class KafkaError:

37

def code(self):

38

"""

39

Get error code.

40

41

Returns:

42

int: Kafka error code

43

"""

44

45

def name(self):

46

"""

47

Get error name.

48

49

Returns:

50

str: Human-readable error name

51

"""

52

53

def str(self):

54

"""

55

Get error description.

56

57

Returns:

58

str: Detailed error description

59

"""

60

61

def fatal(self):

62

"""

63

Check if error is fatal.

64

65

Returns:

66

bool: True if error is fatal and requires client restart

67

"""

68

69

def retriable(self):

70

"""

71

Check if operation can be retried.

72

73

Returns:

74

bool: True if operation may succeed on retry

75

"""

76

77

def txn_requires_abort(self):

78

"""

79

Check if error requires transaction abort.

80

81

Returns:

82

bool: True if current transaction must be aborted

83

"""

84

85

def __str__(self):

86

"""String representation of error."""

87

88

def __bool__(self):

89

"""

90

Check if error exists.

91

92

Returns:

93

bool: True if this represents an actual error

94

"""

95

```

96

97

### Consumer Error Classes

98

99

#### ConsumeError

100

101

Wraps errors that occur during message consumption.

102

103

```python { .api }

104

class ConsumeError(KafkaException):

105

def __init__(self, kafka_error, consumer_record=None):

106

"""

107

Create ConsumeError.

108

109

Args:

110

kafka_error (KafkaError): Underlying Kafka error

111

consumer_record: Consumer record if available

112

"""

113

```

114

115

#### KeyDeserializationError

116

117

Specific error for key deserialization failures in DeserializingConsumer.

118

119

```python { .api }

120

class KeyDeserializationError(ConsumeError):

121

def __init__(self, exception=None, kafka_message=None):

122

"""

123

Create KeyDeserializationError.

124

125

Args:

126

exception (Exception, optional): Underlying deserialization exception

127

kafka_message (Message, optional): Original Kafka message

128

"""

129

130

@property

131

def exception(self):

132

"""

133

Underlying deserialization exception.

134

135

Returns:

136

Exception: Original exception that caused deserialization failure

137

"""

138

139

@property

140

def kafka_message(self):

141

"""

142

Original Kafka message.

143

144

Returns:

145

Message: Message that failed to deserialize

146

"""

147

```

148

149

#### ValueDeserializationError

150

151

Specific error for value deserialization failures in DeserializingConsumer.

152

153

```python { .api }

154

class ValueDeserializationError(ConsumeError):

155

def __init__(self, exception=None, kafka_message=None):

156

"""

157

Create ValueDeserializationError.

158

159

Args:

160

exception (Exception, optional): Underlying deserialization exception

161

kafka_message (Message, optional): Original Kafka message

162

"""

163

164

@property

165

def exception(self):

166

"""

167

Underlying deserialization exception.

168

169

Returns:

170

Exception: Original exception that caused deserialization failure

171

"""

172

173

@property

174

def kafka_message(self):

175

"""

176

Original Kafka message.

177

178

Returns:

179

Message: Message that failed to deserialize

180

"""

181

```

182

183

### Producer Error Classes

184

185

#### ProduceError

186

187

Wraps errors that occur during message production.

188

189

```python { .api }

190

class ProduceError(KafkaException):

191

def __init__(self, kafka_error, producer_record=None):

192

"""

193

Create ProduceError.

194

195

Args:

196

kafka_error (KafkaError): Underlying Kafka error

197

producer_record: Producer record if available

198

"""

199

```

200

201

#### KeySerializationError

202

203

Specific error for key serialization failures in SerializingProducer.

204

205

```python { .api }

206

class KeySerializationError(ProduceError):

207

def __init__(self, exception=None, producer_record=None):

208

"""

209

Create KeySerializationError.

210

211

Args:

212

exception (Exception, optional): Underlying serialization exception

213

producer_record: Producer record that failed to serialize

214

"""

215

216

@property

217

def exception(self):

218

"""

219

Underlying serialization exception.

220

221

Returns:

222

Exception: Original exception that caused serialization failure

223

"""

224

```

225

226

#### ValueSerializationError

227

228

Specific error for value serialization failures in SerializingProducer.

229

230

```python { .api }

231

class ValueSerializationError(ProduceError):

232

def __init__(self, exception=None, producer_record=None):

233

"""

234

Create ValueSerializationError.

235

236

Args:

237

exception (Exception, optional): Underlying serialization exception

238

producer_record: Producer record that failed to serialize

239

"""

240

241

@property

242

def exception(self):

243

"""

244

Underlying serialization exception.

245

246

Returns:

247

Exception: Original exception that caused serialization failure

248

"""

249

```

250

251

### Serialization Error Classes

252

253

#### SerializationError

254

255

Generic serialization/deserialization error.

256

257

```python { .api }

258

class SerializationError(Exception):

259

def __init__(self, message, inner_exception=None):

260

"""

261

Create SerializationError.

262

263

Args:

264

message (str): Error message

265

inner_exception (Exception, optional): Underlying exception

266

"""

267

268

@property

269

def inner_exception(self):

270

"""

271

Underlying exception.

272

273

Returns:

274

Exception: Exception that caused the serialization error

275

"""

276

```

277

278

### Schema Registry Error Classes

279

280

#### SchemaRegistryError

281

282

Schema registry specific errors.

283

284

```python { .api }

285

class SchemaRegistryError(Exception):

286

def __init__(self, status_code, message, error_code=None):

287

"""

288

Create SchemaRegistryError.

289

290

Args:

291

status_code (int): HTTP status code

292

message (str): Error message

293

error_code (int, optional): Schema registry specific error code

294

"""

295

296

@property

297

def status_code(self):

298

"""

299

HTTP status code.

300

301

Returns:

302

int: HTTP status code from Schema Registry response

303

"""

304

305

@property

306

def error_code(self):

307

"""

308

Schema registry error code.

309

310

Returns:

311

int: Schema Registry specific error code or None

312

"""

313

```

314

315

### Common Error Codes

316

317

Important Kafka error codes accessible as constants.

318

319

```python { .api }

320

# Partition/Topic errors

321

_PARTITION_EOF = -191 # Partition EOF reached

322

_UNKNOWN_TOPIC_OR_PARTITION = 3

323

_TOPIC_ALREADY_EXISTS = 36

324

_INVALID_TOPIC_EXCEPTION = 17

325

326

# Consumer errors

327

_UNKNOWN_MEMBER_ID = 25

328

_REBALANCE_IN_PROGRESS = 27

329

_OFFSET_OUT_OF_RANGE = 1

330

_GROUP_COORDINATOR_NOT_AVAILABLE = 15

331

332

# Producer errors

333

_MSG_SIZE_TOO_LARGE = 10

334

_RECORD_BATCH_TOO_LARGE = 18

335

_REQUEST_TIMED_OUT = 7

336

337

# Authentication/Authorization

338

_SASL_AUTHENTICATION_FAILED = 58

339

_TOPIC_AUTHORIZATION_FAILED = 29

340

_GROUP_AUTHORIZATION_FAILED = 30

341

342

# Network errors

343

_NETWORK_EXCEPTION = -195

344

_ALL_BROKERS_DOWN = -187

345

346

# Transaction errors

347

_INVALID_TRANSACTION_STATE = 51

348

_PRODUCER_FENCED = 90

349

```

350

351

### Error Handling Patterns

352

353

#### Basic Consumer Error Handling

354

355

```python

356

from confluent_kafka import Consumer, KafkaError, KafkaException

357

from confluent_kafka.error import ConsumeError

358

359

consumer = Consumer({

360

'bootstrap.servers': 'localhost:9092',

361

'group.id': 'my-group',

362

'auto.offset.reset': 'earliest'

363

})

364

365

consumer.subscribe(['my-topic'])

366

367

def handle_consumer_errors():

368

try:

369

while True:

370

msg = consumer.poll(timeout=1.0)

371

372

if msg is None:

373

continue

374

375

if msg.error():

376

error = msg.error()

377

378

if error.code() == KafkaError._PARTITION_EOF:

379

# End of partition - not really an error

380

print(f'Reached end of partition {msg.topic()} [{msg.partition()}]')

381

continue

382

383

elif error.code() == KafkaError._UNKNOWN_TOPIC_OR_PARTITION:

384

print(f'Unknown topic or partition: {error}')

385

# Could break or continue depending on requirements

386

break

387

388

elif error.fatal():

389

print(f'Fatal error: {error}')

390

# Fatal errors require client restart

391

raise KafkaException(error)

392

393

elif error.retriable():

394

print(f'Retriable error: {error}')

395

# Continue polling - error may resolve

396

continue

397

398

else:

399

print(f'Non-retriable error: {error}')

400

# Log and continue or break depending on error

401

continue

402

403

else:

404

# Process successful message

405

print(f'Message: {msg.value()}')

406

407

except KeyboardInterrupt:

408

print('Interrupted by user')

409

except KafkaException as e:

410

print(f'Kafka exception: {e}')

411

finally:

412

consumer.close()

413

414

handle_consumer_errors()

415

```

416

417

#### Producer Error Handling

418

419

```python

420

from confluent_kafka import Producer, KafkaError

421

from confluent_kafka.error import ProduceError

422

423

producer = Producer({'bootstrap.servers': 'localhost:9092'})

424

425

def delivery_report(err, msg):

426

"""Delivery report callback."""

427

if err is not None:

428

error = err if isinstance(err, KafkaError) else err.args[0]

429

430

if error.code() == KafkaError._MSG_SIZE_TOO_LARGE:

431

print(f'Message too large: {error}')

432

# Could split message or log error

433

434

elif error.code() == KafkaError._REQUEST_TIMED_OUT:

435

print(f'Request timed out: {error}')

436

# Could retry or log timeout

437

438

elif error.retriable():

439

print(f'Retriable error - will be retried: {error}')

440

# Producer will automatically retry

441

442

else:

443

print(f'Non-retriable produce error: {error}')

444

445

else:

446

print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

447

448

def produce_with_error_handling():

449

try:

450

for i in range(100):

451

try:

452

producer.produce(

453

'my-topic',

454

key=f'key-{i}',

455

value=f'message-{i}',

456

callback=delivery_report

457

)

458

459

# Poll for delivery callbacks

460

producer.poll(0)

461

462

except BufferError:

463

# Producer queue full - wait and retry

464

print('Producer queue full, waiting...')

465

producer.poll(1.0) # Wait for some messages to be sent

466

continue

467

468

except Exception as e:

469

print(f'Unexpected error: {e}')

470

continue

471

472

# Wait for all messages to be delivered

473

producer.flush()

474

475

except KeyboardInterrupt:

476

print('Interrupted by user')

477

finally:

478

# Flush remaining messages

479

producer.flush()

480

481

produce_with_error_handling()

482

```

483

484

#### Serialization Error Handling

485

486

```python

487

from confluent_kafka import SerializingProducer, DeserializingConsumer

488

from confluent_kafka.serialization import StringSerializer, StringDeserializer

489

from confluent_kafka.error import KeySerializationError, ValueSerializationError

490

from confluent_kafka.error import KeyDeserializationError, ValueDeserializationError

491

492

class SafeSerializer(StringSerializer):

493

"""Serializer with error handling."""

494

495

def __call__(self, obj, ctx=None):

496

try:

497

return super().__call__(obj, ctx)

498

except Exception as e:

499

print(f'Serialization failed for {obj}: {e}')

500

# Could return None, empty bytes, or raise

501

return b'SERIALIZATION_FAILED'

502

503

def handle_serialization_errors():

504

producer_conf = {

505

'bootstrap.servers': 'localhost:9092',

506

'key.serializer': SafeSerializer('utf_8'),

507

'value.serializer': SafeSerializer('utf_8')

508

}

509

510

producer = SerializingProducer(producer_conf)

511

512

def delivery_callback(err, msg):

513

if err is not None:

514

if isinstance(err, KeySerializationError):

515

print(f'Key serialization error: {err.exception}')

516

elif isinstance(err, ValueSerializationError):

517

print(f'Value serialization error: {err.exception}')

518

else:

519

print(f'Other delivery error: {err}')

520

521

# Produce with potential serialization errors

522

producer.produce('my-topic', key='valid-key', value=None, callback=delivery_callback)

523

producer.flush()

524

525

def handle_deserialization_errors():

526

consumer_conf = {

527

'bootstrap.servers': 'localhost:9092',

528

'key.deserializer': StringDeserializer('utf_8'),

529

'value.deserializer': StringDeserializer('utf_8'),

530

'group.id': 'error-handling-group',

531

'auto.offset.reset': 'earliest'

532

}

533

534

consumer = DeserializingConsumer(consumer_conf)

535

consumer.subscribe(['my-topic'])

536

537

try:

538

while True:

539

msg = consumer.poll(1.0)

540

if msg is None:

541

continue

542

543

if msg.error():

544

error = msg.error()

545

546

if isinstance(error, KeyDeserializationError):

547

print(f'Key deserialization error: {error.exception}')

548

# Could access original message via error.kafka_message

549

original_msg = error.kafka_message

550

print(f'Original key bytes: {original_msg.key()}')

551

552

elif isinstance(error, ValueDeserializationError):

553

print(f'Value deserialization error: {error.exception}')

554

original_msg = error.kafka_message

555

print(f'Original value bytes: {original_msg.value()}')

556

557

else:

558

print(f'Other consumer error: {error}')

559

560

# Continue processing despite deserialization errors

561

continue

562

563

# Process successfully deserialized message

564

print(f'Key: {msg.key()}, Value: {msg.value()}')

565

566

except KeyboardInterrupt:

567

print('Interrupted')

568

finally:

569

consumer.close()

570

```

571

572

#### Transaction Error Handling

573

574

```python

575

from confluent_kafka import Producer, KafkaError

576

577

def handle_transaction_errors():

578

producer_conf = {

579

'bootstrap.servers': 'localhost:9092',

580

'transactional.id': 'my-transactional-producer',

581

'enable.idempotence': True

582

}

583

584

producer = Producer(producer_conf)

585

586

try:

587

producer.init_transactions()

588

print('Transactions initialized')

589

590

for batch in range(5):

591

try:

592

producer.begin_transaction()

593

print(f'Started transaction {batch}')

594

595

# Produce messages in transaction

596

for i in range(3):

597

producer.produce('transactional-topic', f'batch-{batch}-msg-{i}')

598

599

# Commit transaction

600

producer.commit_transaction()

601

print(f'Committed transaction {batch}')

602

603

except KafkaException as e:

604

error = e.args[0]

605

606

if error.code() == KafkaError._PRODUCER_FENCED:

607

print('Producer fenced - need to recreate producer')

608

raise

609

610

elif error.txn_requires_abort():

611

print(f'Transaction error requires abort: {error}')

612

try:

613

producer.abort_transaction()

614

print('Transaction aborted')

615

except Exception as abort_error:

616

print(f'Failed to abort transaction: {abort_error}')

617

raise

618

619

elif error.retriable():

620

print(f'Retriable transaction error: {error}')

621

# Could retry the transaction

622

try:

623

producer.abort_transaction()

624

continue # Retry the batch

625

except Exception:

626

raise

627

628

else:

629

print(f'Non-retriable transaction error: {error}')

630

producer.abort_transaction()

631

raise

632

633

except KeyboardInterrupt:

634

print('Interrupted')

635

try:

636

producer.abort_transaction()

637

except Exception:

638

pass # Already interrupted

639

640

finally:

641

producer.flush()

642

```

643

644

#### Admin Client Error Handling

645

646

```python

647

from confluent_kafka.admin import AdminClient, NewTopic

648

from confluent_kafka import KafkaException

649

650

def handle_admin_errors():

651

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

652

653

# Create topics with error handling

654

new_topics = [NewTopic('test-topic-1', 3, 1), NewTopic('test-topic-2', 6, 1)]

655

656

fs = admin_client.create_topics(new_topics, request_timeout=30)

657

658

for topic, f in fs.items():

659

try:

660

f.result() # Block until operation completes

661

print(f'Topic {topic} created successfully')

662

663

except KafkaException as e:

664

error = e.args[0]

665

666

if error.code() == KafkaError._TOPIC_ALREADY_EXISTS:

667

print(f'Topic {topic} already exists')

668

# Could continue or handle as needed

669

670

elif error.code() == KafkaError._TOPIC_AUTHORIZATION_FAILED:

671

print(f'Authorization failed for topic {topic}')

672

# Handle authorization error

673

674

elif error.code() == KafkaError._REQUEST_TIMED_OUT:

675

print(f'Request timed out for topic {topic}')

676

# Could retry with longer timeout

677

678

else:

679

print(f'Failed to create topic {topic}: {error}')

680

681

except Exception as e:

682

print(f'Unexpected error creating topic {topic}: {e}')

683

684

handle_admin_errors()

685

```

686

687

#### Comprehensive Error Logging

688

689

```python

690

import logging

691

from confluent_kafka import Consumer, KafkaError

692

from confluent_kafka.error import ConsumeError

693

694

# Configure logging

695

logging.basicConfig(level=logging.INFO)

696

logger = logging.getLogger(__name__)

697

698

def comprehensive_error_handling():

699

consumer = Consumer({

700

'bootstrap.servers': 'localhost:9092',

701

'group.id': 'logging-group',

702

'auto.offset.reset': 'earliest',

703

'enable.auto.commit': False

704

})

705

706

consumer.subscribe(['my-topic'])

707

708

try:

709

while True:

710

try:

711

msg = consumer.poll(timeout=1.0)

712

713

if msg is None:

714

continue

715

716

if msg.error():

717

error = msg.error()

718

719

# Log error details

720

logger.error(

721

f'Consumer error - Code: {error.code()}, '

722

f'Name: {error.name()}, '

723

f'Description: {error.str()}, '

724

f'Fatal: {error.fatal()}, '

725

f'Retriable: {error.retriable()}'

726

)

727

728

# Handle based on error characteristics

729

if error.fatal():

730

logger.critical('Fatal error - exiting')

731

break

732

elif not error.retriable():

733

logger.warning('Non-retriable error - skipping')

734

continue

735

else:

736

logger.info('Retriable error - continuing')

737

continue

738

739

# Process message

740

logger.info(f'Processing message: {msg.topic()}[{msg.partition()}]@{msg.offset()}')

741

742

# Simulate processing

743

try:

744

# Process message here

745

pass

746

747

except Exception as processing_error:

748

logger.error(f'Message processing failed: {processing_error}')

749

# Could skip message or handle error

750

continue

751

752

# Manual commit after successful processing

753

try:

754

consumer.commit(message=msg)

755

except KafkaException as commit_error:

756

logger.error(f'Commit failed: {commit_error}')

757

# Could retry commit or continue

758

759

except Exception as unexpected_error:

760

logger.error(f'Unexpected error in consumer loop: {unexpected_error}')

761

# Could break, continue, or re-raise depending on requirements

762

763

except KeyboardInterrupt:

764

logger.info('Consumer interrupted by user')

765

except Exception as e:

766

logger.critical(f'Critical consumer error: {e}')

767

finally:

768

logger.info('Closing consumer')

769

consumer.close()

770

771

comprehensive_error_handling()

772

```