CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

Pending
Overview
Eval results
Files

errors.mddocs/

Error Handling

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures with appropriate retry semantics.

Capabilities

Base Exception Classes

Foundation exception classes providing error categorization and retry logic.

class KafkaError(Exception):
    """
    Base exception for all Kafka-related errors.

    Attributes:
    - retriable: bool, whether error is retriable
    - invalid_metadata: bool, whether error invalidates metadata
    """
    retriable = False
    invalid_metadata = False

    def __init__(self, *args):
        # Zero-argument super() is the Python 3 idiom (these examples already
        # use f-strings, so Python 2 compatibility is not required).
        super().__init__(*args)

class BrokerResponseError(KafkaError):
    """
    Base class for errors returned by Kafka brokers.

    Attributes:
    - errno: int, Kafka error code
    - message: str, error message
    - description: str, detailed error description
    """
    errno = None
    message = None
    description = None

class AuthorizationError(KafkaError):
    """Base class for authorization-related errors."""

Client-Side Errors

Errors originating from the client library itself, typically related to configuration, connection, or protocol issues.

class KafkaConfigurationError(KafkaError):
    """Raised when a client configuration parameter is invalid."""

class KafkaConnectionError(KafkaError):
    """Raised when a broker connection fails; retriable."""
    retriable = True

class KafkaProtocolError(KafkaError):
    """Raised on wire-protocol level failures; retriable."""
    retriable = True

class KafkaTimeoutError(KafkaError):
    """Raised when a request does not complete in time; retriable."""
    retriable = True

class IllegalArgumentError(KafkaError):
    """Raised when an API is called with an invalid argument."""

class IllegalStateError(KafkaError):
    """Raised when an operation is attempted in an invalid state."""

class IncompatibleBrokerVersion(KafkaError):
    """Raised when the broker version is incompatible with a requested feature."""

class MetadataEmptyBrokerList(KafkaError):
    """Raised when no brokers are available for a metadata request.

    Retriable; also marks cached metadata as invalid so it is refreshed.
    """
    retriable = True
    invalid_metadata = True

class NoBrokersAvailable(KafkaError):
    """Raised when no broker can be reached; retriable."""
    retriable = True

class NoOffsetForPartitionError(KafkaError):
    """Raised when no offset is available for a partition."""

class NodeNotReadyError(KafkaError):
    """Raised when a broker node is not yet ready for requests; retriable."""
    retriable = True

class QuotaViolationError(KafkaError):
    """Raised when a client exceeds its configured rate limit."""

class StaleMetadata(KafkaError):
    """Raised when cached cluster metadata needs a refresh.

    Retriable; also marks cached metadata as invalid.
    """
    retriable = True
    invalid_metadata = True

class TooManyInFlightRequests(KafkaError):
    """Raised when the in-flight request queue is full; retriable."""
    retriable = True

class UnrecognizedBrokerVersion(KafkaError):
    """Raised when the broker version cannot be identified."""

class UnsupportedCodecError(KafkaError):
    """Raised when a message uses a compression codec the client cannot handle."""

Authorization Errors

Errors related to access control and authentication failures.

class TopicAuthorizationFailedError(AuthorizationError):
    """Broker error 29: the principal is not authorized to access the topic(s)."""
    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = 'Not authorized to access topics'

class GroupAuthorizationFailedError(AuthorizationError):
    """Broker error 30: the principal is not authorized to access the consumer group."""
    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = 'Not authorized to access group'

class ClusterAuthorizationFailedError(AuthorizationError):
    """Broker error 31: the principal is not authorized for cluster-level operations."""
    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = 'Cluster authorization failed'

class TransactionalIdAuthorizationFailedError(AuthorizationError):
    """Broker error 53: the principal is not authorized to use the transactional id."""
    errno = 53
    message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
    description = 'The transactional id authorization failed'

class DelegationTokenAuthorizationFailedError(AuthorizationError):
    """Broker error 58: delegation token authorization failed."""
    errno = 58
    message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
    description = 'Delegation Token authorization failed'

Common Broker Response Errors

Most frequently encountered broker-side errors with their error codes and retry semantics.

class OffsetOutOfRangeError(BrokerResponseError):
    """Broker error 1: the requested offset lies outside the partition's range."""
    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = 'The requested offset is not within the range of offsets'

class UnknownTopicOrPartitionError(BrokerResponseError):
    """Broker error 3: this broker does not host the topic-partition.

    Retriable; also invalidates cached metadata so it is refreshed.
    """
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    description = 'This server does not host this topic-partition'
    retriable = True
    invalid_metadata = True

class LeaderNotAvailableError(BrokerResponseError):
    """Broker error 5: no leader currently exists for the topic-partition.

    Retriable; also invalidates cached metadata.
    """
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    description = 'There is no leader for this topic-partition'
    retriable = True
    invalid_metadata = True

class NotLeaderForPartitionError(BrokerResponseError):
    """Broker error 6: the request reached a broker that is not the leader.

    Retriable; also invalidates cached metadata.
    """
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    description = 'This server is not the leader for that topic-partition'
    retriable = True
    invalid_metadata = True

class RequestTimedOutError(BrokerResponseError):
    """Broker error 7: the broker timed out processing the request; retriable."""
    errno = 7
    message = 'REQUEST_TIMED_OUT'
    description = 'The request timed out'
    retriable = True

class BrokerNotAvailableError(BrokerResponseError):
    """Broker error 8: the broker is unavailable.

    Retriable; also invalidates cached metadata.
    """
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = 'The broker is not available'
    retriable = True
    invalid_metadata = True

class ReplicaNotAvailableError(BrokerResponseError):
    """Broker error 9: a replica for the topic-partition is unavailable; retriable."""
    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = 'The replica is not available for the requested topic-partition'
    retriable = True

class MessageSizeTooLargeError(BrokerResponseError):
    """Broker error 10: a message exceeded the broker's maximum message size."""
    errno = 10
    message = 'MESSAGE_TOO_LARGE'
    description = 'The request included a message larger than the max message size'

class TopicAlreadyExistsError(BrokerResponseError):
    """Broker error 36: attempted to create a topic that already exists."""
    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic already exists'

class InvalidTopicError(BrokerResponseError):
    """Broker error 17: the operation targeted an invalid topic name."""
    errno = 17
    message = 'INVALID_TOPIC_EXCEPTION'
    description = 'The request attempted to perform an operation on an invalid topic'

Consumer-Specific Errors

Errors specific to consumer operations and group coordination.

class OffsetMetadataTooLargeError(BrokerResponseError):
    """Broker error 12: the metadata string in an offset commit was too large."""
    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = 'The metadata field of the offset request was too large'

class GroupLoadInProgressError(BrokerResponseError):
    """Broker error 14: the group coordinator is still loading state; retriable."""
    errno = 14
    message = 'GROUP_LOAD_IN_PROGRESS'
    description = 'The coordinator is loading and hence can\'t process requests'
    retriable = True

class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """Broker error 15: no group coordinator is available.

    Retriable; also invalidates cached metadata.
    """
    errno = 15
    message = 'GROUP_COORDINATOR_NOT_AVAILABLE'
    description = 'The group coordinator is not available'
    retriable = True
    invalid_metadata = True

class NotCoordinatorForGroupError(BrokerResponseError):
    """Broker error 16: the request reached a broker that is not this group's coordinator.

    Retriable; also invalidates cached metadata.
    """
    errno = 16
    message = 'NOT_COORDINATOR_FOR_GROUP'
    description = 'The broker is not the coordinator for this group'
    retriable = True
    invalid_metadata = True

class UnknownMemberIdError(BrokerResponseError):
    """Broker error 25: the member id is not part of the current group generation."""
    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = 'The member id is not in the current generation'

class IllegalGenerationError(BrokerResponseError):
    """Broker error 22: the supplied group generation id is no longer valid."""
    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = 'Specified group generation id is not valid'

class RebalanceInProgressError(BrokerResponseError):
    """Broker error 27: the group is rebalancing and members must rejoin; retriable."""
    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = 'The group is rebalancing, so a rejoin is needed'
    retriable = True

Producer-Specific Errors

Errors specific to producer operations and message publishing.

class InvalidRequiredAcksError(BrokerResponseError):
    """Broker error 21: the produce request specified an invalid acks value."""
    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = 'Specified required acks is invalid (must be -1, 0, or 1)'

class RecordListTooLargeError(BrokerResponseError):
    """Broker error 18: the message batch exceeded the configured segment size."""
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = 'The request included message batch larger than the configured segment size'

class InvalidPartitionError(BrokerResponseError):
    """Broker error 4.

    NOTE(review): the class name says "partition" while the protocol name for
    error code 4 is INVALID_FETCH_SIZE; the name/message/description pairing
    here looks inconsistent -- confirm against the kafka-python errors module.
    """
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has an invalid offset'

class DuplicateSequenceNumberError(BrokerResponseError):
    """Broker error 45: an idempotent producer reused an old sequence number."""
    errno = 45
    message = 'DUPLICATE_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with an old sequence number'

class OutOfOrderSequenceNumberError(BrokerResponseError):
    """Broker error 46: an idempotent producer sent a non-consecutive sequence number."""
    errno = 46
    message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
    description = 'A producer attempted to produce with a sequence number which is not the expected next one'

Error Handling Patterns

Basic Error Handling

from kafka import KafkaProducer, KafkaConsumer 
from kafka.errors import (KafkaError, KafkaTimeoutError, KafkaConnectionError,
                         TopicAuthorizationFailedError, MessageSizeTooLargeError)

# Producer error handling
# NOTE(review): KafkaConsumer is imported above but unused in this example.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    # send() is asynchronous; future.get() blocks until the broker responds
    # (or re-raises the delivery error after the 30s timeout).
    future = producer.send('my-topic', value=b'test message')
    record_metadata = future.get(timeout=30)
    print(f"Message sent successfully: offset {record_metadata.offset}")

# More specific exception types are handled first; the KafkaError base
# class at the end acts as the catch-all for anything not matched above.
except TopicAuthorizationFailedError:
    print("Access denied to topic")
except MessageSizeTooLargeError:
    print("Message too large for broker")
except KafkaTimeoutError:
    print("Request timed out")
except KafkaConnectionError:
    print("Connection failed")
except KafkaError as e:
    print(f"Kafka error: {e}")
finally:
    # Always release sockets and buffers, even when sending failed.
    producer.close()

Retry Logic with Exponential Backoff

import time
import random
from kafka import KafkaProducer
from kafka.errors import KafkaError

def send_with_retry(producer, topic, value, max_retries=3):
    """Send a message, retrying retriable KafkaErrors with exponential backoff.

    Args:
        producer: a KafkaProducer instance.
        topic: target topic name.
        value: message payload (bytes).
        max_retries: maximum number of retries after the first attempt.

    Returns:
        The record metadata for the delivered message.

    Raises:
        KafkaError: when the error is not retriable or retries are exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value)
            return future.get(timeout=30)

        except KafkaError as e:
            # Fail fast on non-retriable errors, or when attempts are exhausted.
            if not e.retriable or attempt == max_retries:
                raise  # bare raise preserves the original traceback (was `raise e`)

            # Exponential backoff (1s, 2s, 4s, ...) plus jitter so many
            # clients don't retry in lock-step.
            backoff = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {backoff:.2f}s")
            time.sleep(backoff)

    # Unreachable: the loop always returns or raises; kept as a safeguard.
    raise KafkaError("Max retries exceeded")

# Usage
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    metadata = send_with_retry(producer, 'events', b'important message')
    print(f"Message sent successfully: {metadata}")
except KafkaError as e:
    print(f"Failed to send message: {e}")
finally:
    producer.close()

Consumer Error Handling

from kafka import KafkaConsumer
from kafka.errors import (ConsumerTimeoutError, OffsetOutOfRangeError,
                         GroupAuthorizationFailedError, StaleMetadata)
# NOTE(review): confirm ConsumerTimeoutError is exported by kafka.errors in
# the targeted kafka-python version; with consumer_timeout_ms set, iteration
# may simply stop rather than raise.

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='error-handling-group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000
)

try:
    for message in consumer:
        try:
            # Process message
            # NOTE(review): process_message is a placeholder -- supply your own handler.
            process_message(message)

        except Exception as e:
            # Per-message failures are logged and skipped so one bad record
            # does not stall consumption of the partition.
            print(f"Error processing message at offset {message.offset}: {e}")
            # Could implement dead letter queue here
            continue

except OffsetOutOfRangeError:
    print("Offset out of range - seeking to beginning")
    # With no arguments, seek_to_beginning() applies to all assigned partitions.
    consumer.seek_to_beginning()

except GroupAuthorizationFailedError:
    print("Access denied to consumer group")

except StaleMetadata:
    print("Metadata stale - will refresh automatically")

except ConsumerTimeoutError:
    print("Consumer timeout - no messages received")

except KeyboardInterrupt:
    print("Shutting down consumer")

finally:
    # Leave the group cleanly and release network resources.
    consumer.close()

Circuit Breaker Pattern

import time
from enum import Enum
from kafka import KafkaProducer
from kafka.errors import KafkaError

class CircuitState(Enum):
    """States of the circuit breaker state machine."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """
    Simple circuit breaker guarding calls to an unreliable dependency.

    CLOSED: calls pass through and failures are counted.
    OPEN: calls fail immediately until recovery_timeout has elapsed.
    HALF_OPEN: one trial call is allowed; success closes the circuit,
    another failure reopens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """
        Args:
            failure_threshold: failures required before the circuit opens.
            recovery_timeout: seconds to wait before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute func(*args, **kwargs) with circuit breaker protection.

        Raises:
            Exception: immediately while the circuit is OPEN.
            Whatever func raises (after the failure is recorded).
        """
        if self.state == CircuitState.OPEN:
            # last_failure_time is always set before the state becomes OPEN.
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker transitioning to HALF_OPEN")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            # Bare raise preserves the original exception and traceback
            # (the original `raise e` added a redundant frame).
            raise
        else:
            self._on_success()
            return result

    def _on_success(self):
        """Reset the failure count; close the circuit after a successful trial."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("Circuit breaker reset to CLOSED")

    def _on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPENED after {self.failure_count} failures")

# Usage with Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def send_message(topic, value):
    """Send message through circuit breaker."""
    # future.get() blocks, so delivery failures surface here and get
    # counted by the circuit breaker.
    future = producer.send(topic, value=value)
    return future.get(timeout=10)

# Send messages with circuit breaker protection
for i in range(20):
    try:
        result = circuit_breaker.call(send_message, 'protected-topic', f'Message {i}'.encode())
        print(f"Message {i} sent successfully")

    except Exception as e:
        # Catches both send failures and the breaker's own "OPEN" error.
        print(f"Message {i} failed: {e}")
        time.sleep(1)  # Brief pause before next attempt

producer.close()

Error Classification and Handling

import logging

from kafka.errors import (KafkaError, BrokerResponseError, AuthorizationError,
                         KafkaTimeoutError, KafkaConnectionError)

# Module-level logger (the original example referenced an undefined `logger`).
logger = logging.getLogger(__name__)

def classify_and_handle_error(error):
    """Classify an exception and return a handling-strategy dict.

    Args:
        error: the exception instance to classify.

    Returns:
        dict with keys: category, action, retriable, severity.

    Checks run from most to least specific so subclass matches win over
    the generic KafkaError fallback.
    """
    if isinstance(error, AuthorizationError):
        return {
            'category': 'authorization',
            'action': 'check_credentials',
            'retriable': False,
            'severity': 'high'
        }

    elif isinstance(error, KafkaConnectionError):
        return {
            'category': 'network',
            'action': 'retry_with_backoff',
            'retriable': True,
            'severity': 'medium'
        }

    elif isinstance(error, KafkaTimeoutError):
        return {
            'category': 'timeout',
            'action': 'retry_with_longer_timeout',
            'retriable': True,
            'severity': 'low'
        }

    elif isinstance(error, BrokerResponseError):
        # Broker errors carry their own retry semantics on the class.
        if error.retriable:
            return {
                'category': 'broker_retriable',
                'action': 'retry_after_delay',
                'retriable': True,
                'severity': 'medium'
            }
        else:
            return {
                'category': 'broker_fatal',
                'action': 'fail_fast',
                'retriable': False,
                'severity': 'high'
            }

    elif isinstance(error, KafkaError):
        return {
            'category': 'generic_kafka',
            'action': 'investigate',
            'retriable': getattr(error, 'retriable', False),
            'severity': 'medium'
        }

    else:
        # Non-Kafka exceptions: treat as unknown and do not retry blindly.
        return {
            'category': 'unknown',
            'action': 'investigate',
            'retriable': False,
            'severity': 'high'
        }

# Usage in error handler
def handle_kafka_error(error, operation_context):
    """Handle a Kafka error based on its classification.

    Args:
        error: the exception instance.
        operation_context: short string describing where the error occurred.

    Returns:
        True when the caller should retry, False when it should report failure.
    """
    classification = classify_and_handle_error(error)

    print(f"Error in {operation_context}: {error}")
    print(f"Classification: {classification}")

    if classification['severity'] == 'high':
        # Log critical error
        logger.critical(f"Critical Kafka error: {error}")

    if classification['retriable']:
        return True  # Indicate retry should be attempted
    else:
        return False  # Indicate failure should be reported

Monitoring and Alerting

import logging
from collections import defaultdict, deque
import time
from kafka.errors import KafkaError

class KafkaErrorMonitor:
    """Track Kafka error occurrences over a sliding time window for alerting."""

    def __init__(self, window_size=300):  # 5 minute window
        """
        Args:
            window_size: sliding window length in seconds.
        """
        self.window_size = window_size
        # error type name -> deque of occurrence timestamps, oldest first.
        # defaultdict(deque) replaces the redundant `lambda: deque()` wrapper.
        self.error_counts = defaultdict(deque)
        self.logger = logging.getLogger(__name__)

    def record_error(self, error, context="unknown"):
        """Record one error occurrence, log it, and evaluate alert conditions.

        Args:
            error: the exception instance (grouped by its type name).
            context: short string describing where the error occurred.
        """
        error_type = type(error).__name__
        timestamp = time.time()

        # Append to the sliding window, then discard entries older than it.
        window = self.error_counts[error_type]
        window.append(timestamp)
        cutoff = timestamp - self.window_size
        while window and window[0] < cutoff:
            window.popleft()

        # Log error
        self.logger.error(f"Kafka error in {context}: {error}")

        # Check for alert conditions
        self._check_alert_conditions(error_type)

    def _check_alert_conditions(self, error_type):
        """Emit critical logs when error rates or types cross alert thresholds."""
        error_count = len(self.error_counts[error_type])

        # Alert if too many errors in window
        if error_count > 10:
            self.logger.critical(
                f"High error rate: {error_count} {error_type} errors "
                f"in last {self.window_size} seconds"
            )

        # Authorization failures are alert-worthy even as one-offs.
        if error_type in ['TopicAuthorizationFailedError', 'ClusterAuthorizationFailedError']:
            self.logger.critical(f"Authorization failure: {error_type}")

    def get_error_summary(self):
        """Return a {error_type: count} summary of errors within the window."""
        summary = {}
        cutoff = time.time() - self.window_size

        for error_type, timestamps in self.error_counts.items():
            # Use >= so boundary timestamps are counted consistently with the
            # pruning in record_error (which keeps entries equal to cutoff).
            recent_count = sum(1 for ts in timestamps if ts >= cutoff)
            if recent_count > 0:
                summary[error_type] = recent_count

        return summary

# Usage
error_monitor = KafkaErrorMonitor()

def monitored_kafka_operation(operation_func, *args, **kwargs):
    """Execute a Kafka operation, recording any KafkaError before re-raising.

    Args:
        operation_func: the callable to execute (e.g. producer.send).
        *args, **kwargs: passed through to operation_func.

    Returns:
        Whatever operation_func returns.
    """
    try:
        return operation_func(*args, **kwargs)

    except KafkaError as e:
        error_monitor.record_error(e, context=operation_func.__name__)
        # Bare raise preserves the original traceback (was `raise e`).
        raise

# Example usage
# NOTE(review): assumes a `producer` instance exists in scope -- this
# example does not create one.
try:
    monitored_kafka_operation(producer.send, 'topic', b'message')
except KafkaError:
    # Error already logged and monitored
    pass

# Periodic error summary
print("Recent errors:", error_monitor.get_error_summary())

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python

docs

admin.md

consumer.md

errors.md

index.md

producer.md

structures.md

tile.json