# Error Handling

Comprehensive exception hierarchy for handling Kafka-related errors, including network issues, protocol errors, authentication failures, and consumer group coordination problems.

## Capabilities

### Base Exception Classes

Foundation exception classes that define the error handling framework.
```python { .api }
class KafkaError(RuntimeError):
    """
    Base exception for all Kafka-related errors.

    Attributes:
        retriable (bool): Whether the operation that caused this error can be retried
        invalid_metadata (bool): Whether this error indicates stale metadata that should be refreshed
    """
    retriable: bool = False
    invalid_metadata: bool = False

    def __str__(self):
        """String representation of the error."""
```
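
The `retriable` flag is what a generic retry wrapper keys off. A minimal sketch of that pattern, using a stand-in `DemoError` class (not part of kafka-python) so it runs without a broker:

```python
import time

class DemoError(RuntimeError):
    # Stand-in mirroring KafkaError's class-level flags
    retriable = True
    invalid_metadata = False

def call_with_retry(fn, max_attempts=3, backoff_s=0.1):
    """Retry fn while it raises errors flagged as retriable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RuntimeError as e:
            # Give up on non-retriable errors or after the final attempt
            if not getattr(e, 'retriable', False) or attempt == max_attempts:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff between attempts

# A flaky callable that fails twice, then succeeds
state = {'calls': 0}
def flaky():
    state['calls'] += 1
    if state['calls'] < 3:
        raise DemoError("transient failure")
    return 'ok'

result = call_with_retry(flaky)  # succeeds on the third attempt
```

The same wrapper works unchanged with real `KafkaError` subclasses, since they carry the same `retriable` class attribute.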

### Network and Connection Errors

Errors related to network connectivity and broker communication.

```python { .api }
class NoBrokersAvailable(KafkaError):
    """No Kafka brokers are available for connection."""
    retriable: bool = True
    invalid_metadata: bool = True

class NodeNotReadyError(KafkaError):
    """Broker node is not ready to accept requests."""
    retriable: bool = True

class ConnectionError(KafkaError):
    """Failed to establish connection to Kafka broker."""
    retriable: bool = True

class RequestTimedOutError(KafkaError):
    """Request to Kafka broker timed out."""
    retriable: bool = True

class NetworkException(KafkaError):
    """Network-level communication error."""
    retriable: bool = True
```

### Protocol and Message Errors

Errors related to Kafka protocol handling and message processing.

```python { .api }
class KafkaProtocolError(KafkaError):
    """Kafka protocol-level error."""
    retriable: bool = True

class CorrelationIdError(KafkaProtocolError):
    """Response correlation ID does not match request."""
    retriable: bool = True

class BufferUnderflowError(KafkaError):
    """Insufficient data in buffer for deserialization."""

class ChecksumError(KafkaError):
    """Message checksum validation failed."""

class CompressionNotSupportedError(KafkaError):
    """Requested compression type is not supported."""

class UnsupportedVersionError(KafkaError):
    """API version is not supported by broker."""
```

### Client State and Usage Errors

Errors related to incorrect client usage or invalid state.

```python { .api }
class IllegalStateError(KafkaError):
    """Client is in an invalid state for the requested operation."""

class IllegalArgumentError(KafkaError):
    """Invalid argument provided to client method."""

class InvalidTopicError(KafkaError):
    """Topic name is invalid or does not exist."""

class TopicAlreadyExistsError(KafkaError):
    """Attempted to create a topic that already exists."""

class InvalidPartitionsError(KafkaError):
    """Invalid partition configuration."""

class InvalidReplicationFactorError(KafkaError):
    """Invalid replication factor specified."""
```

### Producer Errors

Errors specific to producer operations.

```python { .api }
class TooManyInFlightRequests(KafkaError):
    """Too many unacknowledged requests are pending."""
    retriable: bool = True

class MessageSizeTooLargeError(KafkaError):
    """Message size exceeds broker limits."""

class RecordBatchTooLargeError(KafkaError):
    """Record batch size exceeds broker limits."""

class InvalidRecordError(KafkaError):
    """Record contains invalid data."""

class RecordTooLargeError(KafkaError):
    """Individual record exceeds size limits."""

class UnknownTopicOrPartitionError(KafkaError):
    """Topic or partition does not exist."""
    invalid_metadata: bool = True

class LeaderNotAvailableError(KafkaError):
    """Partition leader is not available."""
    retriable: bool = True
    invalid_metadata: bool = True

class NotLeaderForPartitionError(KafkaError):
    """Broker is not the leader for the partition."""
    retriable: bool = True
    invalid_metadata: bool = True
```

### Consumer Errors

Errors specific to consumer operations and group coordination.

```python { .api }
class CommitFailedError(KafkaError):
    """Offset commit failed due to group rebalance or other issues."""

class InvalidSessionTimeoutError(KafkaError):
    """Session timeout is outside broker's allowed range."""

class InvalidGroupIdError(KafkaError):
    """Consumer group ID is invalid."""

class GroupLoadInProgressError(KafkaError):
    """Consumer group is still loading."""
    retriable: bool = True

class GroupCoordinatorNotAvailableError(KafkaError):
    """Group coordinator is not available."""
    retriable: bool = True

class NotCoordinatorForGroupError(KafkaError):
    """Broker is not the coordinator for this group."""
    retriable: bool = True

class UnknownMemberIdError(KafkaError):
    """Consumer group member ID is not recognized."""

class IllegalGenerationError(KafkaError):
    """Consumer group generation ID is invalid."""

class OffsetOutOfRangeError(KafkaError):
    """Requested offset is outside the available range."""

class GroupAuthorizationFailedError(KafkaError):
    """Not authorized to access consumer group."""

class TopicAuthorizationFailedError(KafkaError):
    """Not authorized to access topic."""
```

### Authentication and Authorization Errors

Errors related to security and access control.

```python { .api }
class AuthenticationFailedError(KafkaError):
    """SASL authentication failed."""

class AuthenticationMethodNotSupported(KafkaError):
    """Requested SASL mechanism is not supported."""

class SaslAuthenticationError(KafkaError):
    """SASL authentication error."""

class ClusterAuthorizationFailedError(KafkaError):
    """Not authorized to perform cluster operations."""

class DelegationTokenNotFoundError(KafkaError):
    """Delegation token not found."""

class DelegationTokenAuthorizationFailedError(KafkaError):
    """Not authorized to use delegation token."""
```

### Metadata and Coordination Errors

Errors related to metadata management and cluster coordination.

```python { .api }
class StaleMetadata(KafkaError):
    """Client metadata is stale and needs refresh."""
    retriable: bool = True
    invalid_metadata: bool = True

class MetadataEmptyBrokerList(KafkaError):
    """Broker list in metadata is empty."""
    retriable: bool = True

class UnrecognizedBrokerVersion(KafkaError):
    """Broker version is not recognized."""

class IncompatibleBrokerVersion(KafkaError):
    """Broker version is incompatible with client."""

class Cancelled(KafkaError):
    """Operation was cancelled."""
    retriable: bool = True
```

### Protocol-Specific Errors

Errors returned by specific Kafka protocol APIs.

```python { .api }
class BrokerResponseError(KafkaError):
    """
    Base class for errors returned by Kafka brokers.

    Attributes:
        errno (int): Kafka error code
        message (str): Error message
        description (str): Error description
    """
    errno: int
    message: str
    description: str

    def __init__(self, errno, message=None, description=None):
        self.errno = errno
        self.message = message
        self.description = description

# Specific broker errors (partial list)
class UnknownError(BrokerResponseError):
    """Unknown server error."""
    errno = -1

class OffsetMetadataTooLarge(BrokerResponseError):
    """Offset metadata string is too large."""
    errno = 12

class InvalidTopicException(BrokerResponseError):
    """Topic name is invalid."""
    errno = 17

class RecordListTooLarge(BrokerResponseError):
    """Record list is too large."""
    errno = 18

class NotEnoughReplicas(BrokerResponseError):
    """Not enough replicas available."""
    errno = 19

class NotEnoughReplicasAfterAppend(BrokerResponseError):
    """Not enough replicas after append."""
    errno = 20
```
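
Broker responses carry only the numeric error code, which the client then maps back to one of these classes (kafka-python exposes this lookup as `kafka.errors.for_code`). A minimal sketch of that registry pattern, using stand-in classes so it runs standalone:

```python
class BrokerResponseError(RuntimeError):
    """Stand-in base class; the real hierarchy lives in kafka.errors."""
    errno = None

class UnknownError(BrokerResponseError):
    errno = -1

class OffsetMetadataTooLarge(BrokerResponseError):
    errno = 12

class NotEnoughReplicas(BrokerResponseError):
    errno = 19

# Build an errno -> exception-class registry from the subclasses
ERRORS_BY_CODE = {cls.errno: cls for cls in BrokerResponseError.__subclasses__()}

def for_code(error_code):
    # Unrecognized codes fall back to the generic base class
    return ERRORS_BY_CODE.get(error_code, BrokerResponseError)
```

With this registry, `for_code(12)` resolves to `OffsetMetadataTooLarge`, while an unknown code falls back to the base class rather than failing the lookup.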

## Usage Examples

### Basic Error Handling

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError, NoBrokersAvailable, RequestTimedOutError

try:
    producer = KafkaProducer(bootstrap_servers=['invalid-broker:9092'])
    producer.send('my-topic', b'test message').get(timeout=10)

except NoBrokersAvailable:
    print("No Kafka brokers are available")

except RequestTimedOutError:
    print("Request timed out")

except KafkaError as e:
    print(f"Kafka error: {e}")

finally:
    if 'producer' in locals():
        producer.close()
```

### Producer Error Handling with Retries

```python
from kafka import KafkaProducer
from kafka.errors import (KafkaError, MessageSizeTooLargeError,
                          NotLeaderForPartitionError, RequestTimedOutError)
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    retries=5,  # Built-in retries for retriable errors
    retry_backoff_ms=1000
)

def send_with_retry(topic, message, max_retries=3):
    for attempt in range(max_retries):
        try:
            future = producer.send(topic, message)
            metadata = future.get(timeout=10)
            print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
            return metadata

        except NotLeaderForPartitionError:
            print(f"Leader not available, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue

        except MessageSizeTooLargeError:
            print("Message is too large, cannot retry")
            raise

        except RequestTimedOutError:
            print(f"Request timed out, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(1)
                continue

    raise KafkaError("Failed to send after all retries")

# Use the retry function
try:
    send_with_retry('my-topic', b'important message')
except KafkaError as e:
    print(f"Final error: {e}")
```

### Consumer Error Handling

```python
from kafka import KafkaConsumer
from kafka.errors import (CommitFailedError, OffsetOutOfRangeError,
                          GroupCoordinatorNotAvailableError, KafkaError)

def process_message(message):
    # Simulate message processing
    print(f"Processing: {message.value}")

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

try:
    for message in consumer:
        try:
            # Process message
            process_message(message)

            # Manual commit
            consumer.commit()

        except CommitFailedError:
            print("Commit failed, likely due to rebalance")
            # Consumer will rejoin group and get new assignment

        except OffsetOutOfRangeError as e:
            print(f"Offset out of range: {e}")
            # Seek to beginning of all assigned partitions
            consumer.seek_to_beginning()

        except Exception as e:
            print(f"Processing error: {e}")
            # Continue with next message

except GroupCoordinatorNotAvailableError:
    print("Group coordinator not available")

except KafkaError as e:
    print(f"Kafka error: {e}")

finally:
    consumer.close()
```

### Authentication Error Handling

```python
from kafka import KafkaProducer
from kafka.errors import (AuthenticationFailedError,
                          AuthenticationMethodNotSupported,
                          ClusterAuthorizationFailedError)

try:
    producer = KafkaProducer(
        bootstrap_servers=['secure-broker:9093'],
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username='wrong-user',
        sasl_plain_password='wrong-password'
    )

    producer.send('secure-topic', b'message')

except AuthenticationFailedError:
    print("SASL authentication failed - check credentials")

except AuthenticationMethodNotSupported:
    print("SASL mechanism not supported by broker")

except ClusterAuthorizationFailedError:
    print("Not authorized to access cluster")

except Exception as e:
    print(f"Other error: {e}")
```

### Admin Client Error Handling

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import (TopicAlreadyExistsError, InvalidReplicationFactorError,
                          ClusterAuthorizationFailedError, KafkaError)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    topics = [NewTopic('test-topic', num_partitions=3, replication_factor=2)]
    response = admin.create_topics(topics)

    # create_topics returns a CreateTopicsResponse; per-topic results are in
    # topic_errors as (topic, error_code, error_message) tuples
    for topic_name, error_code, error_message in response.topic_errors:
        if error_code == 0:
            print(f"Topic {topic_name} created successfully")
        else:
            print(f"Failed to create topic {topic_name}: {error_message}")

except TopicAlreadyExistsError:
    print("Topic already exists")

except InvalidReplicationFactorError:
    print("Invalid replication factor")

except ClusterAuthorizationFailedError:
    print("Not authorized to create topics")

except KafkaError as e:
    print(f"Admin operation failed: {e}")

finally:
    admin.close()
```

### Error Classification and Recovery

```python
from kafka.errors import KafkaError

def handle_kafka_error(error):
    """Handle Kafka errors with appropriate recovery strategies."""

    if getattr(error, 'retriable', False):
        print(f"Retriable error: {error}")
        return 'retry'

    if getattr(error, 'invalid_metadata', False):
        print(f"Metadata refresh needed: {error}")
        return 'refresh_metadata'

    # Non-retriable errors
    print(f"Non-retriable error: {error}")
    return 'fail'

# Example usage
try:
    # Kafka operation
    pass
except KafkaError as e:
    strategy = handle_kafka_error(e)

    if strategy == 'retry':
        # Implement retry logic
        pass
    elif strategy == 'refresh_metadata':
        # Force metadata refresh
        pass
    else:
        # Log error and exit
        raise
```