tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka provides a comprehensive exception hierarchy for handling errors across all APIs. All Kafka exceptions extend from KafkaException, with specialized sub-hierarchies for retriable and non-retriable errors.
package org.apache.kafka.common;
public class KafkaException extends RuntimeException {
// Base class for all Kafka exceptions
}package org.apache.kafka.common.errors;
// Base exception for API errors
public class ApiException extends KafkaException {
// Base class for all API-related exceptions
}
// Retriable exceptions - operations can be retried
public abstract class RetriableException extends ApiException {
// Operations that failed but can be safely retried
}
// Application-recoverable exceptions - fatal but application can recover
public abstract class ApplicationRecoverableException extends ApiException {
// Fatal errors that require application-level recovery
}// Producer exceptions
package org.apache.kafka.clients.producer;
public class BufferExhaustedException extends TimeoutException {
// Producer buffer exhausted
}
// Consumer exceptions
package org.apache.kafka.clients.consumer;
public class CommitFailedException extends KafkaException {
// Unrecoverable commit failure, typically because the group has already rebalanced.
// Extends KafkaException directly; it is not part of the ApiException hierarchy.
}
// Streams exceptions
package org.apache.kafka.streams.errors;
public class StreamsException extends KafkaException {
// Base class for all Streams exceptions
}
// Connect exceptions
package org.apache.kafka.connect.errors;
public class ConnectException extends KafkaException {
// Base class for all Connect exceptions
}
// Serialization exceptions
package org.apache.kafka.common.errors;
public class SerializationException extends KafkaException {
// Serialization/deserialization errors
}
These exceptions indicate temporary failures. Operations can be safely retried.
package org.apache.kafka.common.errors;
// Network-related retriable errors
public class NetworkException extends RetriableException {
// Network communication error
}
public class DisconnectException extends RetriableException {
// Connection to broker was disconnected
}
public class BrokerNotAvailableException extends RetriableException {
// Broker is not currently available
}
public class LeaderNotAvailableException extends RetriableException {
// Leader for partition is not available
}
public class NotLeaderOrFollowerException extends RetriableException {
// Broker is not the leader or follower for the partition
}
public class NotControllerException extends RetriableException {
// Broker is not the controller
}package org.apache.kafka.common.errors;
public class CoordinatorNotAvailableException extends RetriableException {
// Group coordinator is not available
}
public class NotCoordinatorException extends RetriableException {
// Broker is not the coordinator for the group
}
public class CoordinatorLoadInProgressException extends RetriableException {
// Coordinator is loading group metadata
}package org.apache.kafka.common.errors;
public class NotEnoughReplicasException extends RetriableException {
// Not enough in-sync replicas to satisfy acks requirement
}
public class NotEnoughReplicasAfterAppendException extends RetriableException {
// Record written but not enough in-sync replicas after append
}
public class KafkaStorageException extends RetriableException {
// Disk error on broker
}
public class OffsetNotAvailableException extends RetriableException {
// Offset not yet available
}
public class ReplicaNotAvailableException extends RetriableException {
// Replica not currently available
}package org.apache.kafka.common.errors;
public class TimeoutException extends RetriableException {
// Operation timed out (retriable: the same operation may succeed when attempted again)
}
public class RequestTimeoutException extends TimeoutException {
// Request to broker timed out
}
// Producer-specific timeout
package org.apache.kafka.clients.producer;
public class BufferExhaustedException extends TimeoutException {
// Producer buffer full, waiting for space timed out
}package org.apache.kafka.common.errors;
public class UnknownTopicOrPartitionException extends RetriableException {
// Topic or partition does not exist (may be created)
}
public class UnknownTopicIdException extends RetriableException {
// Topic ID is not recognized
}
public class InvalidMetadataException extends RetriableException {
// Metadata is invalid and needs refresh
}
public class InconsistentTopicIdException extends RetriableException {
// Topic ID doesn't match expected ID
}package org.apache.kafka.clients.consumer;
public class RetriableCommitFailedException extends RetriableException {
// Offset commit failed but can be retried
}
public class NoAvailableBrokersException extends RetriableException {
// No brokers available to service request
}package org.apache.kafka.common.errors;
public class FetchSessionIdNotFoundException extends RetriableException {
// Fetch session ID not found on broker
}
public class InvalidFetchSessionEpochException extends RetriableException {
// Fetch session epoch is invalid
}
public class ShareSessionNotFoundException extends RetriableException {
// Share session not found on broker (share consumer)
}
public class InvalidShareSessionEpochException extends RetriableException {
// Share session epoch is invalid (share consumer)
}package org.apache.kafka.common.errors;
public class ThrottlingQuotaExceededException extends RetriableException {
// Quota exceeded, request throttled
}
These exceptions indicate permanent failures or require corrective action.
package org.apache.kafka.common.errors;
public class AuthenticationException extends ApiException {
// Authentication failed
}
public class SaslAuthenticationException extends AuthenticationException {
// SASL authentication failed
}
public class SslAuthenticationException extends AuthenticationException {
// SSL authentication failed
}
public class AuthorizationException extends ApiException {
// Not authorized to perform operation
}
public class TopicAuthorizationException extends AuthorizationException {
// Not authorized for topic operation
}
public class GroupAuthorizationException extends AuthorizationException {
// Not authorized for group operation
}
public class ClusterAuthorizationException extends AuthorizationException {
// Not authorized for cluster operation
}
public class TransactionalIdAuthorizationException extends AuthorizationException {
// Not authorized for transactional ID
}
public class DelegationTokenAuthorizationException extends AuthorizationException {
// Not authorized for delegation token operation
}package org.apache.kafka.common.errors;
public class ProducerFencedException extends ApplicationRecoverableException {
// Producer with same transactional.id has newer epoch
}
public class OutOfOrderSequenceException extends ApiException {
// Sequence number is out of order
}
public class DuplicateSequenceException extends ApiException {
// Duplicate sequence number detected
}
public class InvalidProducerEpochException extends ApiException {
// Producer epoch is invalid
}
public class InvalidTxnStateException extends ApiException {
// Transaction state is invalid for operation
}
public class TransactionAbortedException extends ApiException {
// Transaction was aborted
}
public class TransactionCoordinatorFencedException extends ApiException {
// Transaction coordinator has been fenced
}
public class InvalidPidMappingException extends ApiException {
// Producer ID mapping is invalid
}
public class UnsupportedForMessageFormatException extends ApiException {
// Feature not supported for this message format version
}package org.apache.kafka.clients.consumer;
public class CommitFailedException extends KafkaException {
// Commit failed, typically due to rebalance
// Consumer is no longer part of group
// Extends KafkaException directly; it is not part of the ApiException hierarchy.
}
public abstract class InvalidOffsetException extends KafkaException {
// Abstract base for consumer-side errors where a partition has no usable offset.
// Subclasses report the affected partitions through partitions().
}
public class NoOffsetForPartitionException extends InvalidOffsetException {
// No committed offset and auto.offset.reset=none
}
public class OffsetOutOfRangeException extends InvalidOffsetException {
// Offset is out of range for partition
// Use seekToBeginning() or seekToEnd() on the partitions returned by partitions()
}
public class LogTruncationException extends OffsetOutOfRangeException {
// Log was truncated on broker (data loss)
// A specialization of the consumer OffsetOutOfRangeException
}
package org.apache.kafka.common.errors;
public class SerializationException extends KafkaException {
// Serialization or deserialization failed
}
public class RecordTooLargeException extends ApiException {
// Record size exceeds max.request.size (producer) or
// max.partition.fetch.bytes (consumer)
}
public class InvalidRecordException extends ApiException {
// Record format is invalid
}
public class CorruptRecordException extends RetriableException {
// Record is corrupt (CRC check failed)
}
public class RecordBatchTooLargeException extends ApiException {
// Batch size exceeds configured maximum
}package org.apache.kafka.common.errors;
public class InvalidTopicException extends ApiException {
// Topic name is invalid
}
public class TopicExistsException extends ApiException {
// Topic already exists
}
public class InvalidPartitionsException extends ApiException {
// Partition count is invalid
}
public class InvalidReplicationFactorException extends ApiException {
// Replication factor is invalid
}
public class TopicDeletionDisabledException extends ApiException {
// Topic deletion is disabled on broker
}package org.apache.kafka.common.errors;
public class InvalidConfigurationException extends ApiException {
// Configuration is invalid
}
public class InvalidConfigException extends ApiException {
// Specific configuration value is invalid
}
public class PolicyViolationException extends ApiException {
// Operation violates broker policy
}
public class ResourceNotFoundException extends ApiException {
// Requested resource does not exist
}package org.apache.kafka.common.errors;
public class InvalidGroupIdException extends ApiException {
// Group ID is invalid (empty or null)
}
public class FencedInstanceIdException extends ApiException {
// Consumer instance ID has been fenced
}
public class MemberIdRequiredException extends ApiException {
// Member ID is required but not provided
}
public class UnknownMemberIdException extends ApiException {
// Member ID is not recognized by coordinator
}
public class IllegalGenerationException extends ApiException {
// Generation ID is invalid
}
public class RebalanceInProgressException extends ApiException {
// Consumer group rebalance in progress
}
public class InconsistentGroupProtocolException extends ApiException {
// Group members have incompatible protocols
}
public class GroupIdNotFoundException extends ApiException {
// Group ID does not exist
}
public class GroupMaxSizeReachedException extends ApiException {
// Group has reached maximum size
}package org.apache.kafka.common.errors;
public class InvalidRecordStateException extends ApiException {
// Record state is invalid for acknowledgement (share groups)
}
public class ShareSessionLimitReachedException extends RetriableException {
// Maximum share sessions reached on broker
}package org.apache.kafka.common.errors;
public class DelegationTokenExpiredException extends ApiException {
// Delegation token has expired
}
public class DelegationTokenNotFoundException extends ApiException {
// Delegation token not found
}
public class DelegationTokenDisabledException extends ApiException {
// Delegation tokens are disabled on broker
}
public class DelegationTokenOwnerMismatchException extends ApiException {
// Delegation token owner doesn't match
}
All Streams exceptions are non-retriable and typically require application restart or rebalancing.
package org.apache.kafka.streams.errors;
// Base exception
public class StreamsException extends KafkaException {
// Base class for all Streams exceptions
}
// State store errors
public class InvalidStateStoreException extends StreamsException {
// State store is not available or invalid
}
public class InvalidStateStorePartitionException extends StreamsException {
// State store partition is invalid
}
public class LockException extends StreamsException {
// Failed to acquire state store lock
}
public class StateStoreNotAvailableException extends InvalidStateStoreException {
// State store is not available (still initializing or closed)
}
// Processing errors
public class TaskMigratedException extends StreamsException {
// Task has been migrated to another instance
}
public class StreamsNotStartedException extends StreamsException {
// Streams application has not been started
}
public class StreamsRebalancingException extends StreamsException {
// Streams application is rebalancing
}
// NOTE(review): a dotted name is not valid in a Java class declaration, so this
// listing entry cannot compile as written. The intended type could not be
// confirmed from this file - TODO verify against the Kafka Streams API.
public class StreamsUncaughtExceptionHandler.StreamThreadException extends StreamsException {
// Uncaught exception in stream thread
}
// Topology errors
public class TopologyException extends StreamsException {
// Topology definition is invalid
}
// Timestamp errors
public class InvalidTimestampException extends StreamsException {
// Record timestamp is invalid
}package org.apache.kafka.connect.errors;
// Base exception
public class ConnectException extends KafkaException {
// Base class for all Connect exceptions
}
// Data errors
public class DataException extends ConnectException {
// Data format or schema error
}
public class SchemaBuilderException extends DataException {
// Error building schema
}
public class SchemaProjectorException extends DataException {
// Error projecting schema
}
// Retriable wrapper (note: this is Connect-specific, not the common RetriableException)
public class RetriableException extends ConnectException {
// Operation can be retried
}
// Configuration errors
public class AlreadyExistsException extends ConnectException {
// Resource already exists
}
public class NotFoundException extends ConnectException {
// Resource not found
}import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;
try {
    RecordMetadata metadata;
    try {
        metadata = producer.send(record).get();
    } catch (java.util.concurrent.ExecutionException e) {
        // Future.get() reports an asynchronous send failure wrapped in ExecutionException.
        // Rethrow the unchecked cause so the typed handlers below actually see it.
        if (e.getCause() instanceof RuntimeException) {
            throw (RuntimeException) e.getCause();
        }
        throw e;
    }
    System.out.println("Sent to partition " + metadata.partition());
} catch (ProducerFencedException e) {
    // Fatal: another producer with same transactional.id has newer epoch
    // Must close this producer and create new one
    producer.close();
    throw e;
} catch (OutOfOrderSequenceException | InvalidProducerEpochException e) {
    // Fatal: sequence number or epoch error
    producer.close();
    throw e;
} catch (AuthorizationException e) {
    // Not authorized - check ACLs
    System.err.println("Authorization failed: " + e.getMessage());
} catch (RecordTooLargeException e) {
    // Record too large - cannot retry
    System.err.println("Record too large: " + e.getMessage());
} catch (SerializationException e) {
    // Serialization failed - cannot retry
    System.err.println("Serialization error: " + e.getMessage());
} catch (TimeoutException e) {
    // Retriable: the producer's own retries (bounded by delivery.timeout.ms) are
    // used up by the time this surfaces, so any further retry is the application's
    System.err.println("Timeout, will retry: " + e.getMessage());
} catch (InterruptedException e) {
    // Interrupted while waiting on the future: restore the flag for callers
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for send: " + e.getMessage());
} catch (Exception e) {
    // Unexpected error
    System.err.println("Unexpected error: " + e.getMessage());
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;
import java.time.Duration;
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Consumer rebalanced, no longer part of group
    // Need to rejoin group on next poll
    System.err.println("Commit failed due to rebalance: " + e.getMessage());
} catch (WakeupException e) {
    // Consumer wakeup() was called - shutting down
    System.out.println("Consumer wakeup called");
    throw e;
} catch (org.apache.kafka.clients.consumer.OffsetOutOfRangeException e) {
    // Fully qualified: both wildcard imports of this snippet provide a class with
    // this simple name, so the unqualified name would be ambiguous.
    System.err.println("Offset out of range, seeking to beginning");
    // Rewind only the partitions that are out of range, not the whole assignment
    consumer.seekToBeginning(e.partitions());
} catch (SerializationException e) {
    // Deserialization error - skip malformed record
    System.err.println("Deserialization error: " + e.getMessage());
    // Continue processing other records
} catch (AuthorizationException e) {
    // Not authorized - fatal error
    System.err.println("Authorization failed: " + e.getMessage());
    throw e;
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;
try {
    // Everything between begin and commit is published atomically
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // A newer producer instance owns this transactional.id; this one is unusable
    producer.close();
    throw new RuntimeException("Producer fenced", e);
} catch (OutOfOrderSequenceException | InvalidProducerEpochException e) {
    // Sequence or epoch state is broken; this producer is unusable
    producer.close();
    throw new RuntimeException("Fatal producer error", e);
} catch (AuthorizationException e) {
    // Missing permissions; this handler closes the producer and rethrows
    producer.close();
    throw new RuntimeException("Not authorized", e);
} catch (KafkaException e) {
    // Anything else: roll the transaction back and report it
    producer.abortTransaction();
    System.err.println("Transaction aborted: " + e.getMessage());
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
KafkaStreams streams = new KafkaStreams(topology, props);
// Set exception handler. StreamsUncaughtExceptionHandler.handle receives a single
// Throwable and returns the response that tells Streams how to react.
streams.setUncaughtExceptionHandler(throwable -> {
    if (throwable instanceof StreamsException) {
        System.err.println("Streams exception: " + throwable.getMessage());
        // Return SHUTDOWN_CLIENT to stop this instance
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
    // Return REPLACE_THREAD to restart thread
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
try {
streams.start();
} catch (StreamsException e) {
System.err.println("Failed to start streams: " + e.getMessage());
streams.close();
}
Retriable (36 exceptions):
Non-Retriable (112 exceptions): Including ApiException, ApplicationRecoverableException, AuthenticationException, AuthorizationException, BrokerIdNotRegisteredException, BrokerNotAvailableException (some variants), ClusterAuthorizationException, CommitFailedException, ConfigException, ControllerMovedException, CoordinatorNotAvailableException, CorruptRecordException, DelegationTokenDisabledException, DelegationTokenExpiredException, DelegationTokenNotFoundException, DelegationTokenOwnerMismatchException, DuplicateBrokerRegistrationException, DuplicateResourceException, DuplicateSequenceException, ElectionDisabledException, FencedInstanceIdException, FencedLeaderEpochException, FetchSessionIdNotFoundException, GroupAuthorizationException, GroupIdNotFoundException, GroupMaxSizeReachedException, GroupNotEmptyException, GroupSubscribedToTopicException, IllegalGenerationException, IllegalSaslStateException, IllegalStateException, InconsistentClusterIdException, InconsistentGroupProtocolException, InconsistentTopicIdException, InconsistentVoterSetException, InterruptException, InvalidCommitOffsetSizeException, InvalidConfigException, InvalidConfigurationException, InvalidFetchSizeException, InvalidGroupIdException, InvalidOffsetException, InvalidPartitionsException, InvalidPidMappingException, InvalidPrincipalTypeException, InvalidProducerEpochException, InvalidReplicationFactorException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidRequiredAcksException, InvalidSessionTimeoutException, InvalidTimestampException, InvalidTopicException, InvalidTxnStateException, InvalidTxnTimeoutException, InvalidUpdateVersionException, InvalidVoteException, KafkaPrincipalMappingException, ListOffsetOutOfRangeException, LogDirNotFoundException, MemberIdRequiredException, MismatchedEndpointTypeException, MismatchedGroupProtocolException, NewLeaderElectedException, NoReassignmentInProgressException, OffsetMetadataTooLarge, OffsetMovedToTieredStorageException, OffsetOutOfRangeException, OperationNotAttemptedException, OutOfOrderSequenceException, PolicyViolationException, PrincipalDeserializationException, ProducerFencedException, ReassignmentInProgressException, RebalanceInProgressException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, ResourceNotFoundException, SaslAuthenticationException, SecurityDisabledException, SnapshotNotFoundException, SslAuthenticationException, StaleBrokerEpochException, StaleControllerEpochException, StaleMetadataException, StaleShareSnapshotException, ThrottlingQuotaExceededException, TopicAuthorizationException, TopicDeletionDisabledException, TopicExistsException, TransactionCoordinatorFencedException, TransactionalIdAuthorizationException, TransactionalIdNotFoundException, TransactionAbortedException, UnacceptableCredentialException, UnknownMemberIdException, UnknownProducerIdException, UnknownServerException, UnknownTopicOrPartitionException, UnregularExpression (note: doesn't end in "Exception"), UnsupportedAssignorException, UnsupportedByAuthenticationException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException, UnsupportedSaslMechanismException, UnsupportedVersionException, ValueDeserializationException, and many others.
Retriable:
Non-Retriable:
Retriable:
All Non-Retriable (25 exceptions):
All Non-Retriable (8 exceptions):
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;
public class ProducerErrorHandler {
    /**
     * Classifies a producer send failure and logs the suggested reaction.
     *
     * @param exception the failure reported for the send; {@code null} (no failure,
     *                  as a send callback reports on success) is ignored
     * @param record    the record whose send failed; only used in log output
     */
    public void handleProducerError(Exception exception, ProducerRecord<?, ?> record) {
        if (exception == null) {
            // Nothing failed, so there is nothing to classify
            return;
        }
        if (exception instanceof TimeoutException) {
            // Tested ahead of the generic RetriableException branch so that a timeout
            // which is also retriable still gets the timeout-specific message.
            // Retriable but may indicate systemic issue
            System.err.println("Timeout: " + exception.getMessage());
            // Action: Check broker health, increase timeouts
        } else if (exception instanceof RetriableException) {
            // Automatic retry by producer
            System.err.println("Retriable error, producer will retry: " +
                exception.getMessage());
            // No action needed if retries configured
        } else if (exception instanceof ProducerFencedException) {
            // Fatal: Another producer with same transactional.id
            System.err.println("Producer fenced - shutting down");
            // Action: Close producer, shutdown instance
        } else if (exception instanceof OutOfOrderSequenceException ||
                exception instanceof InvalidProducerEpochException) {
            // Fatal: Sequence/epoch error
            System.err.println("Fatal producer error - shutting down");
            // Action: Close producer, create new one
        } else if (exception instanceof AuthorizationException) {
            // Fatal: Not authorized
            System.err.println("Authorization error: " + exception.getMessage());
            // Action: Check ACLs, fix permissions
        } else if (exception instanceof RecordTooLargeException) {
            // Non-retriable: Record too large
            System.err.println("Record too large: " + record);
            // Action: Split record, increase limits, or drop
        } else if (exception instanceof SerializationException) {
            // Non-retriable: Serialization failed
            System.err.println("Serialization error: " + exception.getMessage());
            // Action: Fix data format or serializer
        } else {
            // Unknown error
            System.err.println("Unexpected error: " + exception.getMessage());
            // Action: Log, alert, investigate
        }
    }
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;
public class ConsumerErrorHandler {
    /**
     * Classifies a consumer failure, logs it, and applies the recovery that can be
     * done in place (seeking past an out-of-range offset).
     *
     * @param exception the failure raised by the consumer
     * @param consumer  the consumer that raised it; used to reposition partitions
     * @throws RuntimeException wrapping the cause when the failure is an
     *                          authorization error
     */
    public void handleConsumerError(Exception exception, Consumer<?, ?> consumer) {
        if (exception instanceof WakeupException) {
            // Expected during shutdown
            System.out.println("Consumer wakeup - shutting down");
            // Action: Exit poll loop, close consumer
        } else if (exception instanceof CommitFailedException) {
            // Consumer rebalanced
            System.err.println("Commit failed - rebalanced");
            // Action: Continue, will rejoin on next poll
        } else if (exception instanceof org.apache.kafka.clients.consumer.OffsetOutOfRangeException) {
            // Fully qualified: both wildcard imports of this snippet provide a class
            // with this simple name, so the unqualified name would be ambiguous.
            org.apache.kafka.clients.consumer.OffsetOutOfRangeException outOfRange =
                (org.apache.kafka.clients.consumer.OffsetOutOfRangeException) exception;
            // Offset no longer valid
            System.err.println("Offset out of range");
            // Action: Seek to beginning/end, but only for the affected partitions
            consumer.seekToBeginning(outOfRange.partitions());
        } else if (exception instanceof AuthorizationException) {
            // Fatal: Not authorized
            System.err.println("Authorization error: " + exception.getMessage());
            // Action: Check ACLs, shutdown
            throw new RuntimeException("Not authorized", exception);
        } else if (exception instanceof SerializationException) {
            // Non-retriable: Deserialization failed
            System.err.println("Deserialization error: " + exception.getMessage());
            // Action: Skip record, log to DLQ
        } else if (exception instanceof RetriableException) {
            // Retriable error
            System.err.println("Retriable error: " + exception.getMessage());
            // Action: Retry poll
        } else {
            // Unknown error
            System.err.println("Unexpected error: " + exception.getMessage());
            // Action: Log, alert, potentially shutdown
        }
    }
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
public class StreamsErrorHandler {
public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
handleStreamsError(Throwable exception) {
if (exception instanceof ProducerFencedException) {
// Another instance took over
System.err.println("Producer fenced - shutting down client");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
} else if (exception instanceof InvalidStateStoreException) {
// Temporary store unavailability
System.err.println("State store unavailable - replacing thread");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
} else if (exception instanceof TaskMigratedException) {
// Task migrated to another instance
System.err.println("Task migrated - replacing thread");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
} else if (exception instanceof StreamsRebalancingException) {
// Rebalancing in progress
System.err.println("Rebalancing - replacing thread");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
} else if (exception instanceof TopologyException) {
// Fatal: Topology error
System.err.println("Topology error - shutting down");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
} else {
// Unknown error - shutdown to be safe
System.err.println("Unexpected error - shutting down");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
}
}// Sequence of exceptions during network partition:
// 1. Initial send attempts
try {
    producer.send(record).get();
} catch (ExecutionException e) {
    // May see: NetworkException, DisconnectException (available via e.getCause())
    // Producer retries automatically
} catch (InterruptedException e) {
    // get() also throws the checked InterruptedException; keep the interrupt
    // flag set so code further up the stack can react to it
    Thread.currentThread().interrupt();
}
// 2. Prolonged partition
// May see: TimeoutException, RequestTimeoutException
// Producer continues retrying up to delivery.timeout.ms
// 3. Partition heals
// Producer reconnects automatically
// Pending sends complete successfully
// 4. If partition exceeds delivery.timeout.ms
// Throws: TimeoutException (non-retriable at this point)
// Action: Log failure, implement application-level retry
// Sequence of exceptions during broker failure:
// 1. Broker becomes unavailable
// Producer: BrokerNotAvailableException (retriable)
// Consumer: BrokerNotAvailableException (retriable)
// 2. Leader election in progress
// Producer: LeaderNotAvailableException (retriable)
// Consumer: LeaderNotAvailableException (retriable)
// 3. Metadata refresh
// Producer/Consumer: Automatically refreshes metadata
// Retries with new leader
// 4. New leader ready
// Operations succeed
// Handle in application:
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        // Send succeeded; nothing to report
        return;
    }
    if (exception instanceof BrokerNotAvailableException ||
            exception instanceof LeaderNotAvailableException) {
        System.out.println("Broker failure detected, automatic retry in progress");
        // No action needed - producer handles it
    } else {
        // Do not drop other failures silently
        System.err.println("Send failed: " + exception.getMessage());
    }
});
// Consumer group rebalance exception sequence:
// 1. Rebalance triggered (new member joins/leaves)
// Consumer: RebalanceInProgressException on commit (not a RetriableException; call poll() to complete the rebalance, then retry the commit)
// 2. Partitions revoked
// ConsumerRebalanceListener.onPartitionsRevoked() called
// 3. Consumer attempts commit during revocation
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // The partitions no longer belong to this consumer, so the commit is
    // rejected and the uncommitted offsets cannot be saved
    System.err.println("Lost uncommitted offsets due to rebalance");
}
// 4. New partitions assigned
// ConsumerRebalanceListener.onPartitionsAssigned() called
// 5. Consumer resumes with new assignment
// Next poll() succeeds with new partitions
// Best practice: Always commit in onPartitionsRevoked
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to prepare; processing continues with the new partitions
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            // Last chance to save progress for the partitions being taken away
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // The rebalance finished first, so the commit can no longer go through
            System.err.println("Rebalance completed before commit");
        }
    }
});