Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
—
Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, authentication failures, and consumer group coordination problems.
Foundation exception classes that define the error handling framework.
class KafkaError(RuntimeError):
"""
Base exception for all Kafka-related errors.
Attributes:
retriable (bool): Whether the operation that caused this error can be retried
invalid_metadata (bool): Whether this error indicates stale metadata that should be refreshed
"""
retriable: bool = False
invalid_metadata: bool = False
def __str__(self):
"""String representation of the error."""Errors related to network connectivity and broker communication.
class NoBrokersAvailable(KafkaError):
"""No Kafka brokers are available for connection."""
retriable: bool = True
invalid_metadata: bool = True
class NodeNotReadyError(KafkaError):
"""Broker node is not ready to accept requests."""
retriable: bool = True
class ConnectionError(KafkaError):
"""Failed to establish connection to Kafka broker."""
retriable: bool = True
class RequestTimedOutError(KafkaError):
"""Request to Kafka broker timed out."""
retriable: bool = True
class NetworkException(KafkaError):
"""Network-level communication error."""
retriable: bool = TrueErrors related to Kafka protocol handling and message processing.
class KafkaProtocolError(KafkaError):
"""Kafka protocol-level error."""
retriable: bool = True
class CorrelationIdError(KafkaProtocolError):
"""Response correlation ID does not match request."""
retriable: bool = True
class BufferUnderflowError(KafkaError):
"""Insufficient data in buffer for deserialization."""
class ChecksumError(KafkaError):
"""Message checksum validation failed."""
class CompressionNotSupportedError(KafkaError):
"""Requested compression type is not supported."""
class UnsupportedVersionError(KafkaError):
"""API version is not supported by broker."""Errors related to incorrect client usage or invalid state.
class IllegalStateError(KafkaError):
"""Client is in an invalid state for the requested operation."""
class IllegalArgumentError(KafkaError):
"""Invalid argument provided to client method."""
class InvalidTopicError(KafkaError):
"""Topic name is invalid or does not exist."""
class TopicAlreadyExistsError(KafkaError):
"""Attempted to create a topic that already exists."""
class InvalidPartitionsError(KafkaError):
"""Invalid partition configuration."""
class InvalidReplicationFactorError(KafkaError):
"""Invalid replication factor specified."""Errors specific to producer operations.
class TooManyInFlightRequests(KafkaError):
"""Too many unacknowledged requests are pending."""
retriable: bool = True
class MessageSizeTooLargeError(KafkaError):
"""Message size exceeds broker limits."""
class RecordBatchTooLargeError(KafkaError):
"""Record batch size exceeds broker limits."""
class InvalidRecordError(KafkaError):
"""Record contains invalid data."""
class RecordTooLargeError(KafkaError):
"""Individual record exceeds size limits."""
class UnknownTopicOrPartitionError(KafkaError):
"""Topic or partition does not exist."""
invalid_metadata: bool = True
class LeaderNotAvailableError(KafkaError):
"""Partition leader is not available."""
retriable: bool = True
invalid_metadata: bool = True
class NotLeaderForPartitionError(KafkaError):
"""Broker is not the leader for the partition."""
retriable: bool = True
invalid_metadata: bool = TrueErrors specific to consumer operations and group coordination.
class CommitFailedError(KafkaError):
"""Offset commit failed due to group rebalance or other issues."""
class InvalidSessionTimeoutError(KafkaError):
"""Session timeout is outside broker's allowed range."""
class InvalidGroupIdError(KafkaError):
"""Consumer group ID is invalid."""
class GroupLoadInProgressError(KafkaError):
"""Consumer group is still loading."""
retriable: bool = True
class GroupCoordinatorNotAvailableError(KafkaError):
"""Group coordinator is not available."""
retriable: bool = True
class NotCoordinatorForGroupError(KafkaError):
"""Broker is not the coordinator for this group."""
retriable: bool = True
class UnknownMemberIdError(KafkaError):
"""Consumer group member ID is not recognized."""
class IllegalGenerationError(KafkaError):
"""Consumer group generation ID is invalid."""
class OffsetOutOfRangeError(KafkaError):
"""Requested offset is outside the available range."""
class GroupAuthorizationFailedError(KafkaError):
"""Not authorized to access consumer group."""
class TopicAuthorizationFailedError(KafkaError):
"""Not authorized to access topic."""Errors related to security and access control.
class AuthenticationFailedError(KafkaError):
"""SASL authentication failed."""
class AuthenticationMethodNotSupported(KafkaError):
"""Requested SASL mechanism is not supported."""
class SaslAuthenticationError(KafkaError):
"""SASL authentication error."""
class ClusterAuthorizationFailedError(KafkaError):
"""Not authorized to perform cluster operations."""
class DelegationTokenNotFoundError(KafkaError):
"""Delegation token not found."""
class DelegationTokenAuthorizationFailedError(KafkaError):
"""Not authorized to use delegation token."""Errors related to metadata management and cluster coordination.
class StaleMetadata(KafkaError):
"""Client metadata is stale and needs refresh."""
retriable: bool = True
invalid_metadata: bool = True
class MetadataEmptyBrokerList(KafkaError):
"""Broker list in metadata is empty."""
retriable: bool = True
class UnrecognizedBrokerVersion(KafkaError):
"""Broker version is not recognized."""
class IncompatibleBrokerVersion(KafkaError):
"""Broker version is incompatible with client."""
class Cancelled(KafkaError):
"""Operation was cancelled."""
retriable: bool = TrueErrors returned by specific Kafka protocol APIs.
class BrokerResponseError(KafkaError):
"""
Base class for errors returned by Kafka brokers.
Attributes:
errno (int): Kafka error code
message (str): Error message
description (str): Error description
"""
def __init__(self, errno, message=None, description=None):
self.errno = errno
self.message = message
self.description = description
errno: int
message: str
description: str
# Specific broker errors (partial list)
class UnknownError(BrokerResponseError):
"""Unknown server error."""
errno = -1
class OffsetMetadataTooLarge(BrokerResponseError):
"""Offset metadata string is too large."""
errno = 12
class InvalidTopicException(BrokerResponseError):
"""Topic name is invalid."""
errno = 17
class RecordListTooLarge(BrokerResponseError):
"""Record list is too large."""
errno = 18
class NotEnoughReplicas(BrokerResponseError):
"""Not enough replicas available."""
errno = 19
class NotEnoughReplicasAfterAppend(BrokerResponseError):
"""Not enough replicas after append."""
errno = 20from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError, NoBrokersAvailable, RequestTimedOutError
try:
producer = KafkaProducer(bootstrap_servers=['invalid-broker:9092'])
producer.send('my-topic', b'test message').get(timeout=10)
except NoBrokersAvailable:
print("No Kafka brokers are available")
except RequestTimedOutError:
print("Request timed out")
except KafkaError as e:
print(f"Kafka error: {e}")
finally:
if 'producer' in locals():
producer.close()from kafka import KafkaProducer
from kafka.errors import (KafkaError, MessageSizeTooLargeError,
NotLeaderForPartitionError, RequestTimedOutError)
import time
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
retries=5, # Built-in retries for retriable errors
retry_backoff_ms=1000
)
def send_with_retry(topic, message, max_retries=3):
for attempt in range(max_retries):
try:
future = producer.send(topic, message)
metadata = future.get(timeout=10)
print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
return metadata
except NotLeaderForPartitionError:
print(f"Leader not available, attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
except MessageSizeTooLargeError:
print("Message is too large, cannot retry")
raise
except RequestTimedOutError:
print(f"Request timed out, attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(1)
continue
raise KafkaError("Failed to send after all retries")
# Use the retry function
try:
send_with_retry('my-topic', b'important message')
except KafkaError as e:
print(f"Final error: {e}")from kafka import KafkaConsumer
from kafka.errors import (CommitFailedError, OffsetOutOfRangeError,
GroupCoordinatorNotAvailableError, KafkaError)
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
enable_auto_commit=False
)
try:
for message in consumer:
try:
# Process message
process_message(message)
# Manual commit
consumer.commit()
except CommitFailedError:
print("Commit failed, likely due to rebalance")
# Consumer will rejoin group and get new assignment
except OffsetOutOfRangeError as e:
print(f"Offset out of range: {e}")
# Seek to beginning or end
consumer.seek_to_beginning()
except Exception as e:
print(f"Processing error: {e}")
# Continue with next message
except GroupCoordinatorNotAvailableError:
print("Group coordinator not available")
except KafkaError as e:
print(f"Kafka error: {e}")
finally:
consumer.close()
def process_message(message):
# Simulate message processing
print(f"Processing: {message.value}")from kafka import KafkaProducer
from kafka.errors import (AuthenticationFailedError,
AuthenticationMethodNotSupported,
ClusterAuthorizationFailedError)
try:
producer = KafkaProducer(
bootstrap_servers=['secure-broker:9093'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='wrong-user',
sasl_plain_password='wrong-password'
)
producer.send('secure-topic', b'message')
except AuthenticationFailedError:
print("SASL authentication failed - check credentials")
except AuthenticationMethodNotSupported:
print("SASL mechanism not supported by broker")
except ClusterAuthorizationFailedError:
print("Not authorized to access cluster")
except Exception as e:
print(f"Other error: {e}")from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import (TopicAlreadyExistsError, InvalidReplicationFactorError,
ClusterAuthorizationFailedError, KafkaError)
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
topics = [NewTopic('test-topic', 3, 2)]
result = admin.create_topics(topics)
# Check individual topic results
for topic_name, error in result.items():
if error is None:
print(f"Topic {topic_name} created successfully")
else:
print(f"Failed to create topic {topic_name}: {error}")
except TopicAlreadyExistsError:
print("Topic already exists")
except InvalidReplicationFactorError:
print("Invalid replication factor")
except ClusterAuthorizationFailedError:
print("Not authorized to create topics")
except KafkaError as e:
print(f"Admin operation failed: {e}")
finally:
admin.close()from kafka.errors import KafkaError
def handle_kafka_error(error):
"""Handle Kafka errors with appropriate recovery strategies."""
if hasattr(error, 'retriable') and error.retriable:
print(f"Retriable error: {error}")
return 'retry'
if hasattr(error, 'invalid_metadata') and error.invalid_metadata:
print(f"Metadata refresh needed: {error}")
return 'refresh_metadata'
# Non-retriable errors
print(f"Non-retriable error: {error}")
return 'fail'
# Example usage
try:
# Kafka operation
pass
except KafkaError as e:
strategy = handle_kafka_error(e)
if strategy == 'retry':
# Implement retry logic
pass
elif strategy == 'refresh_metadata':
# Force metadata refresh
pass
else:
# Log error and exit
raiseInstall with Tessl CLI
npx tessl i tessl/pypi-kafka-python-ng