Confluent's Python client for Apache Kafka
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive error handling system with specific exception types for different failure modes, detailed error information, and patterns for robust Kafka applications.
Main exception class for Kafka-related errors.
class KafkaException(Exception):
    """Top-level exception raised for Kafka client errors."""

    def __init__(self, kafka_error):
        """
        Build a KafkaException around an underlying error.

        Args:
            kafka_error (KafkaError): The Kafka error being wrapped.
        """

    def args(self):
        """
        Return the exception arguments.

        Returns:
            tuple: A one-element tuple, ``(kafka_error,)``.
        """


# Represents a Kafka error with detailed information.
class KafkaError:
    """Detailed description of a single Kafka error condition."""

    def code(self):
        """
        Return the numeric error code.

        Returns:
            int: Kafka error code.
        """

    def name(self):
        """
        Return the symbolic error name.

        Returns:
            str: Human-readable error name.
        """

    def str(self):
        """
        Return the long-form description.

        Returns:
            str: Detailed error description.
        """

    def fatal(self):
        """
        Report whether the error is fatal.

        Returns:
            bool: True when the client must be restarted to recover.
        """

    def retriable(self):
        """
        Report whether the failed operation may be retried.

        Returns:
            bool: True when the operation may succeed on retry.
        """

    def txn_requires_abort(self):
        """
        Report whether the current transaction must be aborted.

        Returns:
            bool: True when the transaction can no longer be committed.
        """

    def __str__(self):
        """Return a printable representation of the error."""

    def __bool__(self):
        """
        Report whether an actual error is present.

        Returns:
            bool: True when this object represents a real error.
        """


# Wraps errors that occur during message consumption.
class ConsumeError(KafkaException):
    """Raised when message consumption fails."""

    def __init__(self, kafka_error, consumer_record=None):
        """
        Build a ConsumeError.

        Args:
            kafka_error (KafkaError): The Kafka error being wrapped.
            consumer_record: The consumer record, when one is available.
        """


# Specific error for key deserialization failures in DeserializingConsumer.
class KeyDeserializationError(ConsumeError):
    """Raised by DeserializingConsumer when a message key cannot be deserialized."""

    def __init__(self, exception=None, kafka_message=None):
        """
        Build a KeyDeserializationError.

        Args:
            exception (Exception, optional): The underlying deserialization failure.
            kafka_message (Message, optional): The original Kafka message.
        """

    @property
    def exception(self):
        """
        The underlying deserialization exception.

        Returns:
            Exception: The original exception that caused the failure.
        """

    @property
    def kafka_message(self):
        """
        The original Kafka message.

        Returns:
            Message: The message whose key failed to deserialize.
        """


# Specific error for value deserialization failures in DeserializingConsumer.
class ValueDeserializationError(ConsumeError):
    """Raised by DeserializingConsumer when a message value cannot be deserialized."""

    def __init__(self, exception=None, kafka_message=None):
        """
        Build a ValueDeserializationError.

        Args:
            exception (Exception, optional): The underlying deserialization failure.
            kafka_message (Message, optional): The original Kafka message.
        """

    @property
    def exception(self):
        """
        The underlying deserialization exception.

        Returns:
            Exception: The original exception that caused the failure.
        """

    @property
    def kafka_message(self):
        """
        The original Kafka message.

        Returns:
            Message: The message whose value failed to deserialize.
        """


# Wraps errors that occur during message production.
class ProduceError(KafkaException):
    """Raised when message production fails."""

    def __init__(self, kafka_error, producer_record=None):
        """
        Build a ProduceError.

        Args:
            kafka_error (KafkaError): The Kafka error being wrapped.
            producer_record: The producer record, when one is available.
        """


# Specific error for key serialization failures in SerializingProducer.
class KeySerializationError(ProduceError):
    """Raised by SerializingProducer when a message key cannot be serialized."""

    def __init__(self, exception=None, producer_record=None):
        """
        Build a KeySerializationError.

        Args:
            exception (Exception, optional): The underlying serialization failure.
            producer_record: The producer record that failed to serialize.
        """

    @property
    def exception(self):
        """
        The underlying serialization exception.

        Returns:
            Exception: The original exception that caused the failure.
        """


# Specific error for value serialization failures in SerializingProducer.
class ValueSerializationError(ProduceError):
    """Raised by SerializingProducer when a message value cannot be serialized."""

    def __init__(self, exception=None, producer_record=None):
        """
        Build a ValueSerializationError.

        Args:
            exception (Exception, optional): The underlying serialization failure.
            producer_record: The producer record that failed to serialize.
        """

    @property
    def exception(self):
        """
        The underlying serialization exception.

        Returns:
            Exception: The original exception that caused the failure.
        """


# Generic serialization/deserialization error.
class SerializationError(Exception):
    """Generic serialization/deserialization failure."""

    def __init__(self, message, inner_exception=None):
        """
        Build a SerializationError.

        Args:
            message (str): Error message.
            inner_exception (Exception, optional): The underlying exception.
        """

    @property
    def inner_exception(self):
        """
        The underlying exception.

        Returns:
            Exception: The exception that caused the serialization error.
        """


# Schema registry specific errors.
class SchemaRegistryError(Exception):
    """Raised for errors returned by the Schema Registry."""

    def __init__(self, status_code, message, error_code=None):
        """
        Build a SchemaRegistryError.

        Args:
            status_code (int): HTTP status code of the response.
            message (str): Error message.
            error_code (int, optional): Schema Registry specific error code.
        """

    @property
    def status_code(self):
        """
        HTTP status code.

        Returns:
            int: Status code from the Schema Registry response.
        """

    @property
    def error_code(self):
        """
        Schema Registry error code.

        Returns:
            int: Registry-specific error code, or None.
        """


# Important Kafka error codes accessible as constants.
# Partition/Topic errors
_PARTITION_EOF = -191                 # Reached the end of a partition
_UNKNOWN_TOPIC_OR_PARTITION = 3
_TOPIC_ALREADY_EXISTS = 36
_INVALID_TOPIC_EXCEPTION = 17

# Consumer errors
_UNKNOWN_MEMBER_ID = 25
_REBALANCE_IN_PROGRESS = 27
_OFFSET_OUT_OF_RANGE = 1
_GROUP_COORDINATOR_NOT_AVAILABLE = 15

# Producer errors
_MSG_SIZE_TOO_LARGE = 10
_RECORD_BATCH_TOO_LARGE = 18
_REQUEST_TIMED_OUT = 7

# Authentication/Authorization
_SASL_AUTHENTICATION_FAILED = 58
_TOPIC_AUTHORIZATION_FAILED = 29
_GROUP_AUTHORIZATION_FAILED = 30

# Network errors
_NETWORK_EXCEPTION = -195
_ALL_BROKERS_DOWN = -187

# Transaction errors
_INVALID_TRANSACTION_STATE = 51
_PRODUCER_FENCED = 90

from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.error import ConsumeError

# Shared consumer used by the example loop below.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['my-topic'])


def handle_consumer_errors():
    """Poll forever, dispatching on each error's code and characteristics."""
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            err = msg.error()
            if not err:
                # Successful message
                print(f'Message: {msg.value()}')
                continue
            if err.code() == KafkaError._PARTITION_EOF:
                # End of partition - informational, not a failure.
                # NOTE(review): presumably only reported when
                # 'enable.partition.eof' is set - confirm.
                print(f'Reached end of partition {msg.topic()} [{msg.partition()}]')
            elif err.code() == KafkaError._UNKNOWN_TOPIC_OR_PARTITION:
                print(f'Unknown topic or partition: {err}')
                break  # could also continue, depending on requirements
            elif err.fatal():
                # Fatal errors require a client restart.
                print(f'Fatal error: {err}')
                raise KafkaException(err)
            elif err.retriable():
                # Keep polling - the condition may clear on its own.
                print(f'Retriable error: {err}')
            else:
                # Log and keep going; a break may be preferable per use case.
                print(f'Non-retriable error: {err}')
    except KeyboardInterrupt:
        print('Interrupted by user')
    except KafkaException as e:
        print(f'Kafka exception: {e}')
    finally:
        consumer.close()


handle_consumer_errors()

from confluent_kafka import Producer, KafkaError
from confluent_kafka.error import ProduceError

producer = Producer({'bootstrap.servers': 'localhost:9092'})


def delivery_report(err, msg):
    """Delivery report callback: classify failures by code and retriability."""
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
        return
    # err may arrive as a KafkaError or wrapped in an exception's args.
    error = err if isinstance(err, KafkaError) else err.args[0]
    if error.code() == KafkaError._MSG_SIZE_TOO_LARGE:
        print(f'Message too large: {error}')
    elif error.code() == KafkaError._REQUEST_TIMED_OUT:
        print(f'Request timed out: {error}')
    elif error.retriable():
        # The producer retries these automatically.
        print(f'Retriable error - will be retried: {error}')
    else:
        print(f'Non-retriable produce error: {error}')


def produce_with_error_handling():
    """Produce 100 messages, handling queue-full and unexpected errors."""
    try:
        for i in range(100):
            try:
                producer.produce(
                    'my-topic',
                    key=f'key-{i}',
                    value=f'message-{i}',
                    callback=delivery_report
                )
                producer.poll(0)  # serve pending delivery callbacks
            except BufferError:
                # Local queue is full - give the producer time to drain.
                print('Producer queue full, waiting...')
                producer.poll(1.0)
                continue
            except Exception as e:
                print(f'Unexpected error: {e}')
                continue
        producer.flush()  # wait for all outstanding deliveries
    except KeyboardInterrupt:
        print('Interrupted by user')
    finally:
        producer.flush()  # drain whatever is still queued


produce_with_error_handling()

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.error import KeySerializationError, ValueSerializationError
from confluent_kafka.error import KeyDeserializationError, ValueDeserializationError


class SafeSerializer(StringSerializer):
    """Serializer with error handling."""

    def __call__(self, obj, ctx=None):
        # Delegate to StringSerializer, trapping any failure so it does
        # not propagate out of produce().
        try:
            return super().__call__(obj, ctx)
        except Exception as e:
            print(f'Serialization failed for {obj}: {e}')
            return b'SERIALIZATION_FAILED'


def handle_serialization_errors():
    """Produce through SafeSerializer and inspect serialization failures."""
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.serializer': SafeSerializer('utf_8'),
        'value.serializer': SafeSerializer('utf_8')
    }
    producer = SerializingProducer(producer_conf)

    def delivery_callback(err, msg):
        if err is None:
            return
        if isinstance(err, KeySerializationError):
            print(f'Key serialization error: {err.exception}')
        elif isinstance(err, ValueSerializationError):
            print(f'Value serialization error: {err.exception}')
        else:
            print(f'Other delivery error: {err}')

    # Produce with potential serialization errors
    producer.produce('my-topic', key='valid-key', value=None, callback=delivery_callback)
    producer.flush()


def handle_deserialization_errors():
    """Consume and report key/value deserialization failures without stopping."""
    consumer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.deserializer': StringDeserializer('utf_8'),
        'value.deserializer': StringDeserializer('utf_8'),
        'group.id': 'error-handling-group',
        'auto.offset.reset': 'earliest'
    }
    consumer = DeserializingConsumer(consumer_conf)
    consumer.subscribe(['my-topic'])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                error = msg.error()
                # NOTE(review): DeserializingConsumer may *raise*
                # deserialization errors from poll() rather than return
                # them via msg.error() - confirm against client version.
                if isinstance(error, KeyDeserializationError):
                    print(f'Key deserialization error: {error.exception}')
                    # The original message is available on the error.
                    original_msg = error.kafka_message
                    print(f'Original key bytes: {original_msg.key()}')
                elif isinstance(error, ValueDeserializationError):
                    print(f'Value deserialization error: {error.exception}')
                    original_msg = error.kafka_message
                    print(f'Original value bytes: {original_msg.value()}')
                else:
                    print(f'Other consumer error: {error}')
                continue  # keep consuming despite the failure
            # Process successfully deserialized message
            print(f'Key: {msg.key()}, Value: {msg.value()}')
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        consumer.close()

from confluent_kafka import Producer, KafkaError
def handle_transaction_errors():
    """Run five transactions, classifying failures by their transaction semantics."""
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'transactional.id': 'my-transactional-producer',
        'enable.idempotence': True
    }
    producer = Producer(producer_conf)
    try:
        producer.init_transactions()
        print('Transactions initialized')
        for batch in range(5):
            try:
                producer.begin_transaction()
                print(f'Started transaction {batch}')
                # Produce this batch's messages inside the transaction.
                for i in range(3):
                    producer.produce('transactional-topic', f'batch-{batch}-msg-{i}')
                producer.commit_transaction()
                print(f'Committed transaction {batch}')
            except KafkaException as e:
                error = e.args[0]
                if error.code() == KafkaError._PRODUCER_FENCED:
                    # A newer producer with the same transactional.id took over.
                    print('Producer fenced - need to recreate producer')
                    raise
                elif error.txn_requires_abort():
                    print(f'Transaction error requires abort: {error}')
                    try:
                        producer.abort_transaction()
                        print('Transaction aborted')
                    except Exception as abort_error:
                        print(f'Failed to abort transaction: {abort_error}')
                        raise
                elif error.retriable():
                    print(f'Retriable transaction error: {error}')
                    try:
                        producer.abort_transaction()
                        # NOTE(review): this advances to the *next* batch;
                        # re-running the same batch needs an inner retry loop.
                        continue
                    except Exception:
                        raise
                else:
                    print(f'Non-retriable transaction error: {error}')
                    producer.abort_transaction()
                    raise
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            producer.abort_transaction()
        except Exception:
            pass  # already shutting down
    finally:
        producer.flush()

from confluent_kafka.admin import AdminClient, NewTopic
# Fix: this snippet uses KafkaError constants below but previously imported
# only KafkaException, relying by accident on imports from earlier snippets.
from confluent_kafka import KafkaException, KafkaError


def handle_admin_errors():
    """Create topics and handle per-topic failures from the returned futures."""
    admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
    # Create topics with error handling
    new_topics = [NewTopic('test-topic-1', 3, 1), NewTopic('test-topic-2', 6, 1)]
    fs = admin_client.create_topics(new_topics, request_timeout=30)
    for topic, f in fs.items():
        try:
            f.result()  # Block until operation completes
            print(f'Topic {topic} created successfully')
        except KafkaException as e:
            error = e.args[0]
            if error.code() == KafkaError._TOPIC_ALREADY_EXISTS:
                # Usually benign - continue or handle as needed.
                print(f'Topic {topic} already exists')
            elif error.code() == KafkaError._TOPIC_AUTHORIZATION_FAILED:
                print(f'Authorization failed for topic {topic}')
            elif error.code() == KafkaError._REQUEST_TIMED_OUT:
                # Could retry with a longer timeout.
                print(f'Request timed out for topic {topic}')
            else:
                print(f'Failed to create topic {topic}: {error}')
        except Exception as e:
            print(f'Unexpected error creating topic {topic}: {e}')


handle_admin_errors()

import logging
# Fix: this snippet catches KafkaException below but previously did not
# import it, relying by accident on imports from earlier snippets.
from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.error import ConsumeError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def comprehensive_error_handling():
    """Consume with manual commits, logging each error's full diagnostics."""
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'logging-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False
    })
    consumer.subscribe(['my-topic'])
    try:
        while True:
            try:
                msg = consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    error = msg.error()
                    # Log every diagnostic the error carries.
                    logger.error(
                        f'Consumer error - Code: {error.code()}, '
                        f'Name: {error.name()}, '
                        f'Description: {error.str()}, '
                        f'Fatal: {error.fatal()}, '
                        f'Retriable: {error.retriable()}'
                    )
                    # Handle based on error characteristics.
                    if error.fatal():
                        logger.critical('Fatal error - exiting')
                        break
                    elif not error.retriable():
                        logger.warning('Non-retriable error - skipping')
                        continue
                    else:
                        logger.info('Retriable error - continuing')
                        continue
                logger.info(f'Processing message: {msg.topic()}[{msg.partition()}]@{msg.offset()}')
                try:
                    # Process message here
                    pass
                except Exception as processing_error:
                    logger.error(f'Message processing failed: {processing_error}')
                    continue
                # Manual commit only after successful processing.
                try:
                    consumer.commit(message=msg)
                except KafkaException as commit_error:
                    logger.error(f'Commit failed: {commit_error}')
            except Exception as unexpected_error:
                logger.error(f'Unexpected error in consumer loop: {unexpected_error}')
    except KeyboardInterrupt:
        logger.info('Consumer interrupted by user')
    except Exception as e:
        logger.critical(f'Critical consumer error: {e}')
    finally:
        logger.info('Closing consumer')
        consumer.close()


comprehensive_error_handling()

# Install with Tessl CLI
# npx tessl i tessl/pypi-confluent-kafka