CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/errors.md

Error Handling

Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, authentication failures, and consumer group coordination problems.

Capabilities

Base Exception Classes

Foundation exception classes that define the error handling framework.

class KafkaError(RuntimeError):
    """
    Base exception for all Kafka-related errors.
    
    Attributes:
        retriable (bool): Whether the operation that caused this error can be retried
        invalid_metadata (bool): Whether this error indicates stale metadata that should be refreshed
    """
    retriable: bool = False
    invalid_metadata: bool = False
    
    def __str__(self):
        """Return 'ClassName' or 'ClassName: <args>' for readable logs.

        BUGFIX: the method body previously contained only a docstring and
        implicitly returned None, so str(err) raised
        TypeError("__str__ returned non-string").
        """
        if not self.args:
            return self.__class__.__name__
        return '{0}: {1}'.format(self.__class__.__name__, super().__str__())

Network and Connection Errors

Errors related to network connectivity and broker communication.

class NoBrokersAvailable(KafkaError):
    """Every known broker is unreachable, so no connection could be made.

    Retriable; also flags the cached cluster metadata as stale.
    """
    retriable: bool = True
    invalid_metadata: bool = True

class NodeNotReadyError(KafkaError):
    """The target broker node cannot accept requests yet; try again later."""
    retriable: bool = True

class ConnectionError(KafkaError):
    """Establishing a connection to a Kafka broker failed."""
    # NOTE(review): intentionally shadows the builtin ConnectionError,
    # mirroring the kafka-python public API.
    retriable: bool = True

class RequestTimedOutError(KafkaError):
    """A request sent to a Kafka broker did not complete in time."""
    retriable: bool = True

class NetworkException(KafkaError):
    """Low-level network communication failure while talking to a broker."""
    retriable: bool = True

Protocol and Message Errors

Errors related to Kafka protocol handling and message processing.

class KafkaProtocolError(KafkaError):
    """Wire-protocol level failure while communicating with a broker."""
    retriable: bool = True

class CorrelationIdError(KafkaProtocolError):
    """A response's correlation ID did not match the outstanding request."""
    retriable: bool = True

class BufferUnderflowError(KafkaError):
    """The receive buffer held too few bytes to decode a full message."""

class ChecksumError(KafkaError):
    """A message failed its checksum validation."""

class CompressionNotSupportedError(KafkaError):
    """The requested compression codec is unavailable in this client."""

class UnsupportedVersionError(KafkaError):
    """The broker does not support the requested API version."""

Client State and Usage Errors

Errors related to incorrect client usage or invalid state.

class IllegalStateError(KafkaError):
    """The client's current state does not permit the requested operation."""

class IllegalArgumentError(KafkaError):
    """A client method received an invalid argument."""

class InvalidTopicError(KafkaError):
    """The topic name is malformed or refers to a nonexistent topic."""

class TopicAlreadyExistsError(KafkaError):
    """A topic with the requested name already exists on the cluster."""

class InvalidPartitionsError(KafkaError):
    """The requested partition configuration is not acceptable."""

class InvalidReplicationFactorError(KafkaError):
    """The requested replication factor is not acceptable."""

Producer Errors

Errors specific to producer operations.

class TooManyInFlightRequests(KafkaError):
    """The limit of unacknowledged in-flight requests has been reached."""
    retriable: bool = True

class MessageSizeTooLargeError(KafkaError):
    """The produced message exceeds the broker's configured size limit."""

class RecordBatchTooLargeError(KafkaError):
    """The record batch exceeds the broker's configured size limit."""

class InvalidRecordError(KafkaError):
    """A record in the batch contains invalid data."""

class RecordTooLargeError(KafkaError):
    """A single record exceeds the configured size limit."""

class UnknownTopicOrPartitionError(KafkaError):
    """The referenced topic or partition does not exist on this broker."""
    # Stale metadata may have routed the request to the wrong broker.
    invalid_metadata: bool = True

class LeaderNotAvailableError(KafkaError):
    """No leader is currently elected for the partition."""
    retriable: bool = True
    invalid_metadata: bool = True

class NotLeaderForPartitionError(KafkaError):
    """The contacted broker is not the leader for the target partition."""
    retriable: bool = True
    invalid_metadata: bool = True

Consumer Errors

Errors specific to consumer operations and group coordination.

class CommitFailedError(KafkaError):
    """An offset commit was rejected, typically because of a group rebalance."""

class InvalidSessionTimeoutError(KafkaError):
    """The requested session timeout falls outside the broker's allowed range."""

class InvalidGroupIdError(KafkaError):
    """The consumer group ID is malformed or not allowed."""

class GroupLoadInProgressError(KafkaError):
    """The coordinator is still loading this consumer group's state."""
    retriable: bool = True

class GroupCoordinatorNotAvailableError(KafkaError):
    """No coordinator is currently available for this consumer group."""
    retriable: bool = True

class NotCoordinatorForGroupError(KafkaError):
    """The contacted broker does not coordinate this consumer group."""
    retriable: bool = True

class UnknownMemberIdError(KafkaError):
    """The coordinator does not recognize this group member ID."""

class IllegalGenerationError(KafkaError):
    """The supplied consumer group generation ID is no longer current."""

class OffsetOutOfRangeError(KafkaError):
    """The requested offset lies outside the partition's available range."""

class GroupAuthorizationFailedError(KafkaError):
    """The client lacks authorization for this consumer group."""

class TopicAuthorizationFailedError(KafkaError):
    """The client lacks authorization for this topic."""

Authentication and Authorization Errors

Errors related to security and access control.

class AuthenticationFailedError(KafkaError):
    """SASL authentication with the broker was rejected."""

class AuthenticationMethodNotSupported(KafkaError):
    """The broker does not support the requested SASL mechanism."""

class SaslAuthenticationError(KafkaError):
    """An error occurred during the SASL authentication exchange."""

class ClusterAuthorizationFailedError(KafkaError):
    """The client lacks authorization for cluster-level operations."""

class DelegationTokenNotFoundError(KafkaError):
    """The referenced delegation token could not be found."""

class DelegationTokenAuthorizationFailedError(KafkaError):
    """The client is not authorized to use this delegation token."""

Metadata and Coordination Errors

Errors related to metadata management and cluster coordination.

class StaleMetadata(KafkaError):
    """Cached cluster metadata is out of date and must be refreshed."""
    retriable: bool = True
    invalid_metadata: bool = True

class MetadataEmptyBrokerList(KafkaError):
    """A metadata response contained no brokers."""
    retriable: bool = True

class UnrecognizedBrokerVersion(KafkaError):
    """The broker reported a version this client does not recognize."""

class IncompatibleBrokerVersion(KafkaError):
    """The broker's version cannot interoperate with this client."""

class Cancelled(KafkaError):
    """The in-flight operation was cancelled before completion."""
    retriable: bool = True

Protocol-Specific Errors

Errors returned by specific Kafka protocol APIs.

class BrokerResponseError(KafkaError):
    """
    Base class for errors returned by Kafka brokers.

    Concrete protocol errors subclass this and pin ``errno`` as a class
    attribute (e.g. ``UnknownError.errno = -1``), so they can be raised
    with no arguments; an explicit ``errno`` argument overrides the
    class-level code.

    Attributes:
        errno (int): Kafka error code
        message (str): Error message
        description (str): Error description
    """
    errno: int
    message: str
    description: str

    def __init__(self, errno=None, message=None, description=None):
        # BUGFIX: ``errno`` was a required positional argument, which made
        # zero-argument instantiation of subclasses (which already carry a
        # class-level ``errno``) impossible. Fall back to the class value;
        # positional calls like BrokerResponseError(5) still work unchanged.
        self.errno = errno if errno is not None else getattr(type(self), 'errno', None)
        self.message = message
        self.description = description

# Specific broker errors (partial list).
# Each subclass pins ``errno`` to a code from the Kafka protocol's
# error-code table; the docstring mirrors the official description.
class UnknownError(BrokerResponseError):
    """Unknown server error."""
    # UNKNOWN_SERVER_ERROR: unexpected server-side failure
    errno = -1

class OffsetMetadataTooLarge(BrokerResponseError):
    """Offset metadata string is too large."""
    # OFFSET_METADATA_TOO_LARGE
    errno = 12

class InvalidTopicException(BrokerResponseError):
    """Topic name is invalid."""
    # INVALID_TOPIC_EXCEPTION: illegal name or access to an internal topic
    errno = 17

class RecordListTooLarge(BrokerResponseError):
    """Record list is too large."""
    # RECORD_LIST_TOO_LARGE: batch exceeds the configured segment size
    errno = 18

class NotEnoughReplicas(BrokerResponseError):
    """Not enough replicas available."""
    # NOT_ENOUGH_REPLICAS: in-sync replicas below min.insync.replicas
    errno = 19

class NotEnoughReplicasAfterAppend(BrokerResponseError):
    """Not enough replicas after append."""
    # NOT_ENOUGH_REPLICAS_AFTER_APPEND: written, but under-replicated
    errno = 20

Usage Examples

Basic Error Handling

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError, NoBrokersAvailable, RequestTimedOutError

# Initialize to None up front so the finally block can test it directly --
# the previous "'producer' in locals()" check was fragile and non-idiomatic.
producer = None

try:
    producer = KafkaProducer(bootstrap_servers=['invalid-broker:9092'])
    # Block up to 10s for the send to be acknowledged (or to fail).
    producer.send('my-topic', b'test message').get(timeout=10)
    
except NoBrokersAvailable:
    print("No Kafka brokers are available")
    
except RequestTimedOutError:
    print("Request timed out")
    
except KafkaError as e:
    # Catch-all for any other Kafka-specific failure.
    print(f"Kafka error: {e}")
    
finally:
    if producer is not None:
        producer.close()

Producer Error Handling with Retries

from kafka import KafkaProducer
from kafka.errors import (KafkaError, MessageSizeTooLargeError, 
                         NotLeaderForPartitionError, RequestTimedOutError)
import time

# Producer configured with built-in retry support for transient errors.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    retries=5,  # Built-in retries for retriable errors
    retry_backoff_ms=1000
)

def send_with_retry(topic, message, max_retries=3):
    """Send ``message`` to ``topic``, retrying transient failures with backoff."""
    attempt = 0
    while attempt < max_retries:
        try:
            metadata = producer.send(topic, message).get(timeout=10)
        except NotLeaderForPartitionError:
            print(f"Leader not available, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
        except MessageSizeTooLargeError:
            # An oversized payload will never succeed on retry; surface it.
            print("Message is too large, cannot retry")
            raise
        except RequestTimedOutError:
            print(f"Request timed out, attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(1)
        else:
            # Success path: report where the record landed and return.
            print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
            return metadata
        attempt += 1

    raise KafkaError("Failed to send after all retries")

# Use the retry function
try:
    send_with_retry('my-topic', b'important message')
except KafkaError as e:
    print(f"Final error: {e}")

Consumer Error Handling

from kafka import KafkaConsumer
from kafka.errors import (CommitFailedError, OffsetOutOfRangeError, 
                         GroupCoordinatorNotAvailableError, KafkaError)

# BUGFIX: process_message must be defined BEFORE the consumption loop below
# runs -- in the original flat script the loop executed first and raised
# NameError on the first message.
def process_message(message):
    """Simulate message processing."""
    print(f"Processing: {message.value}")

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False  # commit manually only after successful processing
)

try:
    for message in consumer:
        try:
            # Process message
            process_message(message)
            
            # Manual commit
            consumer.commit()
            
        except CommitFailedError:
            print("Commit failed, likely due to rebalance")
            # Consumer will rejoin group and get new assignment
            
        except OffsetOutOfRangeError as e:
            print(f"Offset out of range: {e}")
            # Seek to beginning (no args = all assigned partitions)
            consumer.seek_to_beginning()
            
        except Exception as e:
            print(f"Processing error: {e}")
            # Continue with next message
            
except GroupCoordinatorNotAvailableError:
    print("Group coordinator not available")
    
except KafkaError as e:
    print(f"Kafka error: {e}")
    
finally:
    consumer.close()

Authentication Error Handling

from kafka import KafkaProducer
from kafka.errors import (AuthenticationFailedError, 
                         AuthenticationMethodNotSupported,
                         ClusterAuthorizationFailedError)

# SASL/SCRAM connection settings (deliberately wrong credentials for the demo).
sasl_config = dict(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='wrong-user',
    sasl_plain_password='wrong-password',
)

try:
    producer = KafkaProducer(**sasl_config)
    producer.send('secure-topic', b'message')
    
except AuthenticationFailedError:
    print("SASL authentication failed - check credentials")
    
except AuthenticationMethodNotSupported:
    print("SASL mechanism not supported by broker")
    
except ClusterAuthorizationFailedError:
    print("Not authorized to access cluster")
    
except Exception as e:
    print(f"Other error: {e}")

Admin Client Error Handling

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import (TopicAlreadyExistsError, InvalidReplicationFactorError,
                         ClusterAuthorizationFailedError, KafkaError)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # NewTopic(name, num_partitions, replication_factor)
    requested = [NewTopic('test-topic', 3, 2)]
    result = admin.create_topics(requested)
    
    # Report the per-topic outcome of the request.
    for topic_name, error in result.items():
        outcome = (f"Topic {topic_name} created successfully"
                   if error is None
                   else f"Failed to create topic {topic_name}: {error}")
        print(outcome)
            
except TopicAlreadyExistsError:
    print("Topic already exists")
    
except InvalidReplicationFactorError:
    print("Invalid replication factor")
    
except ClusterAuthorizationFailedError:
    print("Not authorized to create topics")
    
except KafkaError as e:
    print(f"Admin operation failed: {e}")
    
finally:
    admin.close()

Error Classification and Recovery

from kafka.errors import KafkaError

def handle_kafka_error(error):
    """Classify a Kafka error and choose a recovery strategy.

    Returns one of 'retry', 'refresh_metadata', or 'fail'. Errors that
    are both retriable and metadata-invalidating resolve to 'retry'.
    """
    # getattr with a False default is equivalent to hasattr + truthy check.
    if getattr(error, 'retriable', False):
        print(f"Retriable error: {error}")
        return 'retry'

    if getattr(error, 'invalid_metadata', False):
        print(f"Metadata refresh needed: {error}")
        return 'refresh_metadata'

    # Anything else is treated as a permanent failure.
    print(f"Non-retriable error: {error}")
    return 'fail'

# Example usage: map the classification back onto a recovery action.
try:
    # Kafka operation (placeholder -- substitute a real produce/consume call)
    pass
except KafkaError as e:
    strategy = handle_kafka_error(e)
    
    if strategy == 'retry':
        # Implement retry logic
        pass
    elif strategy == 'refresh_metadata':
        # Force metadata refresh
        pass
    else:
        # Log error and exit: re-raising preserves the original traceback
        raise

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json