CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

error-handling.mddocs/

Error Handling

Comprehensive error handling system with specific exception types for different failure modes, detailed error information, and patterns for robust Kafka applications.

Capabilities

Core Error Classes

KafkaException

Main exception class for Kafka-related errors.

class KafkaException(Exception):
    def __init__(self, kafka_error):
        """
        Create KafkaException.
        
        Args:
            kafka_error (KafkaError): Underlying Kafka error
        """

    def args(self):
        """
        Exception arguments.
        
        Returns:
            tuple: (kafka_error,)
        """

KafkaError

Represents a Kafka error with detailed information.

class KafkaError:
    def code(self):
        """
        Get error code.
        
        Returns:
            int: Kafka error code
        """

    def name(self):
        """
        Get error name.
        
        Returns:
            str: Human-readable error name
        """

    def str(self):
        """
        Get error description.
        
        Returns:
            str: Detailed error description
        """

    def fatal(self):
        """
        Check if error is fatal.
        
        Returns:
            bool: True if error is fatal and requires client restart
        """

    def retriable(self):
        """
        Check if operation can be retried.
        
        Returns:
            bool: True if operation may succeed on retry
        """

    def txn_requires_abort(self):
        """
        Check if error requires transaction abort.
        
        Returns:
            bool: True if current transaction must be aborted
        """

    def __str__(self):
        """String representation of error."""

    def __bool__(self):
        """
        Check if error exists.
        
        Returns:
            bool: True if this represents an actual error
        """

Consumer Error Classes

ConsumeError

Wraps errors that occur during message consumption.

class ConsumeError(KafkaException):
    def __init__(self, kafka_error, consumer_record=None):
        """
        Create ConsumeError.
        
        Args:
            kafka_error (KafkaError): Underlying Kafka error
            consumer_record: Consumer record if available
        """

KeyDeserializationError

Specific error for key deserialization failures in DeserializingConsumer.

class KeyDeserializationError(ConsumeError):
    def __init__(self, exception=None, kafka_message=None):
        """
        Create KeyDeserializationError.
        
        Args:
            exception (Exception, optional): Underlying deserialization exception
            kafka_message (Message, optional): Original Kafka message
        """

    @property
    def exception(self):
        """
        Underlying deserialization exception.
        
        Returns:
            Exception: Original exception that caused deserialization failure
        """

    @property
    def kafka_message(self):
        """
        Original Kafka message.
        
        Returns:
            Message: Message that failed to deserialize
        """

ValueDeserializationError

Specific error for value deserialization failures in DeserializingConsumer.

class ValueDeserializationError(ConsumeError):
    def __init__(self, exception=None, kafka_message=None):
        """
        Create ValueDeserializationError.
        
        Args:
            exception (Exception, optional): Underlying deserialization exception
            kafka_message (Message, optional): Original Kafka message
        """

    @property
    def exception(self):
        """
        Underlying deserialization exception.
        
        Returns:
            Exception: Original exception that caused deserialization failure
        """

    @property
    def kafka_message(self):
        """
        Original Kafka message.
        
        Returns:
            Message: Message that failed to deserialize
        """

Producer Error Classes

ProduceError

Wraps errors that occur during message production.

class ProduceError(KafkaException):
    def __init__(self, kafka_error, producer_record=None):
        """
        Create ProduceError.
        
        Args:
            kafka_error (KafkaError): Underlying Kafka error
            producer_record: Producer record if available
        """

KeySerializationError

Specific error for key serialization failures in SerializingProducer.

class KeySerializationError(ProduceError):
    def __init__(self, exception=None, producer_record=None):
        """
        Create KeySerializationError.
        
        Args:
            exception (Exception, optional): Underlying serialization exception
            producer_record: Producer record that failed to serialize
        """

    @property
    def exception(self):
        """
        Underlying serialization exception.
        
        Returns:
            Exception: Original exception that caused serialization failure
        """

ValueSerializationError

Specific error for value serialization failures in SerializingProducer.

class ValueSerializationError(ProduceError):
    def __init__(self, exception=None, producer_record=None):
        """
        Create ValueSerializationError.
        
        Args:
            exception (Exception, optional): Underlying serialization exception
            producer_record: Producer record that failed to serialize
        """

    @property
    def exception(self):
        """
        Underlying serialization exception.
        
        Returns:
            Exception: Original exception that caused serialization failure
        """

Serialization Error Classes

SerializationError

Generic serialization/deserialization error.

class SerializationError(Exception):
    def __init__(self, message, inner_exception=None):
        """
        Create SerializationError.
        
        Args:
            message (str): Error message
            inner_exception (Exception, optional): Underlying exception
        """

    @property
    def inner_exception(self):
        """
        Underlying exception.
        
        Returns:
            Exception: Exception that caused the serialization error
        """

Schema Registry Error Classes

SchemaRegistryError

Schema registry specific errors.

class SchemaRegistryError(Exception):
    def __init__(self, status_code, message, error_code=None):
        """
        Create SchemaRegistryError.
        
        Args:
            status_code (int): HTTP status code
            message (str): Error message
            error_code (int, optional): Schema registry specific error code
        """

    @property
    def status_code(self):
        """
        HTTP status code.
        
        Returns:
            int: HTTP status code from Schema Registry response
        """

    @property
    def error_code(self):
        """
        Schema registry error code.
        
        Returns:
            int: Schema Registry specific error code or None
        """

Common Error Codes

Important Kafka error codes accessible as constants.

# Partition/Topic errors
_PARTITION_EOF = -191  # Partition EOF reached
_UNKNOWN_TOPIC_OR_PARTITION = 3
_TOPIC_ALREADY_EXISTS = 36
_INVALID_TOPIC_EXCEPTION = 17

# Consumer errors
_UNKNOWN_MEMBER_ID = 25
_REBALANCE_IN_PROGRESS = 27
_OFFSET_OUT_OF_RANGE = 1
_GROUP_COORDINATOR_NOT_AVAILABLE = 15

# Producer errors
_MSG_SIZE_TOO_LARGE = 10
_RECORD_BATCH_TOO_LARGE = 18
_REQUEST_TIMED_OUT = 7

# Authentication/Authorization
_SASL_AUTHENTICATION_FAILED = 58
_TOPIC_AUTHORIZATION_FAILED = 29
_GROUP_AUTHORIZATION_FAILED = 30

# Network errors
_NETWORK_EXCEPTION = -195
_ALL_BROKERS_DOWN = -187

# Transaction errors
_INVALID_TRANSACTION_STATE = 51
_PRODUCER_FENCED = 90

Error Handling Patterns

Basic Consumer Error Handling

from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.error import ConsumeError

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

def handle_consumer_errors():
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            if msg.error():
                error = msg.error()
                
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition - not really an error
                    print(f'Reached end of partition {msg.topic()} [{msg.partition()}]')
                    continue
                    
                elif error.code() == KafkaError._UNKNOWN_TOPIC_OR_PARTITION:
                    print(f'Unknown topic or partition: {error}')
                    # Could break or continue depending on requirements
                    break
                    
                elif error.fatal():
                    print(f'Fatal error: {error}')
                    # Fatal errors require client restart
                    raise KafkaException(error)
                    
                elif error.retriable():
                    print(f'Retriable error: {error}')
                    # Continue polling - error may resolve
                    continue
                    
                else:
                    print(f'Non-retriable error: {error}')
                    # Log and continue or break depending on error
                    continue
                    
            else:
                # Process successful message
                print(f'Message: {msg.value()}')
                
    except KeyboardInterrupt:
        print('Interrupted by user')
    except KafkaException as e:
        print(f'Kafka exception: {e}')
    finally:
        consumer.close()

handle_consumer_errors()

Producer Error Handling

from confluent_kafka import Producer, KafkaError
from confluent_kafka.error import ProduceError

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    """Delivery report callback."""
    if err is not None:
        error = err if isinstance(err, KafkaError) else err.args[0]
        
        if error.code() == KafkaError._MSG_SIZE_TOO_LARGE:
            print(f'Message too large: {error}')
            # Could split message or log error
            
        elif error.code() == KafkaError._REQUEST_TIMED_OUT:
            print(f'Request timed out: {error}')
            # Could retry or log timeout
            
        elif error.retriable():
            print(f'Retriable error - will be retried: {error}')
            # Producer will automatically retry
            
        else:
            print(f'Non-retriable produce error: {error}')
            
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

def produce_with_error_handling():
    try:
        for i in range(100):
            try:
                producer.produce(
                    'my-topic',
                    key=f'key-{i}',
                    value=f'message-{i}',
                    callback=delivery_report
                )
                
                # Poll for delivery callbacks
                producer.poll(0)
                
            except BufferError:
                # Producer queue full - wait and retry
                print('Producer queue full, waiting...')
                producer.poll(1.0)  # Wait for some messages to be sent
                continue
                
            except Exception as e:
                print(f'Unexpected error: {e}')
                continue
                
        # Wait for all messages to be delivered
        producer.flush()
        
    except KeyboardInterrupt:
        print('Interrupted by user')
    finally:
        # Flush remaining messages
        producer.flush()

produce_with_error_handling()

Serialization Error Handling

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.error import KeySerializationError, ValueSerializationError
from confluent_kafka.error import KeyDeserializationError, ValueDeserializationError

class SafeSerializer(StringSerializer):
    """Serializer with error handling."""
    
    def __call__(self, obj, ctx=None):
        try:
            return super().__call__(obj, ctx)
        except Exception as e:
            print(f'Serialization failed for {obj}: {e}')
            # Could return None, empty bytes, or raise
            return b'SERIALIZATION_FAILED'

def handle_serialization_errors():
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.serializer': SafeSerializer('utf_8'),
        'value.serializer': SafeSerializer('utf_8')
    }
    
    producer = SerializingProducer(producer_conf)
    
    def delivery_callback(err, msg):
        if err is not None:
            if isinstance(err, KeySerializationError):
                print(f'Key serialization error: {err.exception}')
            elif isinstance(err, ValueSerializationError):
                print(f'Value serialization error: {err.exception}')
            else:
                print(f'Other delivery error: {err}')
    
    # Produce with potential serialization errors
    producer.produce('my-topic', key='valid-key', value=None, callback=delivery_callback)
    producer.flush()

def handle_deserialization_errors():
    consumer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'key.deserializer': StringDeserializer('utf_8'),
        'value.deserializer': StringDeserializer('utf_8'),
        'group.id': 'error-handling-group',
        'auto.offset.reset': 'earliest'
    }
    
    consumer = DeserializingConsumer(consumer_conf)
    consumer.subscribe(['my-topic'])
    
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
                
            if msg.error():
                error = msg.error()
                
                if isinstance(error, KeyDeserializationError):
                    print(f'Key deserialization error: {error.exception}')
                    # Could access original message via error.kafka_message
                    original_msg = error.kafka_message
                    print(f'Original key bytes: {original_msg.key()}')
                    
                elif isinstance(error, ValueDeserializationError):
                    print(f'Value deserialization error: {error.exception}')
                    original_msg = error.kafka_message
                    print(f'Original value bytes: {original_msg.value()}')
                    
                else:
                    print(f'Other consumer error: {error}')
                    
                # Continue processing despite deserialization errors
                continue
                
            # Process successfully deserialized message
            print(f'Key: {msg.key()}, Value: {msg.value()}')
            
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        consumer.close()

Transaction Error Handling

from confluent_kafka import Producer, KafkaError

def handle_transaction_errors():
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'transactional.id': 'my-transactional-producer',
        'enable.idempotence': True
    }
    
    producer = Producer(producer_conf)
    
    try:
        producer.init_transactions()
        print('Transactions initialized')
        
        for batch in range(5):
            try:
                producer.begin_transaction()
                print(f'Started transaction {batch}')
                
                # Produce messages in transaction
                for i in range(3):
                    producer.produce('transactional-topic', f'batch-{batch}-msg-{i}')
                
                # Commit transaction
                producer.commit_transaction()
                print(f'Committed transaction {batch}')
                
            except KafkaException as e:
                error = e.args[0]
                
                if error.code() == KafkaError._PRODUCER_FENCED:
                    print('Producer fenced - need to recreate producer')
                    raise
                    
                elif error.txn_requires_abort():
                    print(f'Transaction error requires abort: {error}')
                    try:
                        producer.abort_transaction()
                        print('Transaction aborted')
                    except Exception as abort_error:
                        print(f'Failed to abort transaction: {abort_error}')
                        raise
                        
                elif error.retriable():
                    print(f'Retriable transaction error: {error}')
                    # Could retry the transaction
                    try:
                        producer.abort_transaction()
                        continue  # Retry the batch
                    except Exception:
                        raise
                        
                else:
                    print(f'Non-retriable transaction error: {error}')
                    producer.abort_transaction()
                    raise
                    
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            producer.abort_transaction()
        except Exception:
            pass  # Already interrupted
            
    finally:
        producer.flush()

Admin Client Error Handling

from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaException

def handle_admin_errors():
    admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
    
    # Create topics with error handling
    new_topics = [NewTopic('test-topic-1', 3, 1), NewTopic('test-topic-2', 6, 1)]
    
    fs = admin_client.create_topics(new_topics, request_timeout=30)
    
    for topic, f in fs.items():
        try:
            f.result()  # Block until operation completes
            print(f'Topic {topic} created successfully')
            
        except KafkaException as e:
            error = e.args[0]
            
            if error.code() == KafkaError._TOPIC_ALREADY_EXISTS:
                print(f'Topic {topic} already exists')
                # Could continue or handle as needed
                
            elif error.code() == KafkaError._TOPIC_AUTHORIZATION_FAILED:
                print(f'Authorization failed for topic {topic}')
                # Handle authorization error
                
            elif error.code() == KafkaError._REQUEST_TIMED_OUT:
                print(f'Request timed out for topic {topic}')
                # Could retry with longer timeout
                
            else:
                print(f'Failed to create topic {topic}: {error}')
                
        except Exception as e:
            print(f'Unexpected error creating topic {topic}: {e}')

handle_admin_errors()

Comprehensive Error Logging

import logging
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.error import ConsumeError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def comprehensive_error_handling():
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'logging-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False
    })
    
    consumer.subscribe(['my-topic'])
    
    try:
        while True:
            try:
                msg = consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                    
                if msg.error():
                    error = msg.error()
                    
                    # Log error details
                    logger.error(
                        f'Consumer error - Code: {error.code()}, '
                        f'Name: {error.name()}, '
                        f'Description: {error.str()}, '
                        f'Fatal: {error.fatal()}, '
                        f'Retriable: {error.retriable()}'
                    )
                    
                    # Handle based on error characteristics
                    if error.fatal():
                        logger.critical('Fatal error - exiting')
                        break
                    elif not error.retriable():
                        logger.warning('Non-retriable error - skipping')
                        continue
                    else:
                        logger.info('Retriable error - continuing')
                        continue
                        
                # Process message
                logger.info(f'Processing message: {msg.topic()}[{msg.partition()}]@{msg.offset()}')
                
                # Simulate processing
                try:
                    # Process message here
                    pass
                    
                except Exception as processing_error:
                    logger.error(f'Message processing failed: {processing_error}')
                    # Could skip message or handle error
                    continue
                    
                # Manual commit after successful processing
                try:
                    consumer.commit(message=msg)
                except KafkaException as commit_error:
                    logger.error(f'Commit failed: {commit_error}')
                    # Could retry commit or continue
                    
            except Exception as unexpected_error:
                logger.error(f'Unexpected error in consumer loop: {unexpected_error}')
                # Could break, continue, or re-raise depending on requirements
                
    except KeyboardInterrupt:
        logger.info('Consumer interrupted by user')
    except Exception as e:
        logger.critical(f'Critical consumer error: {e}')
    finally:
        logger.info('Closing consumer')
        consumer.close()

comprehensive_error_handling()

Install with Tessl CLI

npx tessl i tessl/pypi-confluent-kafka

docs

admin-client.md

core-producer-consumer.md

error-handling.md

index.md

schema-registry.md

serialization.md

tile.json