or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

docs

index.md
tile.json

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/common/errors.md

Error Handling and Exceptions

Kafka provides a comprehensive exception hierarchy for handling errors across all APIs. All Kafka exceptions extend from KafkaException, with specialized sub-hierarchies for retriable and non-retriable errors.

Exception Hierarchy

package org.apache.kafka.common;

public class KafkaException extends RuntimeException {
    // Base class for all Kafka exceptions
}

Core Exception Types

package org.apache.kafka.common.errors;

// Base exception for API errors
public class ApiException extends KafkaException {
    // Base class for all API-related exceptions
}

// Retriable exceptions - operations can be retried
public abstract class RetriableException extends ApiException {
    // Operations that failed but can be safely retried
}

// Application-recoverable exceptions - fatal but application can recover
public abstract class ApplicationRecoverableException extends ApiException {
    // Fatal errors that require application-level recovery
}

Specialized Exception Base Classes

// Producer exceptions
package org.apache.kafka.clients.producer;
public class BufferExhaustedException extends TimeoutException {
    // Producer buffer exhausted
}

// Consumer exceptions
package org.apache.kafka.clients.consumer;
public class CommitFailedException extends KafkaException {
    // Commit failed, typically due to rebalance
    // Extends org.apache.kafka.common.KafkaException directly (not ApiException)
}

// Streams exceptions
package org.apache.kafka.streams.errors;
public class StreamsException extends KafkaException {
    // Base class for all Streams exceptions
}

// Connect exceptions
package org.apache.kafka.connect.errors;
public class ConnectException extends KafkaException {
    // Base class for all Connect exceptions
}

// Serialization exceptions
package org.apache.kafka.common.errors;
public class SerializationException extends KafkaException {
    // Serialization/deserialization errors
}

Retriable Exceptions

These exceptions indicate temporary failures. Operations can be safely retried.

Network and Broker Availability

package org.apache.kafka.common.errors;

// Network-related retriable errors
public class NetworkException extends RetriableException {
    // Network communication error
}

public class DisconnectException extends RetriableException {
    // Connection to broker was disconnected
}

public class BrokerNotAvailableException extends ApiException {
    // Broker is not currently available
    // Extends ApiException directly, so it is NOT a RetriableException
}

public class LeaderNotAvailableException extends RetriableException {
    // Leader for partition is not available
}

public class NotLeaderOrFollowerException extends RetriableException {
    // Broker is not the leader or follower for the partition
}

public class NotControllerException extends RetriableException {
    // Broker is not the controller
}

Coordinator and Group Management

package org.apache.kafka.common.errors;

public class CoordinatorNotAvailableException extends RetriableException {
    // Group coordinator is not available
}

public class NotCoordinatorException extends RetriableException {
    // Broker is not the coordinator for the group
}

public class CoordinatorLoadInProgressException extends RetriableException {
    // Coordinator is loading group metadata
}

Replication and Storage

package org.apache.kafka.common.errors;

public class NotEnoughReplicasException extends RetriableException {
    // Not enough in-sync replicas to satisfy acks requirement
}

public class NotEnoughReplicasAfterAppendException extends RetriableException {
    // Record written but not enough in-sync replicas after append
}

public class KafkaStorageException extends RetriableException {
    // Disk error on broker
}

public class OffsetNotAvailableException extends RetriableException {
    // Offset not yet available
}

public class ReplicaNotAvailableException extends RetriableException {
    // Replica not currently available
}

Timeout Errors

package org.apache.kafka.common.errors;

public class TimeoutException extends RetriableException {
    // Operation timed out; a RetriableException, so `instanceof RetriableException` matches it
}

public class RequestTimeoutException extends TimeoutException {
    // Request to broker timed out
}

// Producer-specific timeout
package org.apache.kafka.clients.producer;
public class BufferExhaustedException extends TimeoutException {
    // Producer buffer full, waiting for space timed out
}

Metadata and Topic Management

package org.apache.kafka.common.errors;

public class UnknownTopicOrPartitionException extends RetriableException {
    // Topic or partition does not exist (may be created)
}

public class UnknownTopicIdException extends RetriableException {
    // Topic ID is not recognized
}

public class InvalidMetadataException extends RetriableException {
    // Metadata is invalid and needs refresh
}

public class InconsistentTopicIdException extends RetriableException {
    // Topic ID doesn't match expected ID
}

Consumer-Specific Retriable Errors

package org.apache.kafka.clients.consumer;

public class RetriableCommitFailedException extends RetriableException {
    // Offset commit failed but can be retried
}

public class NoAvailableBrokersException extends RetriableException {
    // No brokers available to service request
}

Session Management

package org.apache.kafka.common.errors;

public class FetchSessionIdNotFoundException extends RetriableException {
    // Fetch session ID not found on broker
}

public class InvalidFetchSessionEpochException extends RetriableException {
    // Fetch session epoch is invalid
}

public class ShareSessionNotFoundException extends RetriableException {
    // Share session not found on broker (share consumer)
}

public class InvalidShareSessionEpochException extends RetriableException {
    // Share session epoch is invalid (share consumer)
}

Quota and Rate Limiting

package org.apache.kafka.common.errors;

public class ThrottlingQuotaExceededException extends RetriableException {
    // Quota exceeded, request throttled
}

Non-Retriable Exceptions

These exceptions indicate permanent failures or require corrective action.

Authentication and Authorization

package org.apache.kafka.common.errors;

public class AuthenticationException extends ApiException {
    // Authentication failed
}

public class SaslAuthenticationException extends AuthenticationException {
    // SASL authentication failed
}

public class SslAuthenticationException extends AuthenticationException {
    // SSL authentication failed
}

public class AuthorizationException extends ApiException {
    // Not authorized to perform operation
}

public class TopicAuthorizationException extends AuthorizationException {
    // Not authorized for topic operation
}

public class GroupAuthorizationException extends AuthorizationException {
    // Not authorized for group operation
}

public class ClusterAuthorizationException extends AuthorizationException {
    // Not authorized for cluster operation
}

public class TransactionalIdAuthorizationException extends AuthorizationException {
    // Not authorized for transactional ID
}

public class DelegationTokenAuthorizationException extends AuthorizationException {
    // Not authorized for delegation token operation
}

Producer Transaction Errors

package org.apache.kafka.common.errors;

public class ProducerFencedException extends ApplicationRecoverableException {
    // Producer with same transactional.id has newer epoch
}

public class OutOfOrderSequenceException extends ApiException {
    // Sequence number is out of order
}

public class DuplicateSequenceException extends ApiException {
    // Duplicate sequence number detected
}

public class InvalidProducerEpochException extends ApiException {
    // Producer epoch is invalid
}

public class InvalidTxnStateException extends ApiException {
    // Transaction state is invalid for operation
}

public class TransactionAbortedException extends ApiException {
    // Transaction was aborted
}

public class TransactionCoordinatorFencedException extends ApiException {
    // Transaction coordinator has been fenced
}

public class InvalidPidMappingException extends ApiException {
    // Producer ID mapping is invalid
}

public class UnsupportedForMessageFormatException extends ApiException {
    // Feature not supported for this message format version
}

Consumer Offset Errors

package org.apache.kafka.clients.consumer;

public class CommitFailedException extends KafkaException {
    // Commit failed, typically due to rebalance
    // Consumer is no longer part of group
    // Extends org.apache.kafka.common.KafkaException directly (not ApiException)
}

public abstract class InvalidOffsetException extends KafkaException {
    // Offset is invalid for partition
    // Abstract base type; exposes the affected partitions to the caller
    public abstract Set<TopicPartition> partitions();
}

public class NoOffsetForPartitionException extends InvalidOffsetException {
    // No committed offset and auto.offset.reset=none
}

public class OffsetOutOfRangeException extends InvalidOffsetException {
    // Offset is out of range for partition
    // Use seekToBeginning() or seekToEnd()
    // Note: a different class with the same simple name exists in org.apache.kafka.common.errors
}

public class LogTruncationException extends OffsetOutOfRangeException {
    // Log was truncated on broker (data loss)
    // Caught by handlers for OffsetOutOfRangeException unless handled first
}

Record and Serialization Errors

package org.apache.kafka.common.errors;

public class SerializationException extends KafkaException {
    // Serialization or deserialization failed
}

public class RecordTooLargeException extends ApiException {
    // Record size exceeds max.request.size (producer) or
    // max.partition.fetch.bytes (consumer)
}

public class InvalidRecordException extends ApiException {
    // Record format is invalid
}

public class CorruptRecordException extends RetriableException {
    // Record is corrupt (CRC check failed)
}

public class RecordBatchTooLargeException extends ApiException {
    // Batch size exceeds configured maximum
}

Topic and Partition Errors

package org.apache.kafka.common.errors;

public class InvalidTopicException extends ApiException {
    // Topic name is invalid
}

public class TopicExistsException extends ApiException {
    // Topic already exists
}

public class InvalidPartitionsException extends ApiException {
    // Partition count is invalid
}

public class InvalidReplicationFactorException extends ApiException {
    // Replication factor is invalid
}

public class TopicDeletionDisabledException extends ApiException {
    // Topic deletion is disabled on broker
}

Configuration Errors

package org.apache.kafka.common.errors;

public class InvalidConfigurationException extends ApiException {
    // Configuration is invalid
}

public class InvalidConfigException extends ApiException {
    // Specific configuration value is invalid
}

public class PolicyViolationException extends ApiException {
    // Operation violates broker policy
}

public class ResourceNotFoundException extends ApiException {
    // Requested resource does not exist
}

Consumer Group Errors

package org.apache.kafka.common.errors;

public class InvalidGroupIdException extends ApiException {
    // Group ID is invalid (empty or null)
}

public class FencedInstanceIdException extends ApiException {
    // Consumer instance ID has been fenced
}

public class MemberIdRequiredException extends ApiException {
    // Member ID is required but not provided
}

public class UnknownMemberIdException extends ApiException {
    // Member ID is not recognized by coordinator
}

public class IllegalGenerationException extends ApiException {
    // Generation ID is invalid
}

public class RebalanceInProgressException extends ApiException {
    // Consumer group rebalance in progress
}

public class InconsistentGroupProtocolException extends ApiException {
    // Group members have incompatible protocols
}

public class GroupIdNotFoundException extends ApiException {
    // Group ID does not exist
}

public class GroupMaxSizeReachedException extends ApiException {
    // Group has reached maximum size
}

Share Group Errors (share groups, KIP-932; preview feature in 4.1)

package org.apache.kafka.common.errors;

public class InvalidRecordStateException extends ApiException {
    // Record state is invalid for acknowledgement (share groups)
}

public class ShareSessionLimitReachedException extends RetriableException {
    // Maximum share sessions reached on broker
}

Delegation Token Errors

package org.apache.kafka.common.errors;

public class DelegationTokenExpiredException extends ApiException {
    // Delegation token has expired
}

public class DelegationTokenNotFoundException extends ApiException {
    // Delegation token not found
}

public class DelegationTokenDisabledException extends ApiException {
    // Delegation tokens are disabled on broker
}

public class DelegationTokenOwnerMismatchException extends ApiException {
    // Delegation token owner doesn't match
}

Streams Exceptions

All Streams exceptions are non-retriable and typically require application restart or rebalancing.

package org.apache.kafka.streams.errors;

// Base exception
public class StreamsException extends KafkaException {
    // Base class for all Streams exceptions
}

// State store errors
public class InvalidStateStoreException extends StreamsException {
    // State store is not available or invalid
}

public class InvalidStateStorePartitionException extends InvalidStateStoreException {
    // State store partition is invalid
    // Subtype of InvalidStateStoreException, so handlers for that type also match it
}

public class LockException extends StreamsException {
    // Failed to acquire state store lock
}

public class StateStoreNotAvailableException extends InvalidStateStoreException {
    // State store is not available (still initializing or closed)
}

// Processing errors
public class TaskMigratedException extends StreamsException {
    // Task has been migrated to another instance
}

public class StreamsNotStartedException extends InvalidStateStoreException {
    // Streams application has not been started
    // Subtype of InvalidStateStoreException, so handlers for that type also match it
}

public class StreamsRebalancingException extends InvalidStateStoreException {
    // Streams application is rebalancing
    // Subtype of InvalidStateStoreException: test for it BEFORE InvalidStateStoreException
}

// Uncaught-exception handling: this is a handler interface, not an exception type
public interface StreamsUncaughtExceptionHandler {
    // Inspects an exception that escaped a stream thread and decides how Streams reacts
    StreamThreadExceptionResponse handle(Throwable exception);

    // Possible reactions returned by handle()
    enum StreamThreadExceptionResponse {
        REPLACE_THREAD,
        SHUTDOWN_CLIENT,
        SHUTDOWN_APPLICATION
    }
}

// Topology errors
public class TopologyException extends StreamsException {
    // Topology definition is invalid
}

// Timestamp errors
public class InvalidTimestampException extends StreamsException {
    // Record timestamp is invalid
}

Connect Exceptions

package org.apache.kafka.connect.errors;

// Base exception
public class ConnectException extends KafkaException {
    // Base class for all Connect exceptions
}

// Data errors
public class DataException extends ConnectException {
    // Data format or schema error
}

public class SchemaBuilderException extends DataException {
    // Error building schema
}

public class SchemaProjectorException extends DataException {
    // Error projecting schema
}

// Retriable wrapper (note: this is Connect-specific, not the common RetriableException)
public class RetriableException extends ConnectException {
    // Operation can be retried
}

// Configuration errors
public class AlreadyExistsException extends ConnectException {
    // Resource already exists
}

public class NotFoundException extends ConnectException {
    // Resource not found
}

Exception Handling Patterns

Producer Error Handling

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;

try {
    RecordMetadata metadata;
    try {
        // send() is asynchronous; get() blocks until the send has completed
        metadata = producer.send(record).get();
    } catch (java.util.concurrent.ExecutionException e) {
        // Failures of the asynchronous send arrive wrapped in ExecutionException.
        // Rethrow the underlying Kafka exception so the typed handlers below apply to it.
        if (e.getCause() instanceof RuntimeException) {
            throw (RuntimeException) e.getCause();
        }
        throw e;
    }
    System.out.println("Sent to partition " + metadata.partition());
} catch (ProducerFencedException e) {
    // Fatal: another producer with same transactional.id has newer epoch
    // Must close this producer and create new one
    producer.close();
    throw e;
} catch (OutOfOrderSequenceException | InvalidProducerEpochException e) {
    // Fatal: sequence number or epoch error
    producer.close();
    throw e;
} catch (AuthorizationException e) {
    // Not authorized - check ACLs
    System.err.println("Authorization failed: " + e.getMessage());
} catch (RecordTooLargeException e) {
    // Record too large - cannot retry
    System.err.println("Record too large: " + e.getMessage());
} catch (SerializationException e) {
    // Serialization failed - cannot retry
    System.err.println("Serialization error: " + e.getMessage());
} catch (TimeoutException e) {
    // The send did not complete in time (max.block.ms or delivery.timeout.ms expired).
    // The producer's internal retries are already exhausted at this point;
    // resend at application level if the record must be delivered.
    System.err.println("Send timed out: " + e.getMessage());
} catch (InterruptedException e) {
    // get() was interrupted while waiting; restore the flag for the caller
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for send: " + e.getMessage());
} catch (Exception e) {
    // Unexpected error
    System.err.println("Unexpected error: " + e.getMessage());
}

Consumer Error Handling

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;
import java.time.Duration;

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Consumer rebalanced, no longer part of group
    // Need to rejoin group on next poll
    System.err.println("Commit failed due to rebalance: " + e.getMessage());
} catch (WakeupException e) {
    // Consumer wakeup() was called - shutting down
    System.out.println("Consumer wakeup called");
    throw e;
} catch (org.apache.kafka.clients.consumer.OffsetOutOfRangeException e) {
    // Fully qualified on purpose: both wildcard imports declare an
    // OffsetOutOfRangeException, so the simple name would be ambiguous.
    // Offset out of range - seek only the affected partitions to a valid position
    System.err.println("Offset out of range, seeking to beginning");
    consumer.seekToBeginning(e.partitions());
} catch (RecordDeserializationException e) {
    // Deserialization error for one record - step past it, otherwise the
    // next poll() fails again at the same offset
    System.err.println("Deserialization error: " + e.getMessage());
    consumer.seek(e.topicPartition(), e.offset() + 1);
} catch (SerializationException e) {
    // Deserialization error without record position information
    System.err.println("Deserialization error: " + e.getMessage());
} catch (AuthorizationException e) {
    // Not authorized - fatal error
    System.err.println("Authorization failed: " + e.getMessage());
    throw e;
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}

Transactional Producer Error Handling

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;

try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // commitTransaction() flushes pending sends and surfaces their failures
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer has newer epoch
    producer.close();
    throw new RuntimeException("Producer fenced", e);
} catch (OutOfOrderSequenceException | InvalidProducerEpochException e) {
    // Fatal: sequence/epoch error
    producer.close();
    throw new RuntimeException("Fatal producer error", e);
} catch (AuthorizationException e) {
    // Not authorized
    producer.close();
    throw new RuntimeException("Not authorized", e);
} catch (org.apache.kafka.common.KafkaException e) {
    // Fully qualified: KafkaException lives in org.apache.kafka.common,
    // which the wildcard imports of this snippet do not cover.
    // Other error - abort transaction
    producer.abortTransaction();
    System.err.println("Transaction aborted: " + e.getMessage());
}

Streams Error Handling

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;

KafkaStreams streams = new KafkaStreams(topology, props);

// Set exception handler.
// StreamsUncaughtExceptionHandler.handle(Throwable) takes a single argument,
// so the lambda has exactly one parameter.
streams.setUncaughtExceptionHandler(throwable -> {
    if (throwable instanceof StreamsException) {
        System.err.println("Streams exception: " + throwable.getMessage());
        // Return SHUTDOWN_CLIENT to stop this instance
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
    // Return REPLACE_THREAD to restart thread
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

try {
    streams.start();
} catch (StreamsException e) {
    System.err.println("Failed to start streams: " + e.getMessage());
    streams.close();
}

Complete Exception Reference

Common Errors (org.apache.kafka.common.errors)

Retriable (36 exceptions):

  • AuthorizerNotReadyException
  • ConcurrentTransactionsException
  • CoordinatorLoadInProgressException
  • CoordinatorNotAvailableException
  • CorruptRecordException
  • DisconnectException
  • ElectionNotNeededException
  • EligibleLeadersNotAvailableException
  • FencedLeaderEpochException
  • FetchSessionIdNotFoundException
  • FetchSessionTopicIdException
  • InconsistentTopicIdException
  • InvalidFetchSessionEpochException
  • InvalidMetadataException
  • InvalidShareSessionEpochException
  • KafkaStorageException
  • LeaderNotAvailableException
  • ListenerNotFoundException
  • NetworkException
  • NotControllerException
  • NotCoordinatorException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • NotLeaderOrFollowerException
  • OffsetNotAvailableException
  • PreferredLeaderNotAvailableException
  • RefreshRetriableException
  • ReplicaNotAvailableException
  • ShareSessionLimitReachedException
  • ShareSessionNotFoundException
  • ThrottlingQuotaExceededException
  • TimeoutException (and subclasses)
  • UnknownLeaderEpochException
  • UnknownTopicIdException
  • UnknownTopicOrPartitionException
  • UnstableOffsetCommitException

Non-Retriable (112 exceptions): Including ApiException, ApplicationRecoverableException, AuthenticationException, AuthorizationException, BrokerIdNotRegisteredException, BrokerNotAvailableException (some variants), ClusterAuthorizationException, CommitFailedException, ConfigException, ControllerMovedException, CoordinatorNotAvailableException, CorruptRecordException, DelegationTokenDisabledException, DelegationTokenExpiredException, DelegationTokenNotFoundException, DelegationTokenOwnerMismatchException, DuplicateBrokerRegistrationException, DuplicateResourceException, DuplicateSequenceException, ElectionDisabledException, FencedInstanceIdException, FencedLeaderEpochException, FetchSessionIdNotFoundException, GroupAuthorizationException, GroupIdNotFoundException, GroupMaxSizeReachedException, GroupNotEmptyException, GroupNotEmptyException, GroupSubscribedToTopicException, IllegalGenerationException, IllegalSaslStateException, IllegalStateException, InconsistentClusterIdException, InconsistentGroupProtocolException, InconsistentTopicIdException, InconsistentVoterSetException, InterruptException, InvalidCommitOffsetSizeException, InvalidConfigException, InvalidConfigurationException, InvalidFetchSizeException, InvalidGroupIdException, InvalidOffsetException, InvalidPartitionsException, InvalidPidMappingException, InvalidPrincipalTypeException, InvalidProducerEpochException, InvalidReplicationFactorException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidRequiredAcksException, InvalidSessionTimeoutException, InvalidTimestampException, InvalidTopicException, InvalidTxnStateException, InvalidTxnTimeoutException, InvalidUpdateVersionException, InvalidVoteException, KafkaPrincipalMappingException, ListOffsetOutOfRangeException, LogDirNotFoundException, MemberIdRequiredException, MismatchedEndpointTypeException, MismatchedGroupProtocolException, NewLeaderElectedException, NoReassignmentInProgressException, OffsetMetadataTooLarge, 
OffsetMovedToTieredStorageException, OffsetOutOfRangeException, OperationNotAttemptedException, OutOfOrderSequenceException, PolicyViolationException, PrincipalDeserializationException, ProducerFencedException, ReassignmentInProgressException, RebalanceInProgressException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, ResourceNotFoundException, SaslAuthenticationException, SecurityDisabledException, SnapshotNotFoundException, SslAuthenticationException, StaleBrokerEpochException, StaleControllerEpochException, StaleMetadataException, StaleShareSnapshotException, ThrottlingQuotaExceededException, TopicAuthorizationException, TopicDeletionDisabledException, TopicExistsException, TransactionCoordinatorFencedException, TransactionalIdAuthorizationException, TransactionalIdNotFoundException, TransactionAbortedException, UnacceptableCredentialException, UnknownMemberIdException, UnknownProducerIdException, UnknownServerException, UnknownTopicOrPartitionException, UnregularExpression (note: doesn't end in "Exception"), UnsupportedAssignorException, UnsupportedByAuthenticationException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException, UnsupportedSaslMechanismException, UnsupportedVersionException, ValueDesertializationException, and many others.

Consumer Exceptions (org.apache.kafka.clients.consumer)

Retriable:

  • NoAvailableBrokersException
  • RetriableCommitFailedException

Non-Retriable:

  • CommitFailedException
  • InvalidOffsetException
  • LogTruncationException
  • NoOffsetForPartitionException
  • OffsetOutOfRangeException

Producer Exceptions (org.apache.kafka.clients.producer)

Retriable:

  • BufferExhaustedException (extends TimeoutException)

Streams Exceptions (org.apache.kafka.streams.errors)

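All Non-Retriable (25 types; the list includes exception-handler interfaces and classes such as the *ExceptionHandler entries, which are not exceptions themselves):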

  • BrokerNotFoundException
  • DefaultProductionExceptionHandler
  • DeserializationExceptionHandler
  • InvalidStateStoreException
  • InvalidStateStorePartitionException
  • InvalidTimestampException
  • LockException
  • LogAndContinueExceptionHandler
  • LogAndFailExceptionHandler
  • MissingSourceTopicException
  • ProcessingExceptionHandler
  • ProcessorStateException
  • ProductionExceptionHandler
  • StateStoreNotAvailableException
  • StreamsException (base class)
  • StreamsInvalidTopologyEpochException
  • StreamsNotStartedException
  • StreamsRebalancingException
  • StreamsUncaughtExceptionHandler
  • TaskAssignmentException
  • TaskCorruptedException
  • TaskIdFormatException
  • TaskMigratedException
  • TopologyException
  • UnknownStateStoreException

Connect Exceptions (org.apache.kafka.connect.errors)

8 exceptions (all non-retriable except RetriableException, which signals that the operation may be retried):

  • AlreadyExistsException
  • ConnectException (base class)
  • DataException
  • NotFoundException
  • RetriableException (Connect-specific, different from common RetriableException)
  • SchemaBuilderException
  • SchemaProjectorException
  • SchemaSerializationException

Total: 189 Exceptions Documented

Error Handling Decision Tree

For Producers

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;

/**
 * Example decision tree that classifies a producer failure and logs the suggested action.
 */
public class ProducerErrorHandler {

    /**
     * Logs a diagnostic for the given producer failure.
     *
     * @param exception the failure to classify
     * @param record the record whose send failed; printed only for oversized records
     */
    public void handleProducerError(Exception exception, ProducerRecord<?, ?> record) {
        if (exception instanceof TimeoutException) {
            // Checked before RetriableException: TimeoutException is itself a
            // RetriableException, so the general branch would otherwise shadow it.
            // Retriable but may indicate systemic issue
            System.err.println("Timeout: " + exception.getMessage());
            // Action: Check broker health, increase timeouts

        } else if (exception instanceof RetriableException) {
            // Automatic retry by producer
            System.err.println("Retriable error, producer will retry: " +
                exception.getMessage());
            // NOTE(review): if this is invoked from a send callback, the producer's
            // internal retries are already exhausted - confirm whether an
            // application-level resend is needed.

        } else if (exception instanceof ProducerFencedException) {
            // Fatal: Another producer with same transactional.id
            System.err.println("Producer fenced - shutting down");
            // Action: Close producer, shutdown instance

        } else if (exception instanceof OutOfOrderSequenceException ||
                   exception instanceof InvalidProducerEpochException) {
            // Fatal: Sequence/epoch error
            System.err.println("Fatal producer error - shutting down");
            // Action: Close producer, create new one

        } else if (exception instanceof AuthorizationException) {
            // Fatal: Not authorized
            System.err.println("Authorization error: " + exception.getMessage());
            // Action: Check ACLs, fix permissions

        } else if (exception instanceof RecordTooLargeException) {
            // Non-retriable: Record too large
            System.err.println("Record too large: " + record);
            // Action: Split record, increase limits, or drop

        } else if (exception instanceof SerializationException) {
            // Non-retriable: Serialization failed
            System.err.println("Serialization error: " + exception.getMessage());
            // Action: Fix data format or serializer

        } else {
            // Unknown error
            System.err.println("Unexpected error: " + exception.getMessage());
            // Action: Log, alert, investigate
        }
    }
}

For Consumers

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;

public class ConsumerErrorHandler {
    
    public void handleConsumerError(Exception exception, Consumer<?, ?> consumer) {
        if (exception instanceof WakeupException) {
            // Expected during shutdown
            System.out.println("Consumer wakeup - shutting down");
            // Action: Exit poll loop, close consumer
            
        } else if (exception instanceof CommitFailedException) {
            // Consumer rebalanced
            System.err.println("Commit failed - rebalanced");
            // Action: Continue, will rejoin on next poll
            
        } else if (exception instanceof OffsetOutOfRangeException) {
            // Offset no longer valid
            System.err.println("Offset out of range");
            // Action: Seek to beginning/end
            consumer.seekToBeginning(consumer.assignment());
            
        } else if (exception instanceof AuthorizationException) {
            // Fatal: Not authorized
            System.err.println("Authorization error: " + exception.getMessage());
            // Action: Check ACLs, shutdown
            throw new RuntimeException("Not authorized", exception);
            
        } else if (exception instanceof SerializationException) {
            // Non-retriable: Deserialization failed
            System.err.println("Deserialization error: " + exception.getMessage());
            // Action: Skip record, log to DLQ
            
        } else if (exception instanceof RetriableException) {
            // Retriable error
            System.err.println("Retriable error: " + exception.getMessage());
            // Action: Retry poll
            
        } else {
            // Unknown error
            System.err.println("Unexpected error: " + exception.getMessage());
            // Action: Log, alert, potentially shutdown
        }
    }
}

For Streams

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;

public class StreamsErrorHandler {
    
    public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
            handleStreamsError(Throwable exception) {
        
        if (exception instanceof ProducerFencedException) {
            // Another instance took over
            System.err.println("Producer fenced - shutting down client");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            
        } else if (exception instanceof InvalidStateStoreException) {
            // Temporary store unavailability
            System.err.println("State store unavailable - replacing thread");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            
        } else if (exception instanceof TaskMigratedException) {
            // Task migrated to another instance
            System.err.println("Task migrated - replacing thread");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            
        } else if (exception instanceof StreamsRebalancingException) {
            // Rebalancing in progress
            System.err.println("Rebalancing - replacing thread");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            
        } else if (exception instanceof TopologyException) {
            // Fatal: Topology error
            System.err.println("Topology error - shutting down");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
            
        } else {
            // Unknown error - shutdown to be safe
            System.err.println("Unexpected error - shutting down");
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        }
    }
}

Exception Patterns by Scenario

Network Partition Scenario

// Sequence of exceptions during network partition:

// 1. Initial send attempts
try {
    producer.send(record).get();
} catch (ExecutionException e) {
    // e.getCause() may be: NetworkException, DisconnectException
    // Producer retries automatically
} catch (InterruptedException e) {
    // get() blocks and can be interrupted; restore the flag for the caller
    Thread.currentThread().interrupt();
}

// 2. Prolonged partition
// May see: TimeoutException, RequestTimeoutException
// Producer continues retrying up to delivery.timeout.ms

// 3. Partition heals
// Producer reconnects automatically
// Pending sends complete successfully

// 4. If partition exceeds delivery.timeout.ms
// Throws: TimeoutException (non-retriable at this point)
// Action: Log failure, implement application-level retry

Broker Failure Scenario

// Sequence of exceptions during broker failure:

// 1. Broker becomes unavailable
// Producer/Consumer: connection errors such as NetworkException or DisconnectException (retriable)
// Note: BrokerNotAvailableException extends ApiException and is NOT a RetriableException

// 2. Leader election in progress
// Producer: LeaderNotAvailableException (retriable)
// Consumer: LeaderNotAvailableException (retriable)

// 3. Metadata refresh
// Producer/Consumer: Automatically refreshes metadata
// Retries with new leader

// 4. New leader ready
// Operations succeed

// Handle in application:
// The callback runs once, when the send has completed. A non-null exception
// means the producer has already given up on this record.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof BrokerNotAvailableException ||
            exception instanceof LeaderNotAvailableException) {
            System.out.println("Send failed after broker failure: " + exception.getMessage());
            // Producer retries are over for this record - resend at application level if required
        }
    }
});

Rebalance Scenario

// Consumer group rebalance exception sequence:

// 1. Rebalance triggered (new member joins/leaves)
// Consumer: RebalanceInProgressException on commit
// (extends ApiException, not RetriableException; call poll() to complete
// the rebalance, then attempt the commit again)

// 2. Partitions revoked
// ConsumerRebalanceListener.onPartitionsRevoked() called

// 3. Consumer attempts commit during revocation
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Consumer no longer owns partitions
    // Cannot commit - records processed since the last successful commit
    // will be delivered again to the new owner of the partitions
    System.err.println("Lost uncommitted offsets due to rebalance");
}

// 4. New partitions assigned
// ConsumerRebalanceListener.onPartitionsAssigned() called

// 5. Consumer resumes with new assignment
// Next poll() succeeds with new partitions

// Best practice: Always commit in onPartitionsRevoked
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            consumer.commitSync(); // Commit before losing partitions
        } catch (CommitFailedException e) {
            // Already rebalanced - too late to commit
            System.err.println("Rebalance completed before commit");
        }
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Ready to process new partitions
    }
});