Pure Python client for the Apache Kafka distributed stream processing system.

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures with appropriate retry semantics.

Foundation exception classes providing error categorization and retry logic.
class KafkaError(Exception):
    """Root of the Kafka exception hierarchy.

    Class attributes consulted by retry machinery:
        retriable: True when the failed operation may safely be retried.
        invalid_metadata: True when the error means cached cluster
            metadata should be refreshed.
    """

    # Conservative defaults; subclasses opt in to retry semantics.
    retriable = False
    invalid_metadata = False

    def __init__(self, *args):
        # No extra state is kept; delegate straight to Exception.
        super(KafkaError, self).__init__(*args)
class BrokerResponseError(KafkaError):
    """Common base for errors reported in a Kafka broker response.

    Concrete subclasses fill in:
        errno: numeric Kafka protocol error code.
        message: short protocol error name.
        description: human-readable explanation.
    """

    # None marks "unset"; populated by each concrete subclass.
    errno = None
    message = None
    description = None
class AuthorizationError(KafkaError):
    """Marker base class for ACL/authorization failures."""
    pass


# Errors originating from the client library itself, typically related to
# configuration, connection, or protocol issues.
class KafkaConfigurationError(KafkaError):
    """Raised for invalid or inconsistent client configuration parameters."""
    pass


class KafkaConnectionError(KafkaError):
    """Raised when a broker connection fails; marked retriable."""
    retriable = True


class KafkaProtocolError(KafkaError):
    """Raised on wire-protocol level problems; marked retriable."""
    retriable = True


class KafkaTimeoutError(KafkaError):
    """Raised when a request exceeds its deadline; marked retriable."""
    retriable = True


class IllegalArgumentError(KafkaError):
    """Raised when an API is invoked with an invalid argument."""
    pass


class IllegalStateError(KafkaError):
    """Raised when an operation is invalid for the current client state."""
    pass


class IncompatibleBrokerVersion(KafkaError):
    """Raised when the broker version cannot support the attempted request."""
    pass
class MetadataEmptyBrokerList(KafkaError):
    """Cluster metadata returned no brokers; refresh metadata and retry."""
    retriable = True
    invalid_metadata = True


class NoBrokersAvailable(KafkaError):
    """No broker in the known/bootstrap set could be reached; retriable."""
    retriable = True


class NoOffsetForPartitionError(KafkaError):
    """No committed or reset offset exists for the requested partition."""
    pass


class NodeNotReadyError(KafkaError):
    """The target node is not yet ready to accept requests; retriable."""
    retriable = True


class QuotaViolationError(KafkaError):
    """A client quota (rate limit) was exceeded."""
    pass


class StaleMetadata(KafkaError):
    """Cached cluster metadata is out of date; refresh metadata and retry."""
    retriable = True
    invalid_metadata = True


class TooManyInFlightRequests(KafkaError):
    """The per-connection in-flight request limit was reached; retriable."""
    retriable = True


class UnrecognizedBrokerVersion(KafkaError):
    """The broker's version could not be identified."""
    pass


class UnsupportedCodecError(KafkaError):
    """A message used a compression codec this client cannot handle."""
    pass


# Errors related to access control and authentication failures.
class TopicAuthorizationFailedError(AuthorizationError):
    """Client lacks ACLs for one or more topics (protocol code 29)."""
    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = 'Not authorized to access topics'


class GroupAuthorizationFailedError(AuthorizationError):
    """Client lacks ACLs for the consumer group (protocol code 30)."""
    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = 'Not authorized to access group'


class ClusterAuthorizationFailedError(AuthorizationError):
    """Client lacks cluster-level authorization (protocol code 31)."""
    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = 'Cluster authorization failed'


class TransactionalIdAuthorizationFailedError(AuthorizationError):
    """Client lacks ACLs for the transactional id (protocol code 53)."""
    errno = 53
    message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
    description = 'The transactional id authorization failed'


class DelegationTokenAuthorizationFailedError(AuthorizationError):
    """Delegation-token based authorization failed (protocol code 58)."""
    errno = 58
    message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    description = 'Delegation Token authorization failed'


# Most frequently encountered broker-side errors with their error codes and
# retry semantics.
class OffsetOutOfRangeError(BrokerResponseError):
    """Fetch offset lies outside the partition's log range (code 1)."""
    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = 'The requested offset is not within the range of offsets'


class UnknownTopicOrPartitionError(BrokerResponseError):
    """Broker does not host the topic-partition (code 3); refresh metadata and retry."""
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    description = 'This server does not host this topic-partition'
    retriable = True
    invalid_metadata = True


class LeaderNotAvailableError(BrokerResponseError):
    """Partition currently has no leader (code 5); refresh metadata and retry."""
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    description = 'There is no leader for this topic-partition'
    retriable = True
    invalid_metadata = True


class NotLeaderForPartitionError(BrokerResponseError):
    """Request reached a broker that is not the leader (code 6); refresh metadata and retry."""
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    description = 'This server is not the leader for that topic-partition'
    retriable = True
    invalid_metadata = True
class RequestTimedOutError(BrokerResponseError):
    """Broker-side request timeout (code 7); retriable."""
    errno = 7
    message = 'REQUEST_TIMED_OUT'
    description = 'The request timed out'
    retriable = True


class BrokerNotAvailableError(BrokerResponseError):
    """Target broker is not available (code 8); refresh metadata and retry."""
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = 'The broker is not available'
    retriable = True
    invalid_metadata = True


class ReplicaNotAvailableError(BrokerResponseError):
    """An expected replica is missing for the topic-partition (code 9); retriable."""
    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = 'The replica is not available for the requested topic-partition'
    retriable = True


class MessageSizeTooLargeError(BrokerResponseError):
    """A message exceeded the broker's max message size (code 10)."""
    errno = 10
    message = 'MESSAGE_TOO_LARGE'
    description = 'The request included a message larger than the max message size'


class TopicAlreadyExistsError(BrokerResponseError):
    """Attempted to create a topic that already exists (code 36)."""
    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic already exists'


class InvalidTopicError(BrokerResponseError):
    """Operation attempted on an invalid topic name (code 17)."""
    errno = 17
    message = 'INVALID_TOPIC_EXCEPTION'
    description = 'The request attempted to perform an operation on an invalid topic'


# Errors specific to consumer operations and group coordination.
class OffsetMetadataTooLargeError(BrokerResponseError):
    """Offset-commit metadata string exceeded the broker limit (code 12)."""
    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = 'The metadata field of the offset request was too large'


class GroupLoadInProgressError(BrokerResponseError):
    """Group coordinator is still loading group state (code 14); retriable."""
    errno = 14
    message = 'GROUP_LOAD_IN_PROGRESS'
    description = 'The coordinator is loading and hence can\'t process requests'
    retriable = True


class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """Group coordinator not available (code 15); refresh metadata and retry."""
    errno = 15
    message = 'GROUP_COORDINATOR_NOT_AVAILABLE'
    description = 'The group coordinator is not available'
    retriable = True
    invalid_metadata = True


class NotCoordinatorForGroupError(BrokerResponseError):
    """Request reached a broker that is not the group coordinator (code 16); refresh metadata and retry."""
    errno = 16
    message = 'NOT_COORDINATOR_FOR_GROUP'
    description = 'The broker is not the coordinator for this group'
    retriable = True
    invalid_metadata = True


class UnknownMemberIdError(BrokerResponseError):
    """Member id is not part of the current group generation (code 25)."""
    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = 'The member id is not in the current generation'


class IllegalGenerationError(BrokerResponseError):
    """Supplied group generation id is no longer valid (code 22)."""
    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = 'Specified group generation id is not valid'


class RebalanceInProgressError(BrokerResponseError):
    """Group rebalance in progress; member must rejoin (code 27); retriable."""
    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = 'The group is rebalancing, so a rejoin is needed'
    retriable = True


# Errors specific to producer operations and message publishing.
class InvalidRequiredAcksError(BrokerResponseError):
    """Produce request used an invalid acks setting (code 21)."""
    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = 'Specified required acks is invalid (must be -1, 0, or 1)'


class RecordListTooLargeError(BrokerResponseError):
    """Record batch exceeded the configured segment size (code 18)."""
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = 'The request included message batch larger than the configured segment size'


class InvalidPartitionError(BrokerResponseError):
    """Invalid partition number (code 4).

    NOTE(review): the class name says "invalid partition" but the protocol
    message/description below correspond to INVALID_FETCH_SIZE — confirm
    against the upstream Kafka error-code table.
    """
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has an invalid offset'


class DuplicateSequenceNumberError(BrokerResponseError):
    """Idempotent producer reused an old sequence number (code 45)."""
    errno = 45
    message = 'DUPLICATE_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with an old sequence number'


class OutOfOrderSequenceNumberError(BrokerResponseError):
    """Idempotent producer sent a non-consecutive sequence number (code 46)."""
    errno = 46
    message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with a sequence number which is not the expected next one'


from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import (KafkaError, KafkaTimeoutError, KafkaConnectionError,
                          TopicAuthorizationFailedError, MessageSizeTooLargeError)

# Producer error handling: specific handlers first, generic KafkaError last,
# with the producer always closed in the finally block.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
try:
    # send() is asynchronous; blocking on the future surfaces delivery errors.
    future = producer.send('my-topic', value=b'test message')
    record_metadata = future.get(timeout=30)
    print(f"Message sent successfully: offset {record_metadata.offset}")
except TopicAuthorizationFailedError:
    print("Access denied to topic")
except MessageSizeTooLargeError:
    print("Message too large for broker")
except KafkaTimeoutError:
    print("Request timed out")
except KafkaConnectionError:
    print("Connection failed")
except KafkaError as e:
    # Catch-all for any other Kafka error.
    print(f"Kafka error: {e}")
finally:
    producer.close()

import time
import random
from kafka import KafkaProducer
from kafka.errors import KafkaError


def send_with_retry(producer, topic, value, max_retries=3):
    """Send a message with exponential backoff retry logic.

    Args:
        producer: a KafkaProducer instance.
        topic: destination topic name.
        value: message payload (bytes).
        max_retries: number of retries after the initial attempt.

    Returns:
        RecordMetadata for the delivered message.

    Raises:
        KafkaError: when the error is non-retriable or retries are exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value)
            return future.get(timeout=30)
        except KafkaError as e:
            if not e.retriable or attempt == max_retries:
                # Non-retriable error or max retries reached.
                # Fix: bare `raise` re-raises the active exception idiomatically
                # without appending a new frame the way `raise e` does.
                raise
            # Exponential backoff with jitter to avoid thundering herds.
            backoff = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {backoff:.2f}s")
            time.sleep(backoff)
    # Defensive only: the loop always returns or raises before reaching here.
    raise KafkaError("Max retries exceeded")


# Usage
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
try:
    metadata = send_with_retry(producer, 'events', b'important message')
    print(f"Message sent successfully: {metadata}")
except KafkaError as e:
    print(f"Failed to send message: {e}")
finally:
    producer.close()

from kafka import KafkaConsumer
from kafka.errors import (ConsumerTimeoutError, OffsetOutOfRangeError,
                          GroupAuthorizationFailedError, StaleMetadata)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='error-handling-group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000
)
try:
    for message in consumer:
        try:
            # Process message.
            # NOTE(review): assumes process_message is defined elsewhere — confirm.
            process_message(message)
        except Exception as e:
            # Per-message failures are logged and skipped so the poll loop
            # keeps making progress.
            print(f"Error processing message at offset {message.offset}: {e}")
            # Could implement dead letter queue here
            continue
except OffsetOutOfRangeError:
    print("Offset out of range - seeking to beginning")
    consumer.seek_to_beginning()
except GroupAuthorizationFailedError:
    print("Access denied to consumer group")
except StaleMetadata:
    print("Metadata stale - will refresh automatically")
except ConsumerTimeoutError:
    print("Consumer timeout - no messages received")
except KeyboardInterrupt:
    print("Shutting down consumer")
finally:
    consumer.close()

import time
from enum import Enum
from kafka import KafkaProducer
from kafka.errors import KafkaError
class CircuitState(Enum):
    """Lifecycle states of the circuit breaker."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Stops invoking a callable after repeated failures.

    After `failure_threshold` failures the breaker opens and rejects calls;
    once `recovery_timeout` seconds have passed it allows a probe call
    (HALF_OPEN), and a success closes it again.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            elapsed = time.time() - self.last_failure_time
            if elapsed > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker transitioning to HALF_OPEN")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e

    def _on_success(self):
        """Clear the failure count; close the circuit if it was half-open."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("Circuit breaker reset to CLOSED")

    def _on_failure(self):
        """Count the failure and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPENED after {self.failure_count} failures")
# Usage with Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def send_message(topic, value):
"""Send message through circuit breaker."""
future = producer.send(topic, value=value)
return future.get(timeout=10)
# Send messages with circuit breaker protection
for i in range(20):
try:
result = circuit_breaker.call(send_message, 'protected-topic', f'Message {i}'.encode())
print(f"Message {i} sent successfully")
except Exception as e:
print(f"Message {i} failed: {e}")
time.sleep(1) # Brief pause before next attempt
producer.close()from kafka.errors import (KafkaError, BrokerResponseError, AuthorizationError,
KafkaTimeoutError, KafkaConnectionError)
def classify_and_handle_error(error):
    """Classify error and determine appropriate handling strategy.

    Returns a dict with keys 'category', 'action', 'retriable', 'severity'.
    Most-specific exception families are checked first; BrokerResponseError
    handling further depends on the error's own retriable flag.
    """
    def strategy(category, action, retriable, severity):
        # Single construction point keeps the descriptor shape uniform.
        return {
            'category': category,
            'action': action,
            'retriable': retriable,
            'severity': severity
        }

    if isinstance(error, AuthorizationError):
        return strategy('authorization', 'check_credentials', False, 'high')
    if isinstance(error, KafkaConnectionError):
        return strategy('network', 'retry_with_backoff', True, 'medium')
    if isinstance(error, KafkaTimeoutError):
        return strategy('timeout', 'retry_with_longer_timeout', True, 'low')
    if isinstance(error, BrokerResponseError):
        if error.retriable:
            return strategy('broker_retriable', 'retry_after_delay', True, 'medium')
        return strategy('broker_fatal', 'fail_fast', False, 'high')
    if isinstance(error, KafkaError):
        return strategy('generic_kafka', 'investigate',
                        getattr(error, 'retriable', False), 'medium')
    return strategy('unknown', 'investigate', False, 'high')
# Usage in error handler
def handle_kafka_error(error, operation_context):
    """Handle Kafka error based on classification.

    Args:
        error: the exception to classify and report.
        operation_context: short label for the operation that failed.

    Returns:
        True when the caller should retry the operation, False otherwise.
    """
    classification = classify_and_handle_error(error)
    print(f"Error in {operation_context}: {error}")
    print(f"Classification: {classification}")
    if classification['severity'] == 'high':
        # Fix: the original referenced an undefined global `logger`; resolve a
        # module logger explicitly so critical errors are actually logged.
        logging.getLogger(__name__).critical(f"Critical Kafka error: {error}")
    if classification['retriable']:
        return True  # Indicate retry should be attempted
    else:
        return False  # Indicate failure should be reported
import logging
import time
from collections import defaultdict, deque

from kafka.errors import KafkaError
class KafkaErrorMonitor:
    """Monitor and track Kafka errors for alerting.

    Maintains a sliding window of error timestamps per error-type name and
    logs critical alerts when rates or specific error types are observed.
    """

    def __init__(self, window_size=300):  # 5 minute window
        self.window_size = window_size
        # Idiom fix: defaultdict(deque) instead of defaultdict(lambda: deque()).
        self.error_counts = defaultdict(deque)
        self.logger = logging.getLogger(__name__)

    def record_error(self, error, context="unknown"):
        """Record one occurrence of *error*, prune the window, check alerts."""
        error_type = type(error).__name__
        timestamp = time.time()
        window = self.error_counts[error_type]
        # Add to sliding window, then drop entries that have aged out.
        window.append(timestamp)
        cutoff = timestamp - self.window_size
        while window and window[0] < cutoff:
            window.popleft()
        # Lazy %-style args avoid formatting when the log level is disabled.
        self.logger.error("Kafka error in %s: %s", context, error)
        self._check_alert_conditions(error_type)

    def _check_alert_conditions(self, error_type):
        """Emit critical logs when error rates exceed alert thresholds."""
        error_count = len(self.error_counts[error_type])
        # Alert if too many errors in window.
        if error_count > 10:
            self.logger.critical(
                "High error rate: %s %s errors in last %s seconds",
                error_count, error_type, self.window_size
            )
        # Alert for specific critical errors.
        if error_type in ('TopicAuthorizationFailedError', 'ClusterAuthorizationFailedError'):
            self.logger.critical("Authorization failure: %s", error_type)

    def get_error_summary(self):
        """Return {error_type: count} of errors seen within the window."""
        summary = {}
        cutoff = time.time() - self.window_size
        for error_type, timestamps in self.error_counts.items():
            recent_count = sum(1 for ts in timestamps if ts > cutoff)
            if recent_count > 0:
                summary[error_type] = recent_count
        return summary
# Usage
error_monitor = KafkaErrorMonitor()


def monitored_kafka_operation(operation_func, *args, **kwargs):
    """Execute a Kafka operation, recording any KafkaError before re-raising.

    Args:
        operation_func: callable performing the Kafka operation.
        *args, **kwargs: forwarded to *operation_func*.
    """
    try:
        return operation_func(*args, **kwargs)
    except KafkaError as e:
        # Record in the shared monitor, tagged with the operation name.
        error_monitor.record_error(e, context=operation_func.__name__)
        raise e


# Example usage
try:
    monitored_kafka_operation(producer.send, 'topic', b'message')
except KafkaError:
    # Error already logged and monitored
    pass

# Periodic error summary
print("Recent errors:", error_monitor.get_error_summary())

# Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python