# Error Handling

Comprehensive error handling system with specific exception types for different failure modes, detailed error information, and patterns for building robust Kafka applications.

## Capabilities

### Core Error Classes

#### KafkaException

Main exception class for Kafka-related errors.

```python { .api }
13
class KafkaException(Exception):
    """Main exception class for Kafka-related errors.

    The wrapped error travels in the standard ``Exception.args`` tuple, so
    handlers read it as ``exc.args[0]``.

    Attributes:
        args (tuple): ``(kafka_error,)`` -- inherited from ``Exception``; it is
            a plain tuple attribute, not a method.
    """

    def __init__(self, kafka_error):
        """
        Create KafkaException.

        Args:
            kafka_error (KafkaError): Underlying Kafka error
        """
        # Store the error in Exception.args so ``exc.args[0]`` returns it.
        super().__init__(kafka_error)
29
```
30
31
#### KafkaError
32
33
Represents a Kafka error with detailed information.
34
35
```python { .api }
36
class KafkaError:
    """Represents a Kafka error with detailed information."""

    def code(self):
        """
        Return the numeric code of this error.

        Returns:
            int: Kafka error code
        """

    def name(self):
        """
        Return the name of this error.

        Returns:
            str: Human-readable error name
        """

    def str(self):
        """
        Return the description of this error.

        Returns:
            str: Detailed error description
        """

    def fatal(self):
        """
        Tell whether this error is fatal.

        Returns:
            bool: True if error is fatal and requires client restart
        """

    def retriable(self):
        """
        Tell whether the failed operation can be retried.

        Returns:
            bool: True if operation may succeed on retry
        """

    def txn_requires_abort(self):
        """
        Tell whether this error forces the current transaction to abort.

        Returns:
            bool: True if current transaction must be aborted
        """

    def __str__(self):
        """Return the string representation of the error."""

    def __bool__(self):
        """
        Tell whether this object represents an actual error.

        Returns:
            bool: True if this represents an actual error
        """
95
```
96
97
### Consumer Error Classes
98
99
#### ConsumeError
100
101
Wraps errors that occur during message consumption.
102
103
```python { .api }
104
class ConsumeError(KafkaException):
    """Wraps errors that occur during message consumption."""

    # NOTE(review): the second parameter name should be confirmed against the
    # installed confluent_kafka.error.ConsumeError signature.
    def __init__(self, kafka_error, consumer_record=None):
        """
        Build a ConsumeError around an underlying Kafka error.

        Args:
            kafka_error (KafkaError): Underlying Kafka error
            consumer_record: Consumer record if available
        """
113
```
114
115
#### KeyDeserializationError
116
117
Specific error for key deserialization failures in DeserializingConsumer.
118
119
```python { .api }
120
class KeyDeserializationError(ConsumeError):
    """Key deserialization failure reported by DeserializingConsumer."""

    def __init__(self, exception=None, kafka_message=None):
        """
        Build a KeyDeserializationError.

        Args:
            exception (Exception, optional): Underlying deserialization exception
            kafka_message (Message, optional): Original Kafka message
        """

    @property
    def exception(self):
        """
        The underlying deserialization exception.

        Returns:
            Exception: Original exception that caused deserialization failure
        """

    @property
    def kafka_message(self):
        """
        The original Kafka message.

        Returns:
            Message: Message that failed to deserialize
        """
147
```
148
149
#### ValueDeserializationError
150
151
Specific error for value deserialization failures in DeserializingConsumer.
152
153
```python { .api }
154
class ValueDeserializationError(ConsumeError):
    """Value deserialization failure reported by DeserializingConsumer."""

    def __init__(self, exception=None, kafka_message=None):
        """
        Build a ValueDeserializationError.

        Args:
            exception (Exception, optional): Underlying deserialization exception
            kafka_message (Message, optional): Original Kafka message
        """

    @property
    def exception(self):
        """
        The underlying deserialization exception.

        Returns:
            Exception: Original exception that caused deserialization failure
        """

    @property
    def kafka_message(self):
        """
        The original Kafka message.

        Returns:
            Message: Message that failed to deserialize
        """
181
```
182
183
### Producer Error Classes
184
185
#### ProduceError
186
187
Wraps errors that occur during message production.
188
189
```python { .api }
190
class ProduceError(KafkaException):
    """Wraps errors that occur during message production."""

    # NOTE(review): the second parameter name should be confirmed against the
    # installed confluent_kafka.error.ProduceError signature.
    def __init__(self, kafka_error, producer_record=None):
        """
        Build a ProduceError around an underlying Kafka error.

        Args:
            kafka_error (KafkaError): Underlying Kafka error
            producer_record: Producer record if available
        """
199
```
200
201
#### KeySerializationError
202
203
Specific error for key serialization failures in SerializingProducer.
204
205
```python { .api }
206
class KeySerializationError(ProduceError):
    """Key serialization failure reported by SerializingProducer."""

    def __init__(self, exception=None, producer_record=None):
        """
        Build a KeySerializationError.

        Args:
            exception (Exception, optional): Underlying serialization exception
            producer_record: Producer record that failed to serialize
        """

    @property
    def exception(self):
        """
        The underlying serialization exception.

        Returns:
            Exception: Original exception that caused serialization failure
        """
224
```
225
226
#### ValueSerializationError
227
228
Specific error for value serialization failures in SerializingProducer.
229
230
```python { .api }
231
class ValueSerializationError(ProduceError):
    """Value serialization failure reported by SerializingProducer."""

    def __init__(self, exception=None, producer_record=None):
        """
        Build a ValueSerializationError.

        Args:
            exception (Exception, optional): Underlying serialization exception
            producer_record: Producer record that failed to serialize
        """

    @property
    def exception(self):
        """
        The underlying serialization exception.

        Returns:
            Exception: Original exception that caused serialization failure
        """
249
```
250
251
### Serialization Error Classes
252
253
#### SerializationError
254
255
Generic serialization/deserialization error.
256
257
```python { .api }
258
class SerializationError(Exception):
    """Generic serialization/deserialization error."""

    def __init__(self, message, inner_exception=None):
        """
        Build a SerializationError.

        Args:
            message (str): Error message
            inner_exception (Exception, optional): Underlying exception
        """

    @property
    def inner_exception(self):
        """
        The underlying exception.

        Returns:
            Exception: Exception that caused the serialization error
        """
276
```
277
278
### Schema Registry Error Classes
279
280
#### SchemaRegistryError
281
282
Schema registry specific errors.
283
284
```python { .api }
285
class SchemaRegistryError(Exception):
    """Schema registry specific errors."""

    def __init__(self, status_code, message, error_code=None):
        """
        Build a SchemaRegistryError.

        Args:
            status_code (int): HTTP status code
            message (str): Error message
            error_code (int, optional): Schema registry specific error code
        """

    @property
    def status_code(self):
        """
        The HTTP status code.

        Returns:
            int: HTTP status code from Schema Registry response
        """

    @property
    def error_code(self):
        """
        The schema registry error code.

        Returns:
            int: Schema Registry specific error code or None
        """
313
```
314
315
### Common Error Codes
316
317
Important Kafka error codes accessible as constants.
318
319
```python { .api }
320
# NOTE(review): in confluent_kafka only librdkafka-internal (negative) codes
# carry a leading underscore (e.g. KafkaError._PARTITION_EOF); broker-side
# codes appear to be exposed without it (e.g. KafkaError.TOPIC_ALREADY_EXISTS).
# Confirm each attribute name against the installed client before using it.

# Partition/Topic errors
_PARTITION_EOF = -191  # Partition EOF reached
_UNKNOWN_TOPIC_OR_PARTITION = 3
_TOPIC_ALREADY_EXISTS = 36
_INVALID_TOPIC_EXCEPTION = 17

# Consumer errors
_UNKNOWN_MEMBER_ID = 25
_REBALANCE_IN_PROGRESS = 27
_OFFSET_OUT_OF_RANGE = 1
_GROUP_COORDINATOR_NOT_AVAILABLE = 15

# Producer errors
_MSG_SIZE_TOO_LARGE = 10
_RECORD_BATCH_TOO_LARGE = 18
_REQUEST_TIMED_OUT = 7

# Authentication/Authorization
_SASL_AUTHENTICATION_FAILED = 58
_TOPIC_AUTHORIZATION_FAILED = 29
_GROUP_AUTHORIZATION_FAILED = 30

# Network errors
# NOTE(review): -195 is the client-side transport failure code (librdkafka
# names it _TRANSPORT); the broker-side NETWORK_EXCEPTION code is 13 -- confirm
# which one is intended here.
_NETWORK_EXCEPTION = -195
_ALL_BROKERS_DOWN = -187

# Transaction errors
# Kafka protocol code 48 is INVALID_TXN_STATE; 51 is CONCURRENT_TRANSACTIONS.
_INVALID_TRANSACTION_STATE = 48
_PRODUCER_FENCED = 90
349
```
350
351
### Error Handling Patterns
352
353
#### Basic Consumer Error Handling
354
355
```python
356
from confluent_kafka import Consumer, KafkaError, KafkaException
357
from confluent_kafka.error import ConsumeError
358
359
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])


def handle_consumer_errors():
    """Poll in a loop, printing messages and classifying per-message errors.

    The loop ends on an unknown topic/partition, on a fatal error (re-raised
    as KafkaException and reported below), or on Ctrl-C. The consumer is
    always closed on the way out.
    """
    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # Poll timed out without a message
                continue

            error = msg.error()
            if not error:
                # Process successful message
                print(f'Message: {msg.value()}')
                continue

            if error.code() == KafkaError._PARTITION_EOF:
                # End of partition - not really an error
                print(f'Reached end of partition {msg.topic()} [{msg.partition()}]')
                continue

            # Broker-side error codes have no leading underscore in confluent_kafka
            if error.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                print(f'Unknown topic or partition: {error}')
                # Could break or continue depending on requirements
                break

            if error.fatal():
                print(f'Fatal error: {error}')
                # Fatal errors require client restart
                raise KafkaException(error)

            if error.retriable():
                # Continue polling - error may resolve
                print(f'Retriable error: {error}')
            else:
                # Log and continue or break depending on error
                print(f'Non-retriable error: {error}')

    except KeyboardInterrupt:
        print('Interrupted by user')
    except KafkaException as e:
        print(f'Kafka exception: {e}')
    finally:
        consumer.close()


handle_consumer_errors()
415
```
416
417
#### Producer Error Handling
418
419
```python
420
from confluent_kafka import Producer, KafkaError
421
from confluent_kafka.error import ProduceError
422
423
producer = Producer({'bootstrap.servers': 'localhost:9092'})
424
425
def delivery_report(err, msg):
    """Delivery report callback.

    Args:
        err: None on success; otherwise a KafkaError, or an exception whose
            ``args[0]`` is the KafkaError.
        msg: The message the report refers to.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
        return

    error = err if isinstance(err, KafkaError) else err.args[0]
    code = error.code()

    # Broker-side error codes have no leading underscore in confluent_kafka
    if code == KafkaError.MSG_SIZE_TOO_LARGE:
        print(f'Message too large: {error}')
        # Could split message or log error

    elif code == KafkaError.REQUEST_TIMED_OUT:
        print(f'Request timed out: {error}')
        # Could retry or log timeout

    elif error.retriable():
        print(f'Retriable error - will be retried: {error}')
        # Producer will automatically retry

    else:
        print(f'Non-retriable produce error: {error}')
447
448
def produce_with_error_handling():
    """Produce 100 messages, retrying each one while the local queue is full.

    A BufferError means the message was not enqueued, so the same message is
    retried after serving delivery callbacks instead of being skipped. Any
    other exception is printed and that message is abandoned. Outstanding
    messages are flushed exactly once, on the way out.
    """
    try:
        for i in range(100):
            enqueued = False
            while not enqueued:
                try:
                    producer.produce(
                        'my-topic',
                        key=f'key-{i}',
                        value=f'message-{i}',
                        callback=delivery_report
                    )
                    enqueued = True

                except BufferError:
                    # Producer queue full - wait, then retry this same message
                    print('Producer queue full, waiting...')
                    producer.poll(1.0)  # Wait for some messages to be sent

                except Exception as e:
                    print(f'Unexpected error: {e}')
                    break  # Give up on this message and move to the next one

            if enqueued:
                # Poll for delivery callbacks
                producer.poll(0)

    except KeyboardInterrupt:
        print('Interrupted by user')
    finally:
        # Wait for all outstanding messages to be delivered
        producer.flush()


produce_with_error_handling()
482
```
483
484
#### Serialization Error Handling
485
486
```python
487
from confluent_kafka import SerializingProducer, DeserializingConsumer
488
from confluent_kafka.serialization import StringSerializer, StringDeserializer
489
from confluent_kafka.error import KeySerializationError, ValueSerializationError
490
from confluent_kafka.error import KeyDeserializationError, ValueDeserializationError
491
492
class SafeSerializer(StringSerializer):
    """String serializer that substitutes a marker instead of raising."""

    def __call__(self, obj, ctx=None):
        """Serialize ``obj``; on any failure print it and return a marker."""
        try:
            encoded = super().__call__(obj, ctx)
        except Exception as e:
            print(f'Serialization failed for {obj}: {e}')
            # Could return None, empty bytes, or raise
            return b'SERIALIZATION_FAILED'
        return encoded
502
503
def handle_serialization_errors():
    """Produce one message and report serialization or delivery failures.

    SerializingProducer runs the serializers inside ``produce()``, so
    KeySerializationError / ValueSerializationError are raised by that call.
    The delivery callback only ever sees delivery (broker/client) errors.
    """
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.serializer': SafeSerializer('utf_8'),
        'value.serializer': SafeSerializer('utf_8')
    }

    producer = SerializingProducer(producer_conf)

    def delivery_callback(err, msg):
        if err is not None:
            print(f'Other delivery error: {err}')

    try:
        # Produce with potential serialization errors.
        # SerializingProducer.produce() takes ``on_delivery``, not ``callback``.
        producer.produce('my-topic', key='valid-key', value=None, on_delivery=delivery_callback)
    except KeySerializationError as err:
        print(f'Key serialization error: {err.exception}')
    except ValueSerializationError as err:
        print(f'Value serialization error: {err.exception}')
    finally:
        producer.flush()
524
525
def handle_deserialization_errors():
    """Consume in a loop, skipping records that fail to deserialize.

    DeserializingConsumer.poll() raises KeyDeserializationError /
    ValueDeserializationError (and ConsumeError for other per-message errors)
    rather than returning them through ``msg.error()``, so they are caught
    around the poll call.
    """
    # Imported here because the surrounding example does not import it.
    from confluent_kafka.error import ConsumeError

    consumer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.deserializer': StringDeserializer('utf_8'),
        'value.deserializer': StringDeserializer('utf_8'),
        'group.id': 'error-handling-group',
        'auto.offset.reset': 'earliest'
    }

    consumer = DeserializingConsumer(consumer_conf)
    consumer.subscribe(['my-topic'])

    try:
        while True:
            try:
                msg = consumer.poll(1.0)

            except KeyDeserializationError as error:
                print(f'Key deserialization error: {error.exception}')
                # Could access original message via error.kafka_message
                original_msg = error.kafka_message
                print(f'Original key bytes: {original_msg.key()}')
                # Continue processing despite deserialization errors
                continue

            except ValueDeserializationError as error:
                print(f'Value deserialization error: {error.exception}')
                original_msg = error.kafka_message
                print(f'Original value bytes: {original_msg.value()}')
                continue

            except ConsumeError as error:
                # Base class last: the two subclasses above are matched first
                print(f'Other consumer error: {error}')
                continue

            if msg is None:
                continue

            # Process successfully deserialized message
            print(f'Key: {msg.key()}, Value: {msg.value()}')

    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        consumer.close()
570
```
571
572
#### Transaction Error Handling
573
574
```python
575
from confluent_kafka import Producer, KafkaError, KafkaException


def handle_transaction_errors():
    """Run five transactional batches, retrying a batch on retriable errors.

    Per batch: a fenced producer re-raises immediately; an abortable error
    aborts the transaction and moves on to the next batch; a retriable error
    aborts and retries the same batch up to ``max_attempts`` times; any other
    error aborts and re-raises.
    """
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'transactional.id': 'my-transactional-producer',
        'enable.idempotence': True
    }

    producer = Producer(producer_conf)
    max_attempts = 3  # Upper bound on tries per batch

    try:
        producer.init_transactions()
        print('Transactions initialized')

        for batch in range(5):
            for attempt in range(1, max_attempts + 1):
                try:
                    producer.begin_transaction()
                    print(f'Started transaction {batch}')

                    # Produce messages in transaction
                    for i in range(3):
                        producer.produce('transactional-topic', f'batch-{batch}-msg-{i}')

                    # Commit transaction
                    producer.commit_transaction()
                    print(f'Committed transaction {batch}')
                    break

                except KafkaException as e:
                    error = e.args[0]

                    # The client reports a fenced producer as the internal
                    # (underscore-prefixed) _FENCED code.
                    if error.code() == KafkaError._FENCED:
                        print('Producer fenced - need to recreate producer')
                        raise

                    elif error.txn_requires_abort():
                        print(f'Transaction error requires abort: {error}')
                        try:
                            producer.abort_transaction()
                            print('Transaction aborted')
                        except Exception as abort_error:
                            print(f'Failed to abort transaction: {abort_error}')
                            raise
                        break  # Batch abandoned; move on to the next one

                    elif error.retriable():
                        print(f'Retriable transaction error: {error}')
                        producer.abort_transaction()
                        if attempt == max_attempts:
                            raise  # Out of attempts: surface the original error
                        continue  # Retry the same batch

                    else:
                        print(f'Non-retriable transaction error: {error}')
                        producer.abort_transaction()
                        raise

    except KeyboardInterrupt:
        print('Interrupted')
        try:
            producer.abort_transaction()
        except Exception as abort_error:
            # Best effort only: we are already shutting down
            print(f'Abort after interrupt failed: {abort_error}')

    finally:
        producer.flush()
642
```
643
644
#### Admin Client Error Handling
645
646
```python
647
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException


def handle_admin_errors():
    """Create two topics and report the outcome of each creation."""
    admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

    # Create topics with error handling
    new_topics = [NewTopic('test-topic-1', 3, 1), NewTopic('test-topic-2', 6, 1)]

    fs = admin_client.create_topics(new_topics, request_timeout=30)

    for topic, f in fs.items():
        try:
            f.result()  # Block until operation completes
            print(f'Topic {topic} created successfully')

        except KafkaException as e:
            error = e.args[0]
            code = error.code()

            # Broker-side error codes have no leading underscore in confluent_kafka
            if code == KafkaError.TOPIC_ALREADY_EXISTS:
                print(f'Topic {topic} already exists')
                # Could continue or handle as needed

            elif code == KafkaError.TOPIC_AUTHORIZATION_FAILED:
                print(f'Authorization failed for topic {topic}')
                # Handle authorization error

            elif code == KafkaError.REQUEST_TIMED_OUT:
                print(f'Request timed out for topic {topic}')
                # Could retry with longer timeout

            else:
                print(f'Failed to create topic {topic}: {error}')

        except Exception as e:
            print(f'Unexpected error creating topic {topic}: {e}')


handle_admin_errors()
685
```
686
687
#### Comprehensive Error Logging
688
689
```python
690
import logging
from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.error import ConsumeError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def comprehensive_error_handling():
    """Consume with manual commits, logging every error with its details.

    Fatal errors end the loop; other per-message errors are logged and
    skipped. Offsets are committed only after the processing step succeeds.
    The consumer is always closed on the way out.
    """
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'logging-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False
    })

    consumer.subscribe(['my-topic'])

    try:
        while True:
            try:
                msg = consumer.poll(timeout=1.0)

                if msg is None:
                    continue

                if msg.error():
                    error = msg.error()

                    # Log error details
                    logger.error(
                        'Consumer error - Code: %s, Name: %s, Description: %s, '
                        'Fatal: %s, Retriable: %s',
                        error.code(),
                        error.name(),
                        error.str(),
                        error.fatal(),
                        error.retriable(),
                    )

                    # Handle based on error characteristics
                    if error.fatal():
                        logger.critical('Fatal error - exiting')
                        break
                    elif not error.retriable():
                        logger.warning('Non-retriable error - skipping')
                        continue
                    else:
                        logger.info('Retriable error - continuing')
                        continue

                # Process message
                logger.info(
                    'Processing message: %s[%s]@%s',
                    msg.topic(), msg.partition(), msg.offset(),
                )

                # Simulate processing
                try:
                    # Process message here
                    pass

                except Exception as processing_error:
                    logger.error('Message processing failed: %s', processing_error)
                    # Could skip message or handle error
                    continue

                # Manual commit after successful processing
                try:
                    consumer.commit(message=msg)
                except KafkaException as commit_error:
                    logger.error('Commit failed: %s', commit_error)
                    # Could retry commit or continue

            except Exception as unexpected_error:
                logger.error('Unexpected error in consumer loop: %s', unexpected_error)
                # Could break, continue, or re-raise depending on requirements

    except KeyboardInterrupt:
        logger.info('Consumer interrupted by user')
    except Exception as e:
        logger.critical('Critical consumer error: %s', e)
    finally:
        logger.info('Closing consumer')
        consumer.close()


comprehensive_error_handling()
772
```