tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The Consumer API subscribes to topics and processes streams of records. Consumers are not thread-safe and should be used by a single thread.
Main consumer interface for reading records from Kafka.
package org.apache.kafka.clients.consumer;
public interface Consumer<K, V> extends Closeable {
// Subscription
void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
void subscribe(SubscriptionPattern pattern);
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);
void assign(Collection<TopicPartition> partitions);
void unsubscribe();
// Polling
ConsumerRecords<K, V> poll(Duration timeout);
// Offset management - Synchronous
void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout);
// Offset management - Asynchronous
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback);
// Position management
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
long position(TopicPartition partition);
long position(TopicPartition partition, Duration timeout);
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions,
Duration timeout);
// Flow control
void pause(Collection<TopicPartition> partitions);
void resume(Collection<TopicPartition> partitions);
Set<TopicPartition> paused();
// Metadata
Set<TopicPartition> assignment();
Set<String> subscription();
List<PartitionInfo> partitionsFor(String topic);
List<PartitionInfo> partitionsFor(String topic, Duration timeout);
Map<String, List<PartitionInfo>> listTopics();
Map<String, List<PartitionInfo>> listTopics(Duration timeout);
Map<MetricName, ? extends Metric> metrics();
Uuid clientInstanceId(Duration timeout);
// Metric subscription (for monitoring)
void registerMetricForSubscription(KafkaMetric metric);
void unregisterMetricFromSubscription(KafkaMetric metric);
// Offset queries
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
Duration timeout);
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
Duration timeout);
OptionalLong currentLag(TopicPartition topicPartition);
// Group management
ConsumerGroupMetadata groupMetadata();
void enforceRebalance();
void enforceRebalance(String reason);
// Lifecycle
void close();
void close(Duration timeout);
void close(CloseOptions options);
void wakeup();
}
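As a small illustration of the position and lag methods above, here is a minimal sketch (assuming a subscribed consumer that has already received its partition assignment):
import org.apache.kafka.common.TopicPartition;
import java.util.OptionalLong;
// Report how far this consumer is behind the latest offset of each assigned partition
for (TopicPartition tp : consumer.assignment()) {
    OptionalLong lag = consumer.currentLag(tp);
    if (lag.isPresent()) {
        System.out.println("Lag for " + tp + ": " + lag.getAsLong());
    } else {
        System.out.println("Lag for " + tp + " is not yet known locally");
    }
}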
Thread-unsafe implementation of the Consumer interface. Each consumer instance should be used by only one thread.
package org.apache.kafka.clients.consumer;
public class KafkaConsumer<K, V> implements Consumer<K, V> {
// Constructor with Properties
public KafkaConsumer(Properties properties);
public KafkaConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer);
// Constructor with Map
public KafkaConsumer(Map<String, Object> configs);
public KafkaConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer);
// All Consumer interface methods implemented
}
Share consumer interface for cooperative consumption using share groups. Share groups allow multiple consumers to cooperatively consume from the same partitions, with records distributed across consumers rather than partitions. This is a preview feature (@InterfaceStability.Evolving).
Key Differences from Consumer Groups:
- Records are distributed to consumers individually, so multiple share consumers can consume from the same partition.
- Records are acknowledged individually (ACCEPT, RELEASE, or REJECT) instead of committing per-partition offsets.
- Delivered records carry a delivery count, exposed through ConsumerRecord.deliveryCount().
- There is no manual partition assignment, seeking, pause/resume, or rebalance listener; the broker coordinates record distribution.
package org.apache.kafka.clients.consumer;
public interface ShareConsumer<K, V> extends Closeable {
// Subscription
Set<String> subscription();
void subscribe(Collection<String> topics);
void unsubscribe();
// Polling
ConsumerRecords<K, V> poll(Duration timeout);
// Acknowledgement (explicit mode only)
void acknowledge(ConsumerRecord<K, V> record);
void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);
// Commit acknowledgements
Map<TopicIdPartition, Optional<KafkaException>> commitSync();
Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);
void commitAsync();
void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback);
// Metrics and monitoring
Uuid clientInstanceId(Duration timeout);
Map<MetricName, ? extends Metric> metrics();
void registerMetricForSubscription(KafkaMetric metric);
void unregisterMetricFromSubscription(KafkaMetric metric);
// Lifecycle
void close();
void close(Duration timeout);
void wakeup();
}
Implementation of the ShareConsumer interface.
package org.apache.kafka.clients.consumer;
public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
public KafkaShareConsumer(Map<String, Object> configs);
public KafkaShareConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer);
public KafkaShareConsumer(Properties properties);
public KafkaShareConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer);
}
A record received from Kafka.
package org.apache.kafka.clients.consumer;
public class ConsumerRecord<K, V> {
// Constants
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
// Accessors
public String topic();
public int partition();
public long offset();
public long timestamp();
public TimestampType timestampType();
public int serializedKeySize();
public int serializedValueSize();
public K key();
public V value();
public Headers headers();
public Optional<Integer> leaderEpoch();
public Optional<Short> deliveryCount();
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.header.Header;
import java.time.Duration;
import java.util.Collections;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
// Access timestamp
if (record.timestamp() != ConsumerRecord.NO_TIMESTAMP) {
System.out.println("Timestamp: " + record.timestamp());
System.out.println("Timestamp type: " + record.timestampType());
}
// Access headers
for (Header header : record.headers()) {
System.out.println("Header: " + header.key() + " = " +
new String(header.value()));
}
// Check delivery count (for share consumers)
record.deliveryCount().ifPresent(count ->
System.out.println("Delivery count: " + count));
}
Collection of records from a poll operation.
package org.apache.kafka.clients.consumer;
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
// Empty constant
public static final ConsumerRecords<Object, Object> EMPTY;
// Access records
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
public Iterable<ConsumerRecord<K, V>> records(String topic);
public Set<TopicPartition> partitions();
public Iterator<ConsumerRecord<K, V>> iterator();
// Metadata
public int count();
public boolean isEmpty();
public Map<TopicPartition, Long> nextOffsets();
}
Usage Example:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process all records
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Process by partition
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
System.out.println("Partition " + partition + " has " +
partitionRecords.size() + " records");
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
}
// Process by topic
for (ConsumerRecord<String, String> record : records.records("my-topic")) {
processRecord(record);
}
// Check if empty
if (records.isEmpty()) {
System.out.println("No records received");
} else {
System.out.println("Received " + records.count() + " records");
}
Offset position with optional metadata.
package org.apache.kafka.clients.consumer;
public class OffsetAndMetadata implements Serializable {
// Constructors
public OffsetAndMetadata(long offset);
public OffsetAndMetadata(long offset, String metadata);
public OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String metadata);
// Accessors
public long offset();
public String metadata();
public Optional<Integer> leaderEpoch();
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
// Manual offset commit with metadata
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
TopicPartition partition = new TopicPartition("my-topic", 0);
offsets.put(partition, new OffsetAndMetadata(100L, "Processed batch 1"));
consumer.commitSync(offsets);
// Retrieve committed offsets
Map<TopicPartition, OffsetAndMetadata> committed =
consumer.committed(Collections.singleton(partition));
OffsetAndMetadata offsetMeta = committed.get(partition);
System.out.println("Committed offset: " + offsetMeta.offset());
System.out.println("Metadata: " + offsetMeta.metadata());Offset and timestamp pair for time-based queries.
package org.apache.kafka.clients.consumer;
public class OffsetAndTimestamp {
public long offset();
public long timestamp();
public Optional<Integer> leaderEpoch();
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
// Find offsets for specific timestamps
TopicPartition partition = new TopicPartition("my-topic", 0);
long timestamp = System.currentTimeMillis() - 3600000; // 1 hour ago
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(partition);
if (offsetAndTimestamp != null) {
System.out.println("Offset at timestamp " + timestamp + ": " +
offsetAndTimestamp.offset());
consumer.seek(partition, offsetAndTimestamp.offset());
}
Consumer group metadata for transactional sends.
package org.apache.kafka.clients.consumer;
public class ConsumerGroupMetadata {
public String groupId();
public int generationId();
public String memberId();
public Optional<String> groupInstanceId();
}
Usage Example:
// Get consumer group metadata for transactional producer
ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
// Send to transactional producer
producer.sendOffsetsToTransaction(offsets, groupMetadata);
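groupMetadata() is typically used in a consume-transform-produce loop so that consumed offsets are committed as part of the producer's transaction. The following is a minimal sketch rather than an excerpt from this package's docs: the output topic, transactional id, and error handling are placeholders, and the consumer is assumed to run with enable.auto.commit=false.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
// Transactional producer (transactional.id is required for transactions)
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit the consumed offsets within the same transaction
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (Exception e) {
        // In production, distinguish abortable errors from fatal ones
        // (e.g., ProducerFencedException requires closing the producer)
        producer.abortTransaction();
    }
}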
Callback for partition rebalance events.
package org.apache.kafka.clients.consumer;
public interface ConsumerRebalanceListener {
/**
* Called before partitions are revoked from this consumer.
*/
void onPartitionsRevoked(Collection<TopicPartition> partitions);
/**
* Called after partitions are assigned to this consumer.
*/
void onPartitionsAssigned(Collection<TopicPartition> partitions);
/**
* Called when partitions are lost (optional, default no-op).
*/
default void onPartitionsLost(Collection<TopicPartition> partitions) {
onPartitionsRevoked(partitions);
}
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// Commit offsets before rebalance
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
// Initialize state for new partitions
for (TopicPartition partition : partitions) {
// Optionally seek to specific offset
// consumer.seek(partition, 0L);
}
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
System.out.println("Partitions lost: " + partitions);
// Partitions lost due to consumer group rebalance failure
// No commit is possible
}
});
Callback for asynchronous offset commits.
package org.apache.kafka.clients.consumer;
@FunctionalInterface
public interface OffsetCommitCallback {
/**
* Called when offset commit completes.
* @param offsets The committed offsets
* @param exception The exception, or null if successful
*/
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import java.util.Map;
// Async commit with callback
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
System.err.println("Commit failed: " + exception.getMessage());
} else {
System.out.println("Commit succeeded for offsets: " + offsets);
}
}
});
// Lambda syntax
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception.getMessage());
}
});
Callback for asynchronous acknowledgement commits in share consumers.
package org.apache.kafka.clients.consumer;
@FunctionalInterface
public interface AcknowledgementCommitCallback {
/**
* Called when acknowledgement commit completes.
* @param offsets Map of topic-partition to acknowledged record offsets
* @param exception The exception, or null if successful
*/
void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception);
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicIdPartition;
import java.util.*;
// Set acknowledgement commit callback for share consumer
shareConsumer.setAcknowledgementCommitCallback(
new AcknowledgementCommitCallback() {
@Override
public void onComplete(Map<TopicIdPartition, Set<Long>> offsets,
Exception exception) {
if (exception != null) {
System.err.println("Acknowledgement commit failed: " +
exception.getMessage());
} else {
System.out.println("Acknowledgements committed for " +
offsets.size() + " partitions");
}
}
});
// Lambda syntax
shareConsumer.setAcknowledgementCommitCallback((offsets, exception) -> {
if (exception != null) {
System.err.println("Acknowledgement commit failed: " + exception.getMessage());
}
});
Exception Types:
- AuthorizationException - Not authorized to the topic or share group
- InvalidRecordStateException - Record state is invalid for acknowledgement
- NotLeaderOrFollowerException - Leader changed before acknowledgements were sent
- DisconnectException - Broker disconnected before request completed
- WakeupException - Consumer wakeup() called during callback
- InterruptException - Thread interrupted during callback
- KafkaException - Other unrecoverable errors
Intercept records before processing.
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Configurable;
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
* Called before records are returned to the application.
*/
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
* Called when offsets are committed.
*/
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
void close();
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import java.util.*;
public class LoggingInterceptor implements ConsumerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("Received " + records.count() + " records");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("Committed offsets: " + offsets);
}
@Override
public void close() {}
}
// Configure interceptor
Properties props = new Properties();
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
LoggingInterceptor.class.getName());
Custom partition assignment strategy.
package org.apache.kafka.clients.consumer;
public interface ConsumerPartitionAssignor {
String name();
short version();
List<RebalanceProtocol> supportedProtocols();
ByteBuffer subscriptionUserData(Set<String> topics);
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata);
// Nested classes
class Subscription {
public List<String> topics();
public ByteBuffer userData();
public List<TopicPartition> ownedPartitions();
public Optional<String> groupInstanceId();
public Optional<String> rackId();
}
class Assignment {
public List<TopicPartition> partitions();
public ByteBuffer userData();
}
class GroupSubscription {
public Map<String, Subscription> groupSubscription();
}
class GroupAssignment {
public Map<String, Assignment> groupAssignment();
}
enum RebalanceProtocol {
EAGER,
COOPERATIVE
}
}
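The built-in assignors listed next cover most needs, but the interface can also be implemented directly. Below is a minimal, hypothetical sketch: the class name and its all-partitions-to-one-member policy are purely illustrative, it assumes Assignment and GroupAssignment provide constructors taking the collections shown above, and it relies on the interface's default implementations of version(), supportedProtocols(), subscriptionUserData(), and onAssignment().
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
// Toy strategy: give every subscribed partition to the first member, nothing to the others
public class FirstMemberAssignor implements ConsumerPartitionAssignor {
    @Override
    public String name() {
        return "first-member"; // referenced through partition.assignment.strategy
    }
    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        String firstMember = subscriptions.keySet().iterator().next();
        // Collect every partition of every subscribed topic (deduplicated)
        Set<TopicPartition> allPartitions = new LinkedHashSet<>();
        for (Subscription subscription : subscriptions.values()) {
            for (String topic : subscription.topics()) {
                for (PartitionInfo info : metadata.partitionsForTopic(topic)) {
                    allPartitions.add(new TopicPartition(info.topic(), info.partition()));
                }
            }
        }
        Map<String, Assignment> assignments = new HashMap<>();
        for (String member : subscriptions.keySet()) {
            List<TopicPartition> owned = member.equals(firstMember)
                ? new ArrayList<>(allPartitions)
                : Collections.<TopicPartition>emptyList();
            assignments.put(member, new Assignment(owned));
        }
        return new GroupAssignment(assignments);
    }
}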
RangeAssignor:
package org.apache.kafka.clients.consumer;
public class RangeAssignor implements ConsumerPartitionAssignor {
// Range-based assignment
// Name: "range"
// Assigns partitions on a per-topic basis
}
RoundRobinAssignor:
package org.apache.kafka.clients.consumer;
public class RoundRobinAssignor implements ConsumerPartitionAssignor {
// Round-robin assignment
// Name: "roundrobin"
// Assigns partitions evenly across consumers
}
StickyAssignor:
package org.apache.kafka.clients.consumer;
public class StickyAssignor implements ConsumerPartitionAssignor {
// Sticky assignment
// Name: "sticky"
// Minimizes partition movement during rebalance
}
CooperativeStickyAssignor:
package org.apache.kafka.clients.consumer;
public class CooperativeStickyAssignor implements ConsumerPartitionAssignor {
// Cooperative sticky assignment
// Name: "cooperative-sticky"
// Allows incremental rebalancing without stopping consumption
}
Usage:
Properties props = new Properties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Or multiple strategies
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Arrays.asList(
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
"org.apache.kafka.clients.consumer.StickyAssignor"
));
Configuration constants for the consumer.
package org.apache.kafka.clients.consumer;
public class ConsumerConfig extends AbstractConfig {
// Required configurations
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
// Group configuration
public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
// Offset configuration
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
// Assignment strategy
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG =
"partition.assignment.strategy";
// Fetch configuration
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG =
"max.partition.fetch.bytes";
// Timeouts
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
// Metadata
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG =
"allow.auto.create.topics";
// Isolation and reliability
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
public static final String CHECK_CRCS_CONFIG = "check.crcs";
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
// Client identification
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String CLIENT_RACK_CONFIG = "client.rack";
// Interceptors
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
}
Configuration for share consumers. Extends ConsumerConfig but excludes configurations that are not supported for share consumers.
package org.apache.kafka.clients.consumer;
public class ShareConsumerConfig extends ConsumerConfig {
// Share-specific configuration
public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
"share.acknowledgement.mode";
// Values: "implicit" (default), "explicit"
// Inherited from ConsumerConfig but unsupported (will throw exception):
// - auto.offset.reset
// - enable.auto.commit
// - group.instance.id
// - isolation.level
// - partition.assignment.strategy
// - interceptor.classes
// - session.timeout.ms
// - heartbeat.interval.ms
// - group.protocol
// - group.remote.assignor
}
Configuration Properties:
| Property | Type | Default | Description |
|---|---|---|---|
| share.acknowledgement.mode | String | implicit | Acknowledgement mode: implicit (auto-acknowledge on next poll) or explicit (must call acknowledge()) |
Broker-Side Share Group Configuration:
These are configured at the broker level:
| Property | Type | Default | Description |
|---|---|---|---|
| share.session.timeout.ms | int | 45000 (45 sec) | Share group session timeout |
| share.heartbeat.interval.ms | int | 3000 (3 sec) | Share group heartbeat interval |
| share.record.lock.duration.ms | int | 30000 (30 sec) | Duration records are locked when delivered |
| share.auto.offset.reset | String | latest | Initial offset strategy: earliest, latest, by_duration:<ISO8601> |
| share.isolation.level | String | read_uncommitted | Transaction isolation: read_committed or read_uncommitted |
| group.share.record.lock.partition.limit | int | 200 | Max acquired records per partition per consumer |
Basic Configuration:
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
Manual Offset Management:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Disable auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
Read Committed (Transactional):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Read only committed messages
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
Configuration Defaults:
group.protocol: "classic"session.timeout.ms: 45000 (45 seconds)heartbeat.interval.ms: 3000 (3 seconds)max.poll.interval.ms: 300000 (5 minutes)max.poll.records: 500enable.auto.commit: trueauto.commit.interval.ms: 5000 (5 seconds)auto.offset.reset: "latest" (or "earliest", "by_duration:<duration>", "none")partition.assignment.strategy: [RangeAssignor, CooperativeStickyAssignor]fetch.min.bytes: 1fetch.max.bytes: 52428800 (50MB)fetch.max.wait.ms: 500max.partition.fetch.bytes: 1048576 (1MB)request.timeout.ms: 30000 (30 seconds)default.api.timeout.ms: 60000 (1 minute)metadata.max.age.ms: 300000 (5 minutes)allow.auto.create.topics: trueisolation.level: "read_uncommitted"check.crcs: trueexclude.internal.topics: truepackage org.apache.kafka.clients.consumer;
public enum GroupProtocol {
CLASSIC, // Traditional consumer group protocol
CONSUMER // New consumer group protocol (KIP-848)
}
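The protocol is selected with the group.protocol configuration; a small sketch (bootstrap servers and group id are placeholders):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// "classic" keeps the traditional protocol; "consumer" opts into the KIP-848 protocol
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");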
package org.apache.kafka.clients.consumer;
public enum AcknowledgeType {
ACCEPT(1), // Successfully processed
RELEASE(2), // Requeue for later delivery
REJECT(3); // Permanently reject
public byte id();
}
Options for closing the consumer.
package org.apache.kafka.clients.consumer;
public class CloseOptions {
public enum GroupMembershipOperation {
LEAVE_GROUP, // Leave consumer group
REMAIN_IN_GROUP, // Stay in group
DEFAULT // Default behavior
}
// Factory methods
public static CloseOptions timeout(Duration timeout);
public static CloseOptions groupMembershipOperation(GroupMembershipOperation operation);
// Fluent setters
public CloseOptions withTimeout(Duration timeout);
public CloseOptions withGroupMembershipOperation(GroupMembershipOperation operation);
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
// Close with timeout
consumer.close(CloseOptions.timeout(Duration.ofSeconds(10)));
// Close but remain in group
consumer.close(CloseOptions.groupMembershipOperation(
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
// Combine options
CloseOptions options = CloseOptions.timeout(Duration.ofSeconds(10))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
consumer.close(options);
RE2/J compatible pattern for topic subscription.
package org.apache.kafka.clients.consumer;
public class SubscriptionPattern {
public SubscriptionPattern(String pattern);
}
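A minimal sketch of subscribing with this class (the topic pattern is illustrative, a configured consumer is assumed, and the pattern is evaluated broker-side, which requires a group protocol that supports it):
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
// RE2/J-style pattern matched by the broker against existing and future topics
consumer.subscribe(new SubscriptionPattern("metrics-.*"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Topic: " + record.topic() + ", Value: " + record.value());
    }
}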
Mock consumer for testing.
package org.apache.kafka.clients.consumer;
public class MockConsumer<K, V> implements Consumer<K, V> {
public MockConsumer(OffsetResetStrategy offsetResetStrategy);
// Test helpers
public void addRecord(ConsumerRecord<K, V> record);
public void addEndOffsets(Map<TopicPartition, Long> newOffsets);
public void addBeginningOffsets(Map<TopicPartition, Long> newOffsets);
public void setException(KafkaException exception);
public void updatePartitions(String topic, List<PartitionInfo> partitions);
public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets);
public void updateEndOffsets(Map<TopicPartition, Long> newOffsets);
public void rebalance(Collection<TopicPartition> newAssignment);
// Verification
public boolean closed();
public Map<String, List<PartitionInfo>> partitions();
public Set<TopicPartition> paused();
}
Usage Example:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals; // JUnit 5 assertion used below
// Create mock consumer
MockConsumer<String, String> mockConsumer =
new MockConsumer<>(OffsetResetStrategy.EARLIEST);
// Configure partitions
TopicPartition partition = new TopicPartition("test-topic", 0);
mockConsumer.assign(Collections.singletonList(partition));
// Add beginning offsets
Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(partition, 0L);
mockConsumer.updateBeginningOffsets(beginningOffsets);
// Add test records
mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key1", "value1"));
mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 1L, "key2", "value2"));
// Test consumer code
ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
assertEquals(2, records.count());
Mock share consumer for testing.
package org.apache.kafka.clients.consumer;
public class MockShareConsumer<K, V> implements ShareConsumer<K, V> {
// Similar test helpers as MockConsumer
}
Unrecoverable commit failure.
package org.apache.kafka.clients.consumer;
public class CommitFailedException extends KafkaException {
// Consumer group rebalanced before commit completed
// Consumer is no longer part of the group
}
Retriable commit failure.
package org.apache.kafka.clients.consumer;
public class RetriableCommitFailedException extends RetriableException {
// Commit failed but can be retried
}
Invalid offset.
package org.apache.kafka.clients.consumer;
public class InvalidOffsetException extends ApiException {
// Offset is invalid for the partition
}
No offset exists for the partition.
package org.apache.kafka.clients.consumer;
public class NoOffsetForPartitionException extends InvalidOffsetException {
// No committed offset and auto.offset.reset=none
}
Offset out of range.
package org.apache.kafka.clients.consumer;
public class OffsetOutOfRangeException extends InvalidOffsetException {
// Requested offset is out of range for the partition
// Check beginning/end offsets and seek accordingly
}
Log truncation detected.
package org.apache.kafka.clients.consumer;
public class LogTruncationException extends OffsetOutOfRangeException {
// Log was truncated on the broker
// Data loss detected
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Processing: offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
// Process record
}
// Commit offsets synchronously after processing
consumer.commitSync();
}
} catch (CommitFailedException e) {
System.err.println("Commit failed: " + e.getMessage());
} finally {
consumer.close();
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// Manually assign partitions (no consumer group)
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));
// Seek to specific offset
consumer.seek(partition0, 100L);
consumer.seekToBeginning(Collections.singletonList(partition1));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records partition by partition
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
System.out.println("Processing " + partitionRecords.size() +
" records from partition " + partition.partition());
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
// Commit offset for this partition
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
consumer.commitSync(offsets);
}
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.regex.Pattern;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to all topics matching pattern
Pattern pattern = Pattern.compile("metrics-.*");
consumer.subscribe(pattern);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Topic: " + record.topic() + ", Value: " + record.value());
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
// Poll until the group has assigned partitions (a zero-duration poll may return before assignment completes)
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
}
// Get assigned partitions
Set<TopicPartition> assignment = consumer.assignment();
// Seek to timestamp (e.g., 1 hour ago)
long timestamp = System.currentTimeMillis() - 3600000;
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : assignment) {
timestampsToSearch.put(partition, timestamp);
}
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}
// Continue consuming from the seeked position
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Set<TopicPartition> assignment = consumer.assignment();
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (BackpressureException e) { // BackpressureException is application-defined, signalling downstream overload
// Pause consumption due to backpressure
System.out.println("Pausing consumption due to backpressure");
consumer.pause(assignment);
// Wait and resume (Thread.sleep declares the checked InterruptedException)
try {
    Thread.sleep(5000);
} catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
}
consumer.resume(assignment);
System.out.println("Resuming consumption");
}
}
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
// Configure share consumer with implicit acknowledgement (default)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Implicit mode: records are auto-acknowledged on next poll or commitSync
props.put("share.acknowledgement.mode", "implicit");
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
// No explicit acknowledge needed in implicit mode
} catch (Exception e) {
System.err.println("Processing failed: " + e.getMessage());
// Record will be auto-acknowledged anyway in implicit mode
}
}
// Records are acknowledged on next poll or explicit commitSync
}
} finally {
shareConsumer.close();
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
// Configure share consumer with explicit acknowledgement
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Explicit mode: must call acknowledge() for each record
props.put("share.acknowledgement.mode", "explicit");
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
// ACCEPT: Successfully processed
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (RetriableException e) {
// RELEASE: Temporary failure, requeue for retry
System.err.println("Temporary failure, releasing: " + e.getMessage());
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
} catch (PermanentException e) { // PermanentException is application-defined for non-retriable failures
// REJECT: Permanent failure, send to DLQ or discard
System.err.println("Permanent failure, rejecting: " + e.getMessage());
shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
}
}
}
} finally {
shareConsumer.close();
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Check delivery count for dead-letter pattern
Optional<Short> deliveryCount = record.deliveryCount();
if (deliveryCount.isPresent() && deliveryCount.get() > 3) {
// Send to dead-letter queue after 3 retries
System.err.println("Max retries exceeded, sending to DLQ: " +
record.key());
sendToDeadLetterQueue(record);
shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
} else {
try {
processRecord(record);
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
System.err.println("Processing failed (attempt " +
deliveryCount.orElse((short) 1) + "): " +
e.getMessage());
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
}
}
}
}
} finally {
shareConsumer.close();
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicIdPartition;
import java.time.Duration;
import java.util.*;
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
// Set acknowledgement commit callback
shareConsumer.setAcknowledgementCommitCallback((offsets, exception) -> {
if (exception != null) {
System.err.println("Acknowledgement commit failed: " + exception.getMessage());
// Handle commit failure (e.g., retry logic)
} else {
System.out.println("Successfully committed acknowledgements for " +
offsets.size() + " partitions");
}
});
shareConsumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
}
// Commit asynchronously - callback will be invoked
shareConsumer.commitAsync();
}
} finally {
shareConsumer.close();
}
WARNING: KafkaConsumer is NOT thread-safe. It should be used by only one thread. To consume from multiple threads, use multiple consumer instances.
Note: ShareConsumer is also NOT thread-safe and should be used by only one thread. However, share groups naturally support multiple consumers from the same partition, so use multiple ShareConsumer instances across threads/processes for parallel consumption.
Multi-threaded Pattern:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
// Create multiple consumer instances, one per thread
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int threadId = i;
executor.submit(() -> {
// Each thread has its own consumer instance
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + threadId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Thread " + threadId + " processing: " +
record.value());
}
}
} finally {
consumer.close();
}
});
}
Wakeup Pattern:
The only thread-safe method is wakeup(), which can be called from another thread to interrupt a blocked poll():
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down consumer");
consumer.wakeup(); // Thread-safe call
}));
try {
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
}
} catch (WakeupException e) {
// Expected on shutdown
System.out.println("Consumer woken up");
} finally {
consumer.close();
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class GracefulConsumer {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Consumer<String, String> consumer;
public GracefulConsumer(Consumer<String, String> consumer) {
this.consumer = consumer;
}
public void run() {
try {
consumer.subscribe(Arrays.asList("my-topic"));
while (!closed.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitAsync();
}
} catch (WakeupException e) {
// Ignore for shutdown
} finally {
try {
consumer.commitSync(); // Final commit
} finally {
consumer.close();
}
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;
import java.time.Duration;
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitSync();
} catch (CommitFailedException e) {
// Consumer rebalanced, need to rejoin
System.err.println("Commit failed, rebalancing: " + e.getMessage());
} catch (OffsetOutOfRangeException e) {
// Seek to valid offset
consumer.seekToBeginning(consumer.assignment());
} catch (SerializationException e) {
// Skip malformed record
System.err.println("Deserialization error: " + e.getMessage());
} catch (Exception e) {
System.err.println("Unexpected error: " + e.getMessage());
}
}
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
if (name.name().equals("records-lag-max")) {
System.out.println("Max lag: " + entry.getValue().metricValue());
}
if (name.name().equals("fetch-rate")) {
System.out.println("Fetch rate: " + entry.getValue().metricValue());
}
}
Key metrics to monitor:
- records-lag-max: Maximum lag across all partitions
- records-lag-avg: Average lag across all partitions
- fetch-rate: Fetch requests per second
- fetch-latency-avg: Average fetch latency
- records-consumed-rate: Records consumed per second
- commit-latency-avg: Average commit latency
Symptoms:
Causes:
Solutions:
// Increase max poll interval
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10 minutes
// Or process in smaller batches
Properties props = new Properties();
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // Smaller batches
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
System.err.println("Processing error: " + e.getMessage());
// Continue processing other records
}
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// Consumer rebalanced - offsets will be reset to last commit
System.err.println("Commit failed due to rebalance: " + e.getMessage());
// No action needed - next poll will trigger rejoin
}
}
Prevention:
Symptoms:
Causes:
Solutions:
// Increase session timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // 5 seconds
// Use static membership (prevents rebalance on restart)
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
"consumer-" + InetAddress.getLocalHost().getHostName());
// Monitor rebalances
consumer.subscribe(Collections.singletonList("my-topic"),
new ConsumerRebalanceListener() {
private long rebalanceCount = 0;
private long lastRebalanceTime = System.currentTimeMillis();
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
rebalanceCount++;
long timeSinceLastRebalance = System.currentTimeMillis() - lastRebalanceTime;
System.out.println("Rebalance #" + rebalanceCount +
" after " + timeSinceLastRebalance + "ms");
lastRebalanceTime = System.currentTimeMillis();
if (timeSinceLastRebalance < 60000) {
System.err.println("WARNING: Frequent rebalancing detected");
}
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned partitions: " + partitions);
}
});
Prevention:
Symptoms:
Causes:
Solutions:
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitSync();
} catch (OffsetOutOfRangeException e) {
// Determine recovery strategy
System.err.println("Offset out of range: " + e.getMessage());
// Option 1: Seek to beginning (may reprocess data)
consumer.seekToBeginning(consumer.assignment());
// Option 2: Seek to end (skips to latest, may lose data)
// consumer.seekToEnd(consumer.assignment());
// Option 3: Seek to specific timestamp
long timestamp = System.currentTimeMillis() - 3600000; // 1 hour ago
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
timestampsToSearch.put(partition, timestamp);
}
Map<TopicPartition, OffsetAndTimestamp> offsets =
consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}
// Option 4: Seek to beginning offsets for each partition
Map<TopicPartition, Long> beginningOffsets =
consumer.beginningOffsets(consumer.assignment());
for (Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());
}
}
}
Prevention:
Symptoms:
Causes:
Solutions:
// Monitor lag continuously
public class LagMonitor {
public void checkLag(Consumer<?, ?> consumer) {
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
if (name.name().equals("records-lag-max")) {
double lag = (double) entry.getValue().metricValue();
System.out.println("Current lag: " + lag + " records");
if (lag > 100000) {
System.err.println("CRITICAL: High consumer lag detected");
// Actions:
// 1. Add more consumer instances
// 2. Optimize processing logic
// 3. Increase partitions (requires topic recreation)
// 4. Use parallel processing within consumer
}
}
}
}
}
// Parallel processing pattern (same consumer instance, different threads)
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
Future<?> future = executor.submit(() -> {
try {
processRecord(record);
} catch (Exception e) {
System.err.println("Error processing: " + e.getMessage());
}
});
futures.add(future);
}
// Wait for all processing to complete
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
System.err.println("Processing thread error: " + e.getMessage());
}
}
// Commit after all records processed
consumer.commitSync();
}
Prevention:
// Consumers handle null keys and values
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Key may be null
String key = record.key(); // May be null
if (key == null) {
System.out.println("Record with null key");
}
// Value may be null (tombstone in compacted topics)
String value = record.value(); // May be null
if (value == null) {
System.out.println("Tombstone record for key: " + key);
// Handle deletion
handleDeletion(key);
} else {
// Handle update
handleUpdate(key, value);
}
}
// Poll may return empty results
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
// No records available
// This is normal and expected
// Don't assume error or end of stream
} else {
// Process records
System.out.println("Received " + records.count() + " records");
}
// Common mistake: Assuming non-empty means end of stream
// Correct: Keep polling in loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records (may be empty)
}
// Consumers may receive duplicate records due to:
// 1. Rebalancing before commit
// 2. Consumer restart after crash
// 3. Network issues during commit
// Implement idempotent processing
Set<String> processedKeys = new HashSet<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String deduplicationKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
if (processedKeys.contains(deduplicationKey)) {
System.out.println("Skipping duplicate: " + deduplicationKey);
continue;
}
processRecord(record);
processedKeys.add(deduplicationKey);
// Cleanup old entries periodically
if (processedKeys.size() > 100000) {
processedKeys.clear(); // Or use LRU cache
}
}
consumer.commitSync();
}
// Or use external deduplication store
// (database, Redis, etc.) for durability
// Handle partition assignment/revocation properly
Map<TopicPartition, Long> partitionState = new HashMap<>();
consumer.subscribe(Collections.singletonList("my-topic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoking partitions: " + partitions);
// Save state before revocation
for (TopicPartition partition : partitions) {
Long state = partitionState.get(partition);
if (state != null) {
saveStateToExternalStore(partition, state);
}
partitionState.remove(partition);
}
// Commit offsets
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned partitions: " + partitions);
// Load state for new partitions
for (TopicPartition partition : partitions) {
Long state = loadStateFromExternalStore(partition);
if (state != null) {
partitionState.put(partition, state);
}
}
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions: " + partitions);
// Partitions lost without chance to commit
// Clean up state
for (TopicPartition partition : partitions) {
partitionState.remove(partition);
}
}
});
// Rebalance can occur during record processing
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Long processing operation
processRecordSlowly(record);
// Rebalance may have occurred during processing
// Check if still assigned to this partition
Set<TopicPartition> assignment = consumer.assignment();
TopicPartition recordPartition =
new TopicPartition(record.topic(), record.partition());
if (!assignment.contains(recordPartition)) {
System.out.println("Partition " + recordPartition +
" no longer assigned, skipping commit");
break; // Exit processing loop
}
} catch (CommitFailedException e) {
// Rebalance happened, consumer no longer owns partition
System.err.println("Commit failed: " + e.getMessage());
break; // Exit and repoll to rejoin
}
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// Rebalanced during commit, will rejoin on next poll
System.err.println("Commit failed: " + e.getMessage());
}
}
// Share consumer records have time-limited locks (default 30 seconds)
// If processing exceeds lock duration, record may be delivered to another consumer
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long processingStartTime = System.currentTimeMillis();
try {
processRecord(record);
long processingDuration = System.currentTimeMillis() - processingStartTime;
// Check if processing might have exceeded lock duration
if (processingDuration > 25000) { // 25 seconds threshold
System.err.println("WARNING: Processing took " + processingDuration +
"ms, approaching lock expiration");
}
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
long processingDuration = System.currentTimeMillis() - processingStartTime;
if (processingDuration > 30000) {
// Lock likely expired - record may have been delivered elsewhere
System.err.println("Lock may have expired after " +
processingDuration + "ms, rejecting to avoid duplicate processing");
shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
} else {
// Release for retry
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
}
}
}
}
Prevention: Configure share.record.lock.duration.ms so that it comfortably exceeds your expected per-record processing time.
// Delivery count is Optional<Short> - max value 32,767
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Optional<Short> deliveryCount = record.deliveryCount();
if (deliveryCount.isPresent()) {
short count = deliveryCount.get();
// Check for unusually high delivery counts
if (count > 10) {
System.err.println("High delivery count: " + count + " for record: " +
record.key());
if (count > 100) {
// Likely poison pill message - send to DLQ
sendToDeadLetterQueue(record);
shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
continue;
}
}
}
try {
processRecord(record);
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
}
}
}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Fetch more data per request
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10240); // 10KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms
// Process more records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// Larger partition fetch size
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2097152); // 2MB
// Async commits for better throughput
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Async commit doesn't block
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception.getMessage());
}
});
}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-latency-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Minimize fetch wait time
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // Fetch immediately
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10); // Max 10ms wait
// Small poll batches
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// Frequent commits
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
if (!records.isEmpty()) {
consumer.commitSync(); // Synchronous for immediate persistence
}
}
// heartbeat.interval.ms must be lower than session.timeout.ms,
// and is typically set to no more than one third of it
// Typical configuration:
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 3 seconds (45s / 15)
// Poor configuration (heartbeat too close to the session timeout):
// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
// props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // only two missed heartbeats before eviction
// Relationship with max.poll.interval.ms:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes
// max.poll.interval.ms should be:
// - Greater than expected max processing time per poll
// - Comfortably larger than session.timeout.ms (the 5-minute default is several times the 45s default session timeout)
// With auto-commit enabled, commits happen:
// 1. On each poll() if auto.commit.interval.ms has elapsed
// 2. During close()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds
// Auto-commit behavior:
consumer.subscribe(Collections.singletonList("my-topic"));
// First poll at T=0s
ConsumerRecords<String, String> records1 = consumer.poll(Duration.ofMillis(100));
// Offsets not committed yet
// Second poll at T=2s
ConsumerRecords<String, String> records2 = consumer.poll(Duration.ofMillis(100));
// Offsets not committed yet (< 5s interval)
// Third poll at T=6s
ConsumerRecords<String, String> records3 = consumer.poll(Duration.ofMillis(100));
// Offsets committed during this poll (>= 5s interval)
// Edge case: Records processed but not committed if crash before next poll
// Solution: Use manual commit for critical applications
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// DON'T DO THIS - Causes rebalancing
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecordVerySlowly(record); // Takes minutes per record
// max.poll.interval.ms exceeded → rebalance
}
consumer.commitSync();
}
// DO THIS - Process quickly or use async processing
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Option 1: Quick processing
for (ConsumerRecord<String, String> record : records) {
processRecordQuickly(record);
}
consumer.commitSync();
// Option 2: Async processing (asyncProcessor is e.g. an ExecutorService;
// committing right after poll gives at-most-once delivery for these records)
for (ConsumerRecord<String, String> record : records) {
asyncProcessor.submit(() -> processRecordSlowly(record));
}
consumer.commitSync(); // Commit immediately after poll
}
// DON'T DO THIS - Very inefficient
public void consumeMessage(String topic) {
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// ... process
consumer.close(); // Expensive
}
// DO THIS - Long-lived consumer
private final Consumer<String, String> consumer = createConsumer();
public void start() {
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records
}
}
// DON'T DO THIS - Loses uncommitted offsets
consumer.subscribe(Collections.singletonList("my-topic"));
// No rebalance listener
// DO THIS - Commit before rebalance
consumer.subscribe(Collections.singletonList("my-topic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // Commit before giving up partitions
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Initialize for new partitions
}
});
// Optimal consumer group size:
// Number of consumers = Number of partitions
// Under-provisioned (2 consumers, 10 partitions):
// - Each consumer handles 5 partitions
// - Limited parallelism
// - High lag if consumers are slow
// Optimal (10 consumers, 10 partitions):
// - Each consumer handles 1 partition
// - Maximum parallelism
// - Lowest lag
// Over-provisioned (15 consumers, 10 partitions):
// - 10 consumers active
// - 5 consumers idle (no partitions assigned)
// - Wasted resources
// Calculate required consumers:
int partitionCount = getPartitionCount("my-topic"); // e.g. consumer.partitionsFor("my-topic").size()
int avgProcessingTimeMs = 100;  // average processing time per record
int msgRatePerSecond = 5000;    // observed message rate for the topic
// Each consumer handles roughly 1000 / avgProcessingTimeMs records per second,
// so the group needs msgRatePerSecond / (1000 / avgProcessingTimeMs) consumers to keep up
int requiredConsumers = (int) Math.ceil(
(msgRatePerSecond * avgProcessingTimeMs) / 1000.0);
System.out.println("Recommended consumers: " +
Math.min(requiredConsumers, partitionCount));
// Memory required per consumer:
// - max.poll.records * avg_record_size
// - fetch.max.bytes for buffering
// - Application processing buffers
// Example calculation:
int maxPollRecords = 500;
int avgRecordSize = 1024; // 1KB
int fetchMaxBytes = 52428800; // 50MB
long minMemoryRequired = (maxPollRecords * avgRecordSize) +
fetchMaxBytes +
(100 * 1024 * 1024); // 100MB for application
System.out.println("Minimum memory per consumer: " +
(minMemoryRequired / 1024 / 1024) + "MB");
// Configure JVM accordingly:
// -Xms256m -Xmx512m (for the above example)
Process records in parallel while maintaining per-partition ordering:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
public class ParallelProcessingConsumer {
private final Consumer<String, String> consumer;
private final ExecutorService executorService;
private final Map<TopicPartition, CompletableFuture<Void>> partitionFutures;
public ParallelProcessingConsumer(Properties props, int numThreads) {
this.consumer = new KafkaConsumer<>(props);
this.executorService = Executors.newFixedThreadPool(numThreads);
this.partitionFutures = new ConcurrentHashMap<>();
}
public void processWithParallelism() {
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process each partition in parallel
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords =
records.records(partition);
// Wait for previous processing of this partition to complete
CompletableFuture<Void> previousFuture = partitionFutures.get(partition);
if (previousFuture != null) {
try {
previousFuture.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("Previous partition processing failed: " +
e.getMessage());
}
}
// Process partition records in parallel (maintains per-partition order)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
for (ConsumerRecord<String, String> record : partitionRecords) {
try {
processRecord(record);
} catch (Exception e) {
System.err.println("Error processing record: " + e.getMessage());
}
}
}, executorService);
partitionFutures.put(partition, future);
}
// Wait for all partitions to finish processing
List<CompletableFuture<Void>> allFutures = new ArrayList<>(partitionFutures.values());
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]))
.join();
// Commit offsets after all processing completes
consumer.commitSync();
partitionFutures.clear();
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Simulate processing
System.out.println(Thread.currentThread().getName() +
" processing: " + record.key());
}
public void close() {
executorService.shutdown();
consumer.close();
}
}
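A possible way to wire up the class above; the property values and thread count are illustrative assumptions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "parallel-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets are committed manually after processing
ParallelProcessingConsumer parallelConsumer = new ParallelProcessingConsumer(props, 8);
try {
    parallelConsumer.processWithParallelism(); // blocks; run on a dedicated thread in practice
} finally {
    parallelConsumer.close();
}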
Consumer that checkpoints state to an external store for recovery:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class CheckpointingConsumer {
private final Consumer<String, String> consumer;
private final ExternalStateStore stateStore;
private final Map<TopicPartition, ProcessingState> partitionStates;
public CheckpointingConsumer(Properties props, ExternalStateStore stateStore) {
this.consumer = new KafkaConsumer<>(props);
this.stateStore = stateStore;
this.partitionStates = new ConcurrentHashMap<>();
}
public void processWithCheckpointing() {
consumer.subscribe(Arrays.asList("my-topic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Checkpoint all partition states before rebalance
for (TopicPartition partition : partitions) {
ProcessingState state = partitionStates.get(partition);
if (state != null) {
stateStore.save(partition, state);
partitionStates.remove(partition);
}
}
consumer.commitSync();
System.out.println("Checkpointed state for revoked partitions: " + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Restore state for newly assigned partitions
for (TopicPartition partition : partitions) {
ProcessingState state = stateStore.load(partition);
if (state != null) {
partitionStates.put(partition, state);
// Seek to last checkpointed offset
consumer.seek(partition, state.getLastOffset() + 1);
}
}
System.out.println("Restored state for assigned partitions: " + partitions);
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
TopicPartition partition = new TopicPartition(
record.topic(), record.partition());
// Get or create partition state
ProcessingState state = partitionStates.computeIfAbsent(
partition, k -> new ProcessingState());
// Process record with state
processWithState(record, state);
// Update state
state.setLastOffset(record.offset());
state.incrementProcessedCount();
}
// Periodically checkpoint (every 1000 records)
for (Map.Entry<TopicPartition, ProcessingState> entry : partitionStates.entrySet()) {
ProcessingState state = entry.getValue();
if (state.getProcessedCount() >= 1000) {
stateStore.save(entry.getKey(), state);
state.resetProcessedCount();
}
}
consumer.commitAsync();
}
}
private void processWithState(ConsumerRecord<String, String> record,
ProcessingState state) {
// Use state for processing
System.out.println("Processing with state: " + state.getCustomData());
state.setCustomData("Last processed: " + record.key());
}
// State class
public static class ProcessingState {
private long lastOffset;
private long processedCount;
private String customData;
public long getLastOffset() { return lastOffset; }
public void setLastOffset(long offset) { this.lastOffset = offset; }
public long getProcessedCount() { return processedCount; }
public void incrementProcessedCount() { this.processedCount++; }
public void resetProcessedCount() { this.processedCount = 0; }
public String getCustomData() { return customData; }
public void setCustomData(String data) { this.customData = data; }
}
// External state store interface
public interface ExternalStateStore {
void save(TopicPartition partition, ProcessingState state);
ProcessingState load(TopicPartition partition);
}
}
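The ExternalStateStore interface above would normally be backed by a database or key-value store; a minimal in-memory implementation, useful only for tests since it does not survive a restart (an illustrative sketch, not part of the original example):
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class InMemoryStateStore implements CheckpointingConsumer.ExternalStateStore {
    private final Map<TopicPartition, CheckpointingConsumer.ProcessingState> states =
        new ConcurrentHashMap<>();
    @Override
    public void save(TopicPartition partition, CheckpointingConsumer.ProcessingState state) {
        states.put(partition, state);
    }
    @Override
    public CheckpointingConsumer.ProcessingState load(TopicPartition partition) {
        return states.get(partition); // null if no checkpoint exists yet
    }
}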
Consumer with configurable rate limiting:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class RateLimitedConsumer {
private final Consumer<String, String> consumer;
private final int maxRecordsPerSecond;
private final AtomicLong lastProcessTime;
private final AtomicLong processedInWindow;
private final long windowSizeMs = 1000; // 1 second window
public RateLimitedConsumer(Properties props, int maxRecordsPerSecond) {
this.consumer = new KafkaConsumer<>(props);
this.maxRecordsPerSecond = maxRecordsPerSecond;
this.lastProcessTime = new AtomicLong(System.currentTimeMillis());
this.processedInWindow = new AtomicLong(0);
}
public void processWithRateLimit() {
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Check rate limit
if (!allowProcess()) {
// Pause consumption until rate limit allows
long sleepTime = calculateSleepTime();
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
// Process record
processRecord(record);
// Update rate limiting counters
long now = System.currentTimeMillis();
long lastTime = lastProcessTime.get();
if (now - lastTime >= windowSizeMs) {
// New window
lastProcessTime.set(now);
processedInWindow.set(1);
} else {
processedInWindow.incrementAndGet();
}
}
consumer.commitAsync();
}
}
private boolean allowProcess() {
long now = System.currentTimeMillis();
long lastTime = lastProcessTime.get();
if (now - lastTime >= windowSizeMs) {
return true; // New window started
}
return processedInWindow.get() < maxRecordsPerSecond;
}
private long calculateSleepTime() {
long now = System.currentTimeMillis();
long lastTime = lastProcessTime.get();
long elapsed = now - lastTime;
if (elapsed >= windowSizeMs) {
return 0;
}
long remainingTime = windowSizeMs - elapsed;
long processedCount = processedInWindow.get();
if (processedCount >= maxRecordsPerSecond) {
return remainingTime;
}
return 0;
}
private void processRecord(ConsumerRecord<String, String> record) {
System.out.println("Processing: " + record.key());
}
public Map<String, Object> getRateLimitStats() {
Map<String, Object> stats = new HashMap<>();
stats.put("maxRecordsPerSecond", maxRecordsPerSecond);
stats.put("processedInCurrentWindow", processedInWindow.get());
stats.put("utilizationPercent",
(processedInWindow.get() * 100.0) / maxRecordsPerSecond);
return stats;
}
public void close() {
consumer.close();
}
}
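A possible way to run the rate-limited consumer above; the property values and the 200 records/second limit are illustrative assumptions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "rate-limited-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
// Allow at most 200 records per second
RateLimitedConsumer rateLimited = new RateLimitedConsumer(props, 200);
try {
    rateLimited.processWithRateLimit(); // blocks until interrupted
} finally {
    rateLimited.close();
}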
Consumer that monitors and alerts on high lag:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
public class LagMonitoringConsumer {
private final Consumer<String, String> consumer;
private final long lagThreshold;
private final Map<TopicPartition, Long> lastCommittedOffsets;
// KafkaConsumer is not thread-safe, so lag checks run on the polling thread
private static final long LAG_CHECK_INTERVAL_MS = 30_000;
private long lastLagCheckMs = System.currentTimeMillis();
public LagMonitoringConsumer(Properties props, long lagThreshold) {
this.consumer = new KafkaConsumer<>(props);
this.lagThreshold = lagThreshold;
this.lastCommittedOffsets = new ConcurrentHashMap<>();
}
public void process() {
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// Commit and track offsets
Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords =
records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
commitOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
lastCommittedOffsets.put(partition, lastOffset + 1);
}
if (!commitOffsets.isEmpty()) {
consumer.commitAsync(commitOffsets, null);
}
// Check lag every 30 seconds from this thread (the consumer must not be used from other threads)
if (System.currentTimeMillis() - lastLagCheckMs >= LAG_CHECK_INTERVAL_MS) {
checkLag();
lastLagCheckMs = System.currentTimeMillis();
}
}
}
private void checkLag() {
Set<TopicPartition> assignment = consumer.assignment();
if (assignment.isEmpty()) {
return;
}
// Get end offsets
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
System.out.println("\n=== Consumer Lag Report ===");
boolean hasHighLag = false;
long totalLag = 0;
for (TopicPartition partition : assignment) {
Long committedOffset = lastCommittedOffsets.get(partition);
if (committedOffset == null) {
// Get committed offset from Kafka
OffsetAndMetadata offsetMetadata =
consumer.committed(Collections.singleton(partition))
.get(partition);
committedOffset = offsetMetadata != null ? offsetMetadata.offset() : 0L;
}
Long endOffset = endOffsets.get(partition);
long lag = endOffset - committedOffset;
totalLag += lag;
String status = lag > lagThreshold ? "⚠️ HIGH" : "✅ OK";
System.out.println(String.format("Partition %d: lag=%d %s",
partition.partition(), lag, status));
if (lag > lagThreshold) {
hasHighLag = true;
}
}
System.out.println(String.format("Total lag: %d records", totalLag));
System.out.println("===========================\n");
if (hasHighLag) {
System.err.println("⚠️ WARNING: High lag detected!");
// In production: Send alert to monitoring system
}
}
private void processRecord(ConsumerRecord<String, String> record) {
System.out.println("Processing: " + record.key());
}
public Map<String, Long> getCurrentLag() {
Set<TopicPartition> assignment = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
Map<String, Long> lagByPartition = new HashMap<>();
for (TopicPartition partition : assignment) {
Long committedOffset = lastCommittedOffsets.get(partition);
if (committedOffset == null) {
continue;
}
Long endOffset = endOffsets.get(partition);
long lag = endOffset - committedOffset;
lagByPartition.put(partition.toString(), lag);
}
return lagByPartition;
}
public void close() {
consumer.close();
}
}
Single consumer that fans out to multiple processing pipelines:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;
public class FanOutConsumer {
private final Consumer<String, String> consumer;
private final List<ProcessingPipeline> pipelines;
private final ExecutorService executor;
public FanOutConsumer(Properties props) {
this.consumer = new KafkaConsumer<>(props);
this.pipelines = new ArrayList<>();
this.executor = Executors.newCachedThreadPool();
}
public void addPipeline(String name, Predicate<ConsumerRecord<String, String>> filter,
RecordProcessor processor) {
pipelines.add(new ProcessingPipeline(name, filter, processor));
}
public void process() {
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
// Fan out to all matching pipelines
for (ProcessingPipeline pipeline : pipelines) {
if (pipeline.filter.test(record)) {
Future<?> future = executor.submit(() -> {
try {
pipeline.processor.process(record);
} catch (Exception e) {
System.err.println("Pipeline " + pipeline.name +
" error: " + e.getMessage());
}
});
futures.add(future);
}
}
}
// Wait for all processing to complete
for (Future<?> future : futures) {
try {
future.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("Processing timeout: " + e.getMessage());
}
}
// Commit after all pipelines complete
consumer.commitAsync();
}
}
private static class ProcessingPipeline {
final String name;
final Predicate<ConsumerRecord<String, String>> filter;
final RecordProcessor processor;
ProcessingPipeline(String name,
Predicate<ConsumerRecord<String, String>> filter,
RecordProcessor processor) {
this.name = name;
this.filter = filter;
this.processor = processor;
}
}
@FunctionalInterface
public interface RecordProcessor {
void process(ConsumerRecord<String, String> record);
}
public void close() {
executor.shutdown();
consumer.close();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
FanOutConsumer consumer = new FanOutConsumer(props);
// Add processing pipelines
consumer.addPipeline("HighPriority",
record -> record.value().startsWith("HIGH:"),
record -> System.out.println("High priority: " + record.value()));
consumer.addPipeline("Analytics",
record -> record.value().contains("analytics"),
record -> System.out.println("Analytics: " + record.value()));
consumer.addPipeline("Logging",
record -> true, // Log everything
record -> System.out.println("Log: " + record.value()));
consumer.process();
}
}
Comprehensive monitoring for consumer health:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class ConsumerMetricsDashboard {
// NOTE: KafkaConsumer is not thread-safe, so the dashboard does not run its own scheduler;
// call printDashboard() periodically from the thread that polls the wrapped consumer
private final Consumer<String, String> consumer;
public ConsumerMetricsDashboard(Consumer<String, String> consumer) {
this.consumer = consumer;
}
public void printDashboard() {
System.out.println("\n=== Consumer Metrics Dashboard ===");
System.out.println("Timestamp: " + new Date());
System.out.println();
// Get all metrics
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Consumption Rate
System.out.println("CONSUMPTION:");
printMetric(metrics, "records-consumed-rate", "records/sec");
printMetric(metrics, "bytes-consumed-rate", "bytes/sec");
printMetric(metrics, "fetch-rate", "fetches/sec");
System.out.println();
// Lag
System.out.println("LAG:");
printMetric(metrics, "records-lag-max", "records");
printMetric(metrics, "records-lag-avg", "records");
// Calculate lag percentage
Set<TopicPartition> assignment = consumer.assignment();
if (!assignment.isEmpty()) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
long totalLag = 0;
long totalEndOffset = 0;
for (TopicPartition partition : assignment) {
long position = consumer.position(partition);
long endOffset = endOffsets.get(partition);
totalLag += (endOffset - position);
totalEndOffset += endOffset;
}
double lagPercent = totalEndOffset > 0 ?
(totalLag * 100.0 / totalEndOffset) : 0;
System.out.println(String.format(" Lag Percentage: %.2f%%", lagPercent));
}
System.out.println();
// Latency
System.out.println("LATENCY:");
printMetric(metrics, "fetch-latency-avg", "ms");
printMetric(metrics, "fetch-latency-max", "ms");
printMetric(metrics, "commit-latency-avg", "ms");
System.out.println();
// Partition Assignment
System.out.println("ASSIGNMENT:");
System.out.println(" Assigned Partitions: " + assignment.size());
for (TopicPartition partition : assignment) {
long position = consumer.position(partition);
System.out.println(String.format(" %s: position=%d",
partition, position));
}
System.out.println();
// Rebalancing
System.out.println("REBALANCING:");
printMetric(metrics, "rebalance-total", "count");
printMetric(metrics, "rebalance-latency-avg", "ms");
System.out.println();
// Health Alerts
System.out.println("HEALTH ALERTS:");
checkHealthAlerts(metrics);
System.out.println("===================================\n");
}
private void printMetric(Map<MetricName, ? extends Metric> metrics,
String metricName, String unit) {
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
if (entry.getKey().name().equals(metricName)) {
Object value = entry.getValue().metricValue();
if (value instanceof Number) {
System.out.println(String.format(" %s: %.2f %s",
metricName, ((Number) value).doubleValue(), unit));
}
return;
}
}
}
private void checkHealthAlerts(Map<MetricName, ? extends Metric> metrics) {
boolean healthy = true;
// Check lag
double maxLag = getMetricValue(metrics, "records-lag-max");
if (maxLag > 10000) {
System.out.println(" ⚠️ HIGH LAG: " + maxLag + " records");
healthy = false;
}
// Check fetch latency
double fetchLatency = getMetricValue(metrics, "fetch-latency-avg");
if (fetchLatency > 1000) {
System.out.println(" ⚠️ HIGH FETCH LATENCY: " + fetchLatency + " ms");
healthy = false;
}
// Check commit failures
double commitRate = getMetricValue(metrics, "commit-rate");
if (commitRate == 0) {
System.out.println(" ⚠️ NO COMMITS: Check auto-commit or manual commit logic");
healthy = false;
}
if (healthy) {
System.out.println(" ✅ All metrics healthy");
}
}
private double getMetricValue(Map<MetricName, ? extends Metric> metrics,
String metricName) {
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
if (entry.getKey().name().equals(metricName)) {
Object value = entry.getValue().metricValue();
if (value instanceof Number) {
return ((Number) value).doubleValue();
}
}
}
return 0.0;
}
public void close() {
// Nothing to release here; the wrapped consumer is created and closed by the caller
}
}
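A sketch of how the dashboard could be driven from the polling thread; the topic name, group id, and 30-second interval are assumptions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dashboard-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerMetricsDashboard dashboard = new ConsumerMetricsDashboard(consumer);
long lastPrint = System.currentTimeMillis();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // ... process record ...
    }
    consumer.commitAsync();
    // Print the dashboard every 30 seconds from the same thread that polls
    if (System.currentTimeMillis() - lastPrint >= 30_000) {
        dashboard.printDashboard();
        lastPrint = System.currentTimeMillis();
    }
}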