0
# Error Handling
1
2
Comprehensive error handling: over 100 exception classes map Kafka protocol errors, client-side errors, and authorization failures, each carrying retry semantics. The sections below document the most commonly used subset.
3
4
## Capabilities
5
6
### Base Exception Classes
7
8
Foundation exception classes that categorize errors and expose flags indicating whether an error is retriable and whether it invalidates cached metadata.
9
10
```python { .api }
11
class KafkaError(Exception):
    """Root of the Kafka exception hierarchy.

    Subclasses override two class-level flags:

    * ``retriable`` -- True when repeating the failed operation may succeed.
    * ``invalid_metadata`` -- True when the error invalidates cached
      cluster metadata.
    """

    retriable = False
    invalid_metadata = False

    def __init__(self, *args):
        # Forward positional arguments unchanged to Exception.
        super().__init__(*args)
24
25
class BrokerResponseError(KafkaError):
    """Base for errors a broker reports through a protocol error code.

    Concrete subclasses fill in:

    * ``errno`` -- numeric Kafka error code.
    * ``message`` -- upper-case protocol error name.
    * ``description`` -- human-readable explanation.

    All three are None on this base class.
    """

    errno = message = description = None
37
38
class AuthorizationError(KafkaError):
    """Common base for access-denied errors (topic, group, cluster, ...).

    Catch this to handle every authorization failure in one place.
    """
41
```
42
43
### Client-Side Errors
44
45
Errors originating from the client library itself, typically related to configuration, connection, or protocol issues.
46
47
```python { .api }
48
class KafkaConfigurationError(KafkaError):
    """Raised when a client configuration parameter is invalid."""
51
52
class KafkaConnectionError(KafkaError):
    """Raised on connection-level failures; retriable."""

    retriable = True
60
61
class KafkaProtocolError(KafkaError):
    """Raised on Kafka wire-protocol errors; retriable."""

    retriable = True
69
70
class KafkaTimeoutError(KafkaError):
    """Raised when a request times out; retriable."""

    retriable = True
78
79
class IllegalArgumentError(KafkaError):
    """Raised when an argument value is not valid."""
82
83
class IllegalStateError(KafkaError):
    """Raised when an operation is attempted in an invalid state."""
86
87
class IncompatibleBrokerVersion(KafkaError):
    """Raised when the broker version is not compatible with the client."""
90
91
class MetadataEmptyBrokerList(KafkaError):
    """Raised when no brokers are available for a metadata request.

    Retriable, and marks cached metadata as invalid.
    """

    retriable = invalid_metadata = True
101
102
class NoBrokersAvailable(KafkaError):
    """Raised when no broker can be reached; retriable."""

    retriable = True
110
111
class NoOffsetForPartitionError(KafkaError):
    """Raised when no offset is available for a partition."""
114
115
class NodeNotReadyError(KafkaError):
    """Raised when a node is not ready to accept requests; retriable."""

    retriable = True
123
124
class QuotaViolationError(KafkaError):
    """Raised when a rate limit (quota) is exceeded."""
127
128
class StaleMetadata(KafkaError):
    """Raised when cached metadata must be refreshed.

    Retriable, and marks cached metadata as invalid.
    """

    retriable = invalid_metadata = True
138
139
class TooManyInFlightRequests(KafkaError):
    """Raised when the in-flight request queue is full; retriable."""

    retriable = True
147
148
class UnrecognizedBrokerVersion(KafkaError):
    """Raised when the broker version cannot be identified."""
151
152
class UnsupportedCodecError(KafkaError):
    """Raised when a compression codec is not supported."""
155
```
156
157
### Authorization Errors
158
159
Errors raised when the client is not authorized to access a resource (topic, consumer group, cluster, transactional ID, or delegation token).
160
161
```python { .api }
162
class TopicAuthorizationFailedError(AuthorizationError):
    """The client is not allowed to access the requested topic(s).

    Kafka error code 29 (TOPIC_AUTHORIZATION_FAILED).
    """

    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = 'Not authorized to access topics'
174
175
class GroupAuthorizationFailedError(AuthorizationError):
    """The client is not allowed to access the consumer group.

    Kafka error code 30 (GROUP_AUTHORIZATION_FAILED).
    """

    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = 'Not authorized to access group'
187
188
class ClusterAuthorizationFailedError(AuthorizationError):
    """The client is not allowed to perform a cluster-level operation.

    Kafka error code 31 (CLUSTER_AUTHORIZATION_FAILED).
    """

    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = 'Cluster authorization failed'
200
201
class TransactionalIdAuthorizationFailedError(AuthorizationError):
    """The client is not allowed to use the transactional id.

    Kafka error code 53 (TRANSACTIONAL_ID_AUTHORIZATION_FAILED).
    """

    errno = 53
    message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
    description = 'The transactional id authorization failed'
213
214
class DelegationTokenAuthorizationFailedError(AuthorizationError):
    """The client is not allowed to access the delegation token.

    Kafka error code 65 (DELEGATION_TOKEN_AUTHORIZATION_FAILED).

    Attributes:
    - errno = 65
    - message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    - description = 'Delegation Token authorization failed'
    """
    # 65 per the Kafka protocol error-code table; 58 (previously used here)
    # is SASL_AUTHENTICATION_FAILED, so lookups by errno hit the wrong class.
    errno = 65
    message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    description = 'Delegation Token authorization failed'
226
```
227
228
### Common Broker Response Errors
229
230
Most frequently encountered broker-side errors with their error codes and retry semantics.
231
232
```python { .api }
233
class OffsetOutOfRangeError(BrokerResponseError):
    """The requested offset lies outside the partition's available range.

    Kafka error code 1 (OFFSET_OUT_OF_RANGE); not retriable.
    """

    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = 'The requested offset is not within the range of offsets'
245
246
class UnknownTopicOrPartitionError(BrokerResponseError):
    """The topic or partition does not exist on this broker.

    Kafka error code 3 (UNKNOWN_TOPIC_OR_PARTITION). Retriable, and
    invalidates cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    description = 'This server does not host this topic-partition'
262
263
class LeaderNotAvailableError(BrokerResponseError):
    """The partition currently has no leader.

    Kafka error code 5 (LEADER_NOT_AVAILABLE). Retriable, and invalidates
    cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    description = 'There is no leader for this topic-partition'
279
280
class NotLeaderForPartitionError(BrokerResponseError):
    """The contacted broker is not the leader for the partition.

    Kafka error code 6 (NOT_LEADER_FOR_PARTITION). Retriable, and
    invalidates cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    description = 'This server is not the leader for that topic-partition'
296
297
class RequestTimedOutError(BrokerResponseError):
    """The broker reported that the request timed out.

    Kafka error code 7 (REQUEST_TIMED_OUT); retriable.
    """

    retriable = True

    errno = 7
    message = 'REQUEST_TIMED_OUT'
    description = 'The request timed out'
311
312
class BrokerNotAvailableError(BrokerResponseError):
    """The broker is not available.

    Kafka error code 8 (BROKER_NOT_AVAILABLE). Retriable, and invalidates
    cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = 'The broker is not available'
328
329
class ReplicaNotAvailableError(BrokerResponseError):
    """No replica is available for the requested partition.

    Kafka error code 9 (REPLICA_NOT_AVAILABLE); retriable.
    """

    retriable = True

    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = 'The replica is not available for the requested topic-partition'
343
344
class MessageSizeTooLargeError(BrokerResponseError):
    """A message exceeds the broker's maximum message size.

    Kafka error code 10 (MESSAGE_TOO_LARGE); not retriable.
    """

    errno = 10
    message = 'MESSAGE_TOO_LARGE'
    description = 'The request included a message larger than the max message size'
356
357
class TopicAlreadyExistsError(BrokerResponseError):
    """A topic with the requested name already exists.

    Kafka error code 36 (TOPIC_ALREADY_EXISTS); not retriable.
    """

    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic already exists'
369
370
class InvalidTopicError(BrokerResponseError):
    """The request referenced an invalid topic name.

    Kafka error code 17 (INVALID_TOPIC_EXCEPTION); not retriable.
    """

    errno = 17
    message = 'INVALID_TOPIC_EXCEPTION'
    description = 'The request attempted to perform an operation on an invalid topic'
382
```
383
384
### Consumer-Specific Errors
385
386
Errors specific to consumer operations and group coordination.
387
388
```python { .api }
389
class OffsetMetadataTooLargeError(BrokerResponseError):
    """The metadata string attached to an offset commit is too large.

    Kafka error code 12 (OFFSET_METADATA_TOO_LARGE); not retriable.
    """

    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = 'The metadata field of the offset request was too large'
401
402
class GroupLoadInProgressError(BrokerResponseError):
    """The group coordinator is still loading group state.

    Kafka error code 14 (GROUP_LOAD_IN_PROGRESS); retriable.
    """

    retriable = True

    errno = 14
    message = 'GROUP_LOAD_IN_PROGRESS'
    description = 'The coordinator is loading and hence can\'t process requests'
416
417
class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """No group coordinator is currently available.

    Kafka error code 15 (GROUP_COORDINATOR_NOT_AVAILABLE). Retriable, and
    invalidates cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 15
    message = 'GROUP_COORDINATOR_NOT_AVAILABLE'
    description = 'The group coordinator is not available'
433
434
class NotCoordinatorForGroupError(BrokerResponseError):
    """The contacted broker is not the coordinator for the group.

    Kafka error code 16 (NOT_COORDINATOR_FOR_GROUP). Retriable, and
    invalidates cached metadata.
    """

    # retry semantics
    retriable = True
    invalid_metadata = True

    # protocol identity
    errno = 16
    message = 'NOT_COORDINATOR_FOR_GROUP'
    description = 'The broker is not the coordinator for this group'
450
451
class UnknownMemberIdError(BrokerResponseError):
    """The member id is not part of the group's current generation.

    Kafka error code 25 (UNKNOWN_MEMBER_ID); not retriable.
    """

    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = 'The member id is not in the current generation'
463
464
class IllegalGenerationError(BrokerResponseError):
    """The supplied group generation id is not valid.

    Kafka error code 22 (ILLEGAL_GENERATION); not retriable.
    """

    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = 'Specified group generation id is not valid'
476
477
class RebalanceInProgressError(BrokerResponseError):
    """The consumer group is rebalancing; the member must rejoin.

    Kafka error code 27 (REBALANCE_IN_PROGRESS); retriable.
    """

    retriable = True

    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = 'The group is rebalancing, so a rejoin is needed'
491
```
492
493
### Producer-Specific Errors
494
495
Errors specific to producer operations and message publishing.
496
497
```python { .api }
498
class InvalidRequiredAcksError(BrokerResponseError):
    """The producer's ``acks`` setting is not one of -1, 0 or 1.

    Kafka error code 21 (INVALID_REQUIRED_ACKS); not retriable.
    """

    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = 'Specified required acks is invalid (must be -1, 0, or 1)'
510
511
class RecordListTooLargeError(BrokerResponseError):
    """A produced record batch is larger than the configured segment size.

    Kafka error code 18 (RECORD_LIST_TOO_LARGE); not retriable.
    """

    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = 'The request included message batch larger than the configured segment size'
523
524
class InvalidPartitionError(BrokerResponseError):
    """Broker rejected the request with INVALID_FETCH_SIZE (Kafka error code 4).

    NOTE(review): despite the class name, errno 4 / 'INVALID_FETCH_SIZE' is
    the protocol's fetch-size error, not a partition-number error. The name
    is kept for backward compatibility. The description string below does not
    describe a fetch-size problem either; confirm it against the Kafka
    protocol error-code table. Not retriable.

    Attributes:
    - errno = 4
    - message = 'INVALID_FETCH_SIZE'
    - description = 'The message has an invalid offset'
    """
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has an invalid offset'
536
537
class DuplicateSequenceNumberError(BrokerResponseError):
    """Duplicate sequence number (idempotent producer).

    Kafka error code 46 (DUPLICATE_SEQUENCE_NUMBER).

    Attributes:
    - errno = 46
    - message = 'DUPLICATE_SEQUENCE_NUMBER'
    - description = 'A producer attempted to produce with an old sequence number'
    """
    # 46 per the Kafka protocol error-code table; 45 (previously used here)
    # is OUT_OF_ORDER_SEQUENCE_NUMBER.
    errno = 46
    message = 'DUPLICATE_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with an old sequence number'
549
550
class OutOfOrderSequenceNumberError(BrokerResponseError):
    """Out of order sequence number (idempotent producer).

    Kafka error code 45 (OUT_OF_ORDER_SEQUENCE_NUMBER).

    Attributes:
    - errno = 45
    - message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    - description = 'A producer attempted to produce with a sequence number which is not the expected next one'
    """
    # 45 per the Kafka protocol error-code table; 46 (previously used here)
    # is DUPLICATE_SEQUENCE_NUMBER.
    errno = 45
    message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with a sequence number which is not the expected next one'
562
```
563
564
## Error Handling Patterns
565
566
### Basic Error Handling
567
568
```python
569
from kafka import KafkaProducer, KafkaConsumer
570
from kafka.errors import (KafkaError, KafkaTimeoutError, KafkaConnectionError,
571
TopicAuthorizationFailedError, MessageSizeTooLargeError)
572
573
# Producer error handling: each except clause handles one failure mode.
# The KafkaError clause is the catch-all for every other KafkaError
# subclass, so it must stay last.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    # Block for up to 30 seconds waiting for the result of the send.
    record_metadata = producer.send('my-topic', value=b'test message').get(timeout=30)
    print(f"Message sent successfully: offset {record_metadata.offset}")
except TopicAuthorizationFailedError:
    print("Access denied to topic")
except MessageSizeTooLargeError:
    print("Message too large for broker")
except KafkaTimeoutError:
    print("Request timed out")
except KafkaConnectionError:
    print("Connection failed")
except KafkaError as e:
    print(f"Kafka error: {e}")
finally:
    # Release the producer whatever happened above.
    producer.close()
593
```
594
595
### Retry Logic with Exponential Backoff
596
597
```python
598
import time
599
import random
600
from kafka import KafkaProducer
601
from kafka.errors import KafkaError
602
603
def send_with_retry(producer, topic, value, max_retries=3):
    """Send *value* to *topic*, retrying on retriable Kafka errors.

    Makes up to ``max_retries + 1`` attempts, and each attempt waits up to
    30 seconds for the send result. Between attempts the helper sleeps for
    ``2 ** attempt`` seconds plus up to one second of random jitter.

    Returns:
        The value returned by ``future.get()`` for the successful attempt.

    Raises:
        KafkaError: the caught error itself when it is not retriable or when
            no attempts remain. A generic "Max retries exceeded" KafkaError
            is raised only if no attempt is made at all (``max_retries < 0``).
    """
    for attempt in range(max_retries + 1):
        try:
            return producer.send(topic, value=value).get(timeout=30)
        except KafkaError as err:
            last_attempt = attempt == max_retries
            if last_attempt or not err.retriable:
                raise err

            # Exponential backoff with jitter
            delay = 2 ** attempt + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {err}. Retrying in {delay:.2f}s")
            time.sleep(delay)

    raise KafkaError("Max retries exceeded")
622
623
# Usage: send one message with retries, then always close the producer.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    metadata = send_with_retry(producer, 'events', b'important message')
except KafkaError as e:
    print(f"Failed to send message: {e}")
else:
    print(f"Message sent successfully: {metadata}")
finally:
    producer.close()
633
```
634
635
### Consumer Error Handling
636
637
```python
638
from kafka import KafkaConsumer
from kafka.errors import (OffsetOutOfRangeError, GroupAuthorizationFailedError,
                          StaleMetadata)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='error-handling-group',
    auto_offset_reset='earliest',
    # After 10 s without a message, iteration over the consumer simply ends
    # (StopIteration); no exception is raised for the timeout.
    consumer_timeout_ms=10000
)

try:
    # Outer loop: re-enter the iterator after a recoverable error, so that
    # recovery actions (e.g. seeking) are followed by further consumption.
    while True:
        try:
            for message in consumer:
                try:
                    # Process message (process_message is application code,
                    # assumed to be defined elsewhere)
                    process_message(message)
                except Exception as e:
                    print(f"Error processing message at offset {message.offset}: {e}")
                    # Could implement dead letter queue here
                    continue
        except OffsetOutOfRangeError:
            print("Offset out of range - seeking to beginning")
            consumer.seek_to_beginning()
            continue  # resume consuming from the new position
        except StaleMetadata:
            print("Metadata stale - will refresh automatically")
            continue

        # The iterator finished normally: consumer_timeout_ms elapsed.
        print("Consumer timeout - no messages received")
        break

except GroupAuthorizationFailedError:
    print("Access denied to consumer group")

except KeyboardInterrupt:
    print("Shutting down consumer")

finally:
    consumer.close()
679
```
680
681
### Circuit Breaker Pattern
682
683
```python
684
import time
685
from enum import Enum
686
from kafka import KafkaProducer
687
from kafka.errors import KafkaError
688
689
class CircuitState(Enum):
    """States of a CircuitBreaker."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected until recovery_timeout elapses
    HALF_OPEN = "half_open"  # trial calls allowed: success closes, failure reopens


class CircuitBreaker:
    """Stop calling a failing operation for a cool-down period.

    After ``failure_threshold`` consecutive failures, the breaker opens and
    rejects calls with ``Exception("Circuit breaker is OPEN")``. Once more
    than ``recovery_timeout`` seconds have passed since the last failure,
    the next call goes through in the HALF_OPEN state. A success closes the
    breaker; a failure opens it again.

    Args:
        failure_threshold: consecutive failures that open the circuit.
        recovery_timeout: seconds to stay OPEN before allowing a trial call.
        clock: zero-argument callable returning the current time in seconds.
            Defaults to ``time.monotonic`` so system clock updates cannot
            shorten or extend the cool-down. Inject a fake clock in tests.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` with circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: "Circuit breaker is OPEN" while the cool-down is active.
            Any exception raised by ``func`` is recorded as a failure and
            re-raised.
        """
        if self.state == CircuitState.OPEN:
            if self.clock() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker transitioning to HALF_OPEN")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise  # re-raise with the original traceback

        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count; close the circuit after a HALF_OPEN trial."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("Circuit breaker reset to CLOSED")

    def _on_failure(self):
        """Count a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = self.clock()

        # In HALF_OPEN the count is still >= threshold, so a failed trial
        # call reopens the circuit immediately.
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPENED after {self.failure_count} failures")
print(f"Circuit breaker OPENED after {self.failure_count} failures")
736
737
# Usage with Kafka producer: open the circuit after 3 consecutive failures,
# and allow a trial call once more than 30 seconds have passed since the
# last failure.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
circuit_breaker = CircuitBreaker(recovery_timeout=30, failure_threshold=3)
740
741
def send_message(topic, value):
    """Send *value* to *topic* and wait up to 10 seconds for the result.

    Meant to be invoked through ``circuit_breaker.call`` so that failures
    count toward opening the circuit.
    """
    return producer.send(topic, value=value).get(timeout=10)
745
746
# Send messages with circuit breaker protection
try:
    for i in range(20):
        try:
            circuit_breaker.call(send_message, 'protected-topic', f'Message {i}'.encode())
        except Exception as e:
            print(f"Message {i} failed: {e}")
            time.sleep(1)  # Brief pause before next attempt
        else:
            print(f"Message {i} sent successfully")
finally:
    # KeyboardInterrupt is not an Exception subclass, so without this
    # finally a Ctrl+C would skip closing the producer.
    producer.close()
757
```
758
759
### Error Classification and Handling
760
761
```python
762
from kafka.errors import (KafkaError, BrokerResponseError, AuthorizationError,
763
KafkaTimeoutError, KafkaConnectionError)
764
765
def classify_and_handle_error(error):
    """Map an exception to a handling strategy.

    The result is a dict with keys 'category', 'action', 'retriable' and
    'severity'. Checks run from most to least specific:

    1. authorization errors
    2. connection errors
    3. timeouts
    4. broker responses, split on ``error.retriable``
    5. any other KafkaError
    6. non-Kafka exceptions
    """
    def _strategy(category, action, retriable, severity):
        return {
            'category': category,
            'action': action,
            'retriable': retriable,
            'severity': severity,
        }

    if isinstance(error, AuthorizationError):
        return _strategy('authorization', 'check_credentials', False, 'high')
    if isinstance(error, KafkaConnectionError):
        return _strategy('network', 'retry_with_backoff', True, 'medium')
    if isinstance(error, KafkaTimeoutError):
        return _strategy('timeout', 'retry_with_longer_timeout', True, 'low')
    if isinstance(error, BrokerResponseError):
        if not error.retriable:
            return _strategy('broker_fatal', 'fail_fast', False, 'high')
        return _strategy('broker_retriable', 'retry_after_delay', True, 'medium')
    if isinstance(error, KafkaError):
        return _strategy('generic_kafka', 'investigate',
                         getattr(error, 'retriable', False), 'medium')
    return _strategy('unknown', 'investigate', False, 'high')
823
824
# Usage in error handler
825
def handle_kafka_error(error, operation_context):
    """Report a Kafka error and decide whether the operation should be retried.

    Args:
        error: the exception that was raised.
        operation_context: short description of the failing operation; used
            only in the printed message.

    Returns:
        True if a retry should be attempted, False if the failure should be
        reported.
    """
    # Imported locally because this snippet never defines a module-level
    # ``logger``; the original reference raised NameError on high severity.
    import logging

    classification = classify_and_handle_error(error)

    print(f"Error in {operation_context}: {error}")
    print(f"Classification: {classification}")

    if classification['severity'] == 'high':
        # Log critical error
        logging.getLogger(__name__).critical("Critical Kafka error: %s", error)

    # bool() keeps the True/False contract even when 'retriable' came from
    # an arbitrary attribute value (see the generic KafkaError branch).
    return bool(classification['retriable'])
841
```
842
843
### Monitoring and Alerting
844
845
```python
846
import logging
847
from collections import defaultdict, deque
848
import time
849
from kafka.errors import KafkaError
850
851
class KafkaErrorMonitor:
    """Track error occurrences in a sliding time window and alert on spikes.

    Args:
        window_size: length of the sliding window, in seconds.
        alert_threshold: a CRITICAL log is emitted when more than this many
            errors of one type fall inside the window.
    """

    # Error type names that always trigger a CRITICAL log when recorded.
    CRITICAL_ERROR_TYPES = frozenset({
        'TopicAuthorizationFailedError',
        'ClusterAuthorizationFailedError',
    })

    def __init__(self, window_size=300, alert_threshold=10):  # 5 minute window
        self.window_size = window_size
        self.alert_threshold = alert_threshold
        # error type name -> deque of time.time() timestamps, oldest first
        self.error_counts = defaultdict(deque)
        self.logger = logging.getLogger(__name__)

    def record_error(self, error, context="unknown"):
        """Record one occurrence of *error*, log it, and check alert rules."""
        error_type = type(error).__name__
        timestamp = time.time()

        timestamps = self.error_counts[error_type]
        timestamps.append(timestamp)
        self._prune(timestamps, timestamp - self.window_size)

        self.logger.error("Kafka error in %s: %s", context, error)
        self._check_alert_conditions(error_type)

    @staticmethod
    def _prune(timestamps, cutoff):
        """Drop timestamps older than *cutoff* from the left of the deque."""
        while timestamps and timestamps[0] < cutoff:
            timestamps.popleft()

    def _check_alert_conditions(self, error_type):
        """Log CRITICAL when the rate threshold is exceeded or the type is critical."""
        error_count = len(self.error_counts[error_type])

        if error_count > self.alert_threshold:
            self.logger.critical(
                "High error rate: %d %s errors in last %s seconds",
                error_count, error_type, self.window_size,
            )

        if error_type in self.CRITICAL_ERROR_TYPES:
            self.logger.critical("Authorization failure: %s", error_type)

    def get_error_summary(self):
        """Return ``{error type name: count}`` for errors inside the window.

        Types with no errors in the current window are omitted. The window
        boundary matches ``_prune``: a timestamp equal to the cutoff counts.
        """
        cutoff = time.time() - self.window_size
        summary = {}
        for error_type, timestamps in self.error_counts.items():
            recent_count = sum(1 for ts in timestamps if ts >= cutoff)
            if recent_count:
                summary[error_type] = recent_count
        return summary
910
911
# Usage: one shared monitor with a 300-second (5 minute) sliding window.
error_monitor = KafkaErrorMonitor(window_size=300)
913
914
def monitored_kafka_operation(operation_func, *args, **kwargs):
    """Call ``operation_func(*args, **kwargs)``, recording any KafkaError.

    A KafkaError is recorded in ``error_monitor`` under the callable's
    ``__name__`` and then re-raised. Other exceptions pass through
    untouched.
    """
    try:
        result = operation_func(*args, **kwargs)
    except KafkaError as exc:
        error_monitor.record_error(exc, context=operation_func.__name__)
        raise exc
    return result
923
924
# Example usage
# This snippet does not otherwise create a producer, so create one here.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])


def send_and_wait(topic, value, timeout=10):
    """Send one message and wait for its result.

    producer.send() returns a future (see the earlier examples). Broker-side
    failures are reported through that future, so resolve it here; otherwise
    the monitor would never see them.
    """
    return producer.send(topic, value).get(timeout=timeout)


try:
    monitored_kafka_operation(send_and_wait, 'topic', b'message')
except KafkaError:
    # Error already logged and monitored
    pass
finally:
    producer.close()

# Periodic error summary
print("Recent errors:", error_monitor.get_error_summary())
933
```