Describes: mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/clients/consumer.md

Consumer API

The Consumer API subscribes to topics and processes streams of records. Consumers are not thread-safe and should be used by a single thread.

Core Interfaces

Consumer<K, V>

Main consumer interface for reading records from Kafka.

package org.apache.kafka.clients.consumer;

public interface Consumer<K, V> extends Closeable {
    // Subscription
    void subscribe(Collection<String> topics);
    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
    void subscribe(Pattern pattern);
    void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
    void subscribe(SubscriptionPattern pattern);
    void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);
    void assign(Collection<TopicPartition> partitions);
    void unsubscribe();

    // Polling
    ConsumerRecords<K, V> poll(Duration timeout);

    // Offset management - Synchronous
    void commitSync();
    void commitSync(Duration timeout);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout);

    // Offset management - Asynchronous
    void commitAsync();
    void commitAsync(OffsetCommitCallback callback);
    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
                     OffsetCommitCallback callback);

    // Position management
    void seek(TopicPartition partition, long offset);
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
    void seekToBeginning(Collection<TopicPartition> partitions);
    void seekToEnd(Collection<TopicPartition> partitions);
    long position(TopicPartition partition);
    long position(TopicPartition partition, Duration timeout);
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions,
                                                      Duration timeout);

    // Flow control
    void pause(Collection<TopicPartition> partitions);
    void resume(Collection<TopicPartition> partitions);
    Set<TopicPartition> paused();

    // Metadata
    Set<TopicPartition> assignment();
    Set<String> subscription();
    List<PartitionInfo> partitionsFor(String topic);
    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
    Map<String, List<PartitionInfo>> listTopics();
    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
    Map<MetricName, ? extends Metric> metrics();
    Uuid clientInstanceId(Duration timeout);

    // Metric subscription (for monitoring)
    void registerMetricForSubscription(KafkaMetric metric);
    void unregisterMetricFromSubscription(KafkaMetric metric);

    // Offset queries
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        Map<TopicPartition, Long> timestampsToSearch);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
                                                Duration timeout);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
                                          Duration timeout);
    OptionalLong currentLag(TopicPartition topicPartition);

    // Group management
    ConsumerGroupMetadata groupMetadata();
    void enforceRebalance();
    void enforceRebalance(String reason);

    // Lifecycle
    void close();
    void close(Duration timeout);
    void close(CloseOptions options);
    void wakeup();
}

KafkaConsumer<K, V>

The standard implementation of the Consumer interface. It is not thread-safe: each consumer instance should be used by only one thread.

package org.apache.kafka.clients.consumer;

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // Constructor with Properties
    public KafkaConsumer(Properties properties);
    public KafkaConsumer(Properties properties,
                        Deserializer<K> keyDeserializer,
                        Deserializer<V> valueDeserializer);

    // Constructor with Map
    public KafkaConsumer(Map<String, Object> configs);
    public KafkaConsumer(Map<String, Object> configs,
                        Deserializer<K> keyDeserializer,
                        Deserializer<V> valueDeserializer);

    // All Consumer interface methods implemented
}
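
Usage Example (a minimal construction sketch, assuming a local broker at localhost:9092 and the built-in StringDeserializer; passing deserializer instances means key.deserializer and value.deserializer do not need to be configured):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// Deserializer instances are supplied directly to the constructor
Consumer<String, String> consumer =
    new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());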

ShareConsumer<K, V>

Share consumer interface for cooperative consumption using share groups. Share groups allow multiple consumers to cooperatively consume from the same partitions, with records distributed across consumers rather than partitions. This is a preview feature (@InterfaceStability.Evolving).

Key Differences from Consumer Groups:

  • Multiple consumers can consume from the same partitions simultaneously
  • Records are acquired with time-limited locks (default 30 seconds)
  • Records can be explicitly acknowledged (ACCEPT, RELEASE, REJECT)
  • No partition assignment or rebalancing
  • Delivery count tracking for implementing dead-letter patterns

package org.apache.kafka.clients.consumer;

public interface ShareConsumer<K, V> extends Closeable {
    // Subscription
    Set<String> subscription();
    void subscribe(Collection<String> topics);
    void unsubscribe();

    // Polling
    ConsumerRecords<K, V> poll(Duration timeout);

    // Acknowledgement (explicit mode only)
    void acknowledge(ConsumerRecord<K, V> record);
    void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

    // Commit acknowledgements
    Map<TopicIdPartition, Optional<KafkaException>> commitSync();
    Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);
    void commitAsync();
    void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback);

    // Metrics and monitoring
    Uuid clientInstanceId(Duration timeout);
    Map<MetricName, ? extends Metric> metrics();
    void registerMetricForSubscription(KafkaMetric metric);
    void unregisterMetricFromSubscription(KafkaMetric metric);

    // Lifecycle
    void close();
    void close(Duration timeout);
    void wakeup();
}

KafkaShareConsumer<K, V>

Implementation of the ShareConsumer interface.

package org.apache.kafka.clients.consumer;

public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
    public KafkaShareConsumer(Map<String, Object> configs);
    public KafkaShareConsumer(Map<String, Object> configs,
                             Deserializer<K> keyDeserializer,
                             Deserializer<V> valueDeserializer);
    public KafkaShareConsumer(Properties properties);
    public KafkaShareConsumer(Properties properties,
                             Deserializer<K> keyDeserializer,
                             Deserializer<V> valueDeserializer);
}
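
Usage Example (a minimal construction sketch; group.id names the share group, and the broker address and group name are placeholders):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group"); // name of the share group

ShareConsumer<String, String> shareConsumer =
    new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());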

Data Classes

ConsumerRecord<K, V>

A record received from Kafka.

package org.apache.kafka.clients.consumer;

public class ConsumerRecord<K, V> {
    // Constants
    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;

    // Accessors
    public String topic();
    public int partition();
    public long offset();
    public long timestamp();
    public TimestampType timestampType();
    public int serializedKeySize();
    public int serializedValueSize();
    public K key();
    public V value();
    public Headers headers();
    public Optional<Integer> leaderEpoch();
    public Optional<Short> deliveryCount();
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.header.Header;
import java.time.Duration;
import java.util.Collections;

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
        record.topic(),
        record.partition(),
        record.offset(),
        record.key(),
        record.value());

    // Access timestamp
    if (record.timestamp() != ConsumerRecord.NO_TIMESTAMP) {
        System.out.println("Timestamp: " + record.timestamp());
        System.out.println("Timestamp type: " + record.timestampType());
    }

    // Access headers
    for (Header header : record.headers()) {
        System.out.println("Header: " + header.key() + " = " +
            new String(header.value()));
    }

    // Check delivery count (for share consumers)
    record.deliveryCount().ifPresent(count ->
        System.out.println("Delivery count: " + count));
}

ConsumerRecords<K, V>

Collection of records from a poll operation.

package org.apache.kafka.clients.consumer;

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
    // Empty constant
    public static final ConsumerRecords<Object, Object> EMPTY;

    // Access records
    public List<ConsumerRecord<K, V>> records(TopicPartition partition);
    public Iterable<ConsumerRecord<K, V>> records(String topic);
    public Set<TopicPartition> partitions();
    public Iterator<ConsumerRecord<K, V>> iterator();

    // Metadata
    public int count();
    public boolean isEmpty();
    public Map<TopicPartition, OffsetAndMetadata> nextOffsets();
}

Usage Example:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// Process all records
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
}

// Process by partition
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    System.out.println("Partition " + partition + " has " +
        partitionRecords.size() + " records");
    for (ConsumerRecord<String, String> record : partitionRecords) {
        processRecord(record);
    }
}

// Process by topic
for (ConsumerRecord<String, String> record : records.records("my-topic")) {
    processRecord(record);
}

// Check if empty
if (records.isEmpty()) {
    System.out.println("No records received");
} else {
    System.out.println("Received " + records.count() + " records");
}

OffsetAndMetadata

Offset position with optional metadata.

package org.apache.kafka.clients.consumer;

public class OffsetAndMetadata implements Serializable {
    // Constructors
    public OffsetAndMetadata(long offset);
    public OffsetAndMetadata(long offset, String metadata);
    public OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String metadata);

    // Accessors
    public long offset();
    public String metadata();
    public Optional<Integer> leaderEpoch();
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

// Manual offset commit with metadata
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
TopicPartition partition = new TopicPartition("my-topic", 0);
offsets.put(partition, new OffsetAndMetadata(100L, "Processed batch 1"));

consumer.commitSync(offsets);

// Retrieve committed offsets
Map<TopicPartition, OffsetAndMetadata> committed =
    consumer.committed(Collections.singleton(partition));
OffsetAndMetadata offsetMeta = committed.get(partition);
System.out.println("Committed offset: " + offsetMeta.offset());
System.out.println("Metadata: " + offsetMeta.metadata());

OffsetAndTimestamp

Offset and timestamp pair for time-based queries.

package org.apache.kafka.clients.consumer;

public class OffsetAndTimestamp {
    public long offset();
    public long timestamp();
    public Optional<Integer> leaderEpoch();
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

// Find offsets for specific timestamps
TopicPartition partition = new TopicPartition("my-topic", 0);
long timestamp = System.currentTimeMillis() - 3600000; // 1 hour ago

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(partition, timestamp);

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
    consumer.offsetsForTimes(timestampsToSearch);

OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(partition);
if (offsetAndTimestamp != null) {
    System.out.println("Offset at timestamp " + timestamp + ": " +
        offsetAndTimestamp.offset());
    consumer.seek(partition, offsetAndTimestamp.offset());
}

ConsumerGroupMetadata

Consumer group metadata for transactional sends.

package org.apache.kafka.clients.consumer;

public class ConsumerGroupMetadata {
    public String groupId();
    public int generationId();
    public String memberId();
    public Optional<String> groupInstanceId();
}

Usage Example:

// Get consumer group metadata for the transactional producer
ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

// Commit the consumed offsets as part of the producer's transaction
// (producer is a transactional KafkaProducer; offsets holds the positions to commit)
producer.sendOffsetsToTransaction(offsets, groupMetadata);
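
A fuller consume-transform-produce sketch, assuming a transactional KafkaProducer (transactional.id set and initTransactions() already called; producer configuration is covered in the producer docs), a consumer with enable.auto.commit=false, and a hypothetical "output-topic":

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Transform and forward each record to the output topic
            producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // The consumed offsets commit atomically with the produced records
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (Exception e) {
        // On failure, abort so neither the outputs nor the offsets become visible
        producer.abortTransaction();
    }
}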

Callback Interfaces

ConsumerRebalanceListener

Callback for partition rebalance events.

package org.apache.kafka.clients.consumer;

public interface ConsumerRebalanceListener {
    /**
     * Called before partitions are revoked from this consumer.
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    /**
     * Called after partitions are assigned to this consumer.
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    /**
     * Called when partitions are lost (optional, default no-op).
     */
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
        // Commit offsets before rebalance
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
        // Initialize state for new partitions
        for (TopicPartition partition : partitions) {
            // Optionally seek to specific offset
            // consumer.seek(partition, 0L);
        }
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        System.out.println("Partitions lost: " + partitions);
        // Partitions lost due to consumer group rebalance failure
        // No commit is possible
    }
});

OffsetCommitCallback

Callback for asynchronous offset commits.

package org.apache.kafka.clients.consumer;

@FunctionalInterface
public interface OffsetCommitCallback {
    /**
     * Called when offset commit completes.
     * @param offsets The committed offsets
     * @param exception The exception, or null if successful
     */
    void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

// Async commit with callback
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                          Exception exception) {
        if (exception != null) {
            System.err.println("Commit failed: " + exception.getMessage());
        } else {
            System.out.println("Commit succeeded for offsets: " + offsets);
        }
    }
});

// Lambda syntax
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed: " + exception.getMessage());
    }
});

AcknowledgementCommitCallback

Callback for asynchronous acknowledgement commits in share consumers.

package org.apache.kafka.clients.consumer;

@FunctionalInterface
public interface AcknowledgementCommitCallback {
    /**
     * Called when acknowledgement commit completes.
     * @param offsets Map of topic-partition to acknowledged record offsets
     * @param exception The exception, or null if successful
     */
    void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception);
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicIdPartition;
import java.util.*;

// Set acknowledgement commit callback for share consumer
shareConsumer.setAcknowledgementCommitCallback(
    new AcknowledgementCommitCallback() {
        @Override
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsets,
                              Exception exception) {
            if (exception != null) {
                System.err.println("Acknowledgement commit failed: " +
                                   exception.getMessage());
            } else {
                System.out.println("Acknowledgements committed for " +
                                   offsets.size() + " partitions");
            }
        }
    });

// Lambda syntax
shareConsumer.setAcknowledgementCommitCallback((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Acknowledgement commit failed: " + exception.getMessage());
    }
});

Exception Types:

  • AuthorizationException - Not authorized to the topic or share group
  • InvalidRecordStateException - Record state is invalid for acknowledgement
  • NotLeaderOrFollowerException - Leader changed before acknowledgements were sent
  • DisconnectException - Broker disconnected before request completed
  • WakeupException - Consumer wakeup() called during callback
  • InterruptException - Thread interrupted during callback
  • KafkaException - Other unrecoverable errors

ConsumerInterceptor<K, V>

Intercept records before processing.

package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Configurable;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    /**
     * Called before records are returned to the application.
     */
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * Called when offsets are committed.
     */
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    void close();
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class LoggingInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("Received " + records.count() + " records");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("Committed offsets: " + offsets);
    }

    @Override
    public void close() {}
}

// Configure interceptor
Properties props = new Properties();
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
    LoggingInterceptor.class.getName());

Partition Assignment

ConsumerPartitionAssignor

Custom partition assignment strategy.

package org.apache.kafka.clients.consumer;

public interface ConsumerPartitionAssignor {
    String name();
    short version();
    List<RebalanceProtocol> supportedProtocols();

    ByteBuffer subscriptionUserData(Set<String> topics);

    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

    void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata);

    // Nested classes
    class Subscription {
        public List<String> topics();
        public ByteBuffer userData();
        public List<TopicPartition> ownedPartitions();
        public Optional<String> groupInstanceId();
        public Optional<String> rackId();
    }

    class Assignment {
        public List<TopicPartition> partitions();
        public ByteBuffer userData();
    }

    class GroupSubscription {
        public Map<String, Subscription> groupSubscription();
    }

    class GroupAssignment {
        public Map<String, Assignment> groupAssignment();
    }

    enum RebalanceProtocol {
        EAGER,
        COOPERATIVE
    }
}
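
Most of the other methods have default implementations in the interface, so a custom assignor typically implements only name() and assign(). Below is a minimal sketch of a hypothetical per-topic round-robin assignor (not one of the built-in strategies; the class and strategy names are made up for illustration):

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.stream.Collectors;

public class SimpleRoundRobinAssignor implements ConsumerPartitionAssignor {

    @Override
    public String name() {
        // Value referenced by partition.assignment.strategy
        return "simple-roundrobin";
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();

        // Start every member with an empty partition list
        Map<String, List<TopicPartition>> assigned = new HashMap<>();
        subscriptions.keySet().forEach(member -> assigned.put(member, new ArrayList<>()));

        // All topics any member subscribed to, in a deterministic order
        Set<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));

        for (String topic : topics) {
            // Members subscribed to this topic, sorted for a stable assignment
            List<String> members = subscriptions.entrySet().stream()
                .filter(e -> e.getValue().topics().contains(topic))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
            List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
            if (partitions == null || members.isEmpty()) {
                continue;
            }
            // Spread the topic's partitions across its subscribers
            for (PartitionInfo p : partitions) {
                String member = members.get(p.partition() % members.size());
                assigned.get(member).add(new TopicPartition(p.topic(), p.partition()));
            }
        }

        Map<String, Assignment> result = new HashMap<>();
        assigned.forEach((member, parts) -> result.put(member, new Assignment(parts)));
        return new GroupAssignment(result);
    }
}

The assignor is then enabled by listing its fully qualified class name in partition.assignment.strategy, as shown under Usage below.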

Built-in Assignors

RangeAssignor:

package org.apache.kafka.clients.consumer;

public class RangeAssignor implements ConsumerPartitionAssignor {
    // Range-based assignment
    // Name: "range"
    // Assigns partitions on a per-topic basis
}

RoundRobinAssignor:

package org.apache.kafka.clients.consumer;

public class RoundRobinAssignor implements ConsumerPartitionAssignor {
    // Round-robin assignment
    // Name: "roundrobin"
    // Assigns partitions evenly across consumers
}

StickyAssignor:

package org.apache.kafka.clients.consumer;

public class StickyAssignor implements ConsumerPartitionAssignor {
    // Sticky assignment
    // Name: "sticky"
    // Minimizes partition movement during rebalance
}

CooperativeStickyAssignor:

package org.apache.kafka.clients.consumer;

public class CooperativeStickyAssignor implements ConsumerPartitionAssignor {
    // Cooperative sticky assignment
    // Name: "cooperative-sticky"
    // Allows incremental rebalancing without stopping consumption
}

Usage:

Properties props = new Properties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// Or multiple strategies
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    Arrays.asList(
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
        "org.apache.kafka.clients.consumer.StickyAssignor"
    ));

Configuration

ConsumerConfig

Configuration constants for the consumer.

package org.apache.kafka.clients.consumer;

public class ConsumerConfig extends AbstractConfig {
    // Required configurations
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";

    // Group configuration
    public static final String GROUP_ID_CONFIG = "group.id";
    public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
    public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";

    // Offset configuration
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";

    // Assignment strategy
    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG =
        "partition.assignment.strategy";

    // Fetch configuration
    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
    public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
    public static final String MAX_PARTITION_FETCH_BYTES_CONFIG =
        "max.partition.fetch.bytes";

    // Timeouts
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";

    // Metadata
    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
    public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG =
        "allow.auto.create.topics";

    // Isolation and reliability
    public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";

    // Client identification
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String CLIENT_RACK_CONFIG = "client.rack";

    // Interceptors
    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
}

ShareConsumerConfig

Configuration for share consumers. Extends ConsumerConfig but excludes configurations that are not supported for share consumers.

package org.apache.kafka.clients.consumer;

public class ShareConsumerConfig extends ConsumerConfig {
    // Share-specific configuration
    public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
        "share.acknowledgement.mode";
    // Values: "implicit" (default), "explicit"

    // Inherited from ConsumerConfig but unsupported (will throw exception):
    // - auto.offset.reset
    // - enable.auto.commit
    // - group.instance.id
    // - isolation.level
    // - partition.assignment.strategy
    // - interceptor.classes
    // - session.timeout.ms
    // - heartbeat.interval.ms
    // - group.protocol
    // - group.remote.assignor
}

Configuration Properties:

  • share.acknowledgement.mode (String, default: implicit): Acknowledgement mode, either implicit (auto-acknowledge on next poll) or explicit (must call acknowledge())

Broker-Side Share Group Configuration:

These are configured at the broker level:

  • share.session.timeout.ms (int, default: 45000, 45 sec): Share group session timeout
  • share.heartbeat.interval.ms (int, default: 3000, 3 sec): Share group heartbeat interval
  • share.record.lock.duration.ms (int, default: 30000, 30 sec): Duration records are locked when delivered
  • share.auto.offset.reset (String, default: latest): Initial offset strategy: earliest, latest, or by_duration:<ISO8601>
  • share.isolation.level (String, default: read_uncommitted): Transaction isolation: read_committed or read_uncommitted
  • group.share.record.lock.partition.limit (int, default: 200): Max acquired records per partition per consumer

Configuration Examples

Basic Configuration:

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

Manual Offset Management:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Disable auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer<String, String> consumer = new KafkaConsumer<>(props);

Read Committed (Transactional):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Read only committed messages
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

Configuration Defaults:

  • group.protocol: "classic"
  • session.timeout.ms: 45000 (45 seconds)
  • heartbeat.interval.ms: 3000 (3 seconds)
  • max.poll.interval.ms: 300000 (5 minutes)
  • max.poll.records: 500
  • enable.auto.commit: true
  • auto.commit.interval.ms: 5000 (5 seconds)
  • auto.offset.reset: "latest" (or "earliest", "by_duration:<duration>", "none")
  • partition.assignment.strategy: [RangeAssignor, CooperativeStickyAssignor]
  • fetch.min.bytes: 1
  • fetch.max.bytes: 52428800 (50MB)
  • fetch.max.wait.ms: 500
  • max.partition.fetch.bytes: 1048576 (1MB)
  • request.timeout.ms: 30000 (30 seconds)
  • default.api.timeout.ms: 60000 (1 minute)
  • metadata.max.age.ms: 300000 (5 minutes)
  • allow.auto.create.topics: true
  • isolation.level: "read_uncommitted"
  • check.crcs: true
  • exclude.internal.topics: true

Enums

GroupProtocol

package org.apache.kafka.clients.consumer;

public enum GroupProtocol {
    CLASSIC,  // Traditional consumer group protocol
    CONSUMER  // New consumer group protocol (KIP-848)
}
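
Opting into the new protocol is a configuration change (a minimal sketch, assuming the brokers support the KIP-848 consumer group protocol):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

Properties props = new Properties();
// Select the new consumer group protocol; the default is "classic"
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");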

AcknowledgeType

package org.apache.kafka.clients.consumer;

public enum AcknowledgeType {
    ACCEPT(1),    // Successfully processed
    RELEASE(2),   // Requeue for later delivery
    REJECT(3);    // Permanently reject

    public byte id();
}

Options

CloseOptions

Options for closing the consumer.

package org.apache.kafka.clients.consumer;

public class CloseOptions {
    public enum GroupMembershipOperation {
        LEAVE_GROUP,      // Leave consumer group
        REMAIN_IN_GROUP,  // Stay in group
        DEFAULT           // Default behavior
    }

    // Factory methods
    public static CloseOptions timeout(Duration timeout);
    public static CloseOptions groupMembershipOperation(GroupMembershipOperation operation);

    // Fluent setters
    public CloseOptions withTimeout(Duration timeout);
    public CloseOptions withGroupMembershipOperation(GroupMembershipOperation operation);
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;

// Close with timeout
consumer.close(CloseOptions.timeout(Duration.ofSeconds(10)));

// Close but remain in group
consumer.close(CloseOptions.groupMembershipOperation(
    CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));

// Combine options
CloseOptions options = CloseOptions.timeout(Duration.ofSeconds(10))
    .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
consumer.close(options);

SubscriptionPattern

RE2/J compatible pattern for topic subscription.

package org.apache.kafka.clients.consumer;

public class SubscriptionPattern {
    public SubscriptionPattern(String pattern);
}
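
Usage Example (a minimal sketch; props is the usual consumer configuration, and broker-evaluated RE2/J subscription is intended for use with the new consumer group protocol):

import org.apache.kafka.clients.consumer.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe with a broker-evaluated RE2/J pattern rather than a client-side java.util.regex.Pattern
consumer.subscribe(new SubscriptionPattern("metrics-.*"));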

Mock Consumers

MockConsumer<K, V>

Mock consumer for testing.

package org.apache.kafka.clients.consumer;

public class MockConsumer<K, V> implements Consumer<K, V> {
    public MockConsumer(OffsetResetStrategy offsetResetStrategy);

    // Test helpers
    public void addRecord(ConsumerRecord<K, V> record);
    public void addEndOffsets(Map<TopicPartition, Long> newOffsets);
    public void addBeginningOffsets(Map<TopicPartition, Long> newOffsets);
    public void setException(KafkaException exception);
    public void updatePartitions(String topic, List<PartitionInfo> partitions);
    public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets);
    public void updateEndOffsets(Map<TopicPartition, Long> newOffsets);
    public void rebalance(Collection<TopicPartition> newAssignment);

    // Verification
    public boolean closed();
    public Map<String, List<PartitionInfo>> partitions();
    public Set<TopicPartition> paused();
}

Usage Example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

// Create mock consumer
MockConsumer<String, String> mockConsumer =
    new MockConsumer<>(OffsetResetStrategy.EARLIEST);

// Configure partitions
TopicPartition partition = new TopicPartition("test-topic", 0);
mockConsumer.assign(Collections.singletonList(partition));

// Add beginning offsets
Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(partition, 0L);
mockConsumer.updateBeginningOffsets(beginningOffsets);

// Add test records
mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key1", "value1"));
mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 1L, "key2", "value2"));

// Test consumer code
ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
assertEquals(2, records.count());

MockShareConsumer<K, V>

Mock share consumer for testing.

package org.apache.kafka.clients.consumer;

public class MockShareConsumer<K, V> implements ShareConsumer<K, V> {
    // Similar test helpers as MockConsumer
}

Exceptions

CommitFailedException

Unrecoverable commit failure.

package org.apache.kafka.clients.consumer;

public class CommitFailedException extends KafkaException {
    // Consumer group rebalanced before commit completed
    // Consumer is no longer part of the group
}

RetriableCommitFailedException

Retriable commit failure.

package org.apache.kafka.clients.consumer;

public class RetriableCommitFailedException extends RetriableException {
    // Commit failed but can be retried
}

InvalidOffsetException

Invalid offset.

package org.apache.kafka.clients.consumer;

public class InvalidOffsetException extends KafkaException {
    // Offset is invalid for the partition
}

NoOffsetForPartitionException

No offset exists for the partition.

package org.apache.kafka.clients.consumer;

public class NoOffsetForPartitionException extends InvalidOffsetException {
    // No committed offset and auto.offset.reset=none
}

OffsetOutOfRangeException

Offset out of range.

package org.apache.kafka.clients.consumer;

public class OffsetOutOfRangeException extends InvalidOffsetException {
    // Requested offset is out of range for the partition
    // Check beginning/end offsets and seek accordingly
}

LogTruncationException

Log truncation detected.

package org.apache.kafka.clients.consumer;

public class LogTruncationException extends OffsetOutOfRangeException {
    // Log was truncated on the broker
    // Data loss detected
}

Usage Patterns

Basic Consumer Loop

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

Manual Offset Management

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Processing: offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
            // Process record
        }

        // Commit offsets synchronously after processing
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    System.err.println("Commit failed: " + e.getMessage());
} finally {
    consumer.close();
}

Manual Partition Assignment

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Manually assign partitions (no consumer group)
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));

// Seek to specific offset
consumer.seek(partition0, 100L);
consumer.seekToBeginning(Collections.singletonList(partition1));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
}

Partition-Based Processing

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // Process records partition by partition
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        System.out.println("Processing " + partitionRecords.size() +
            " records from partition " + partition.partition());

        for (ConsumerRecord<String, String> record : partitionRecords) {
            processRecord(record);
        }

        // Commit offset for this partition
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        consumer.commitSync(offsets);
    }
}

Pattern-Based Subscription

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.regex.Pattern;

Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to all topics matching pattern
Pattern pattern = Pattern.compile("metrics-.*");
consumer.subscribe(pattern);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Topic: " + record.topic() + ", Value: " + record.value());
    }
}

Seeking to Specific Timestamp

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Poll until the group has assigned partitions (a zero-duration poll can return before assignment completes)
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
}

// Get assigned partitions
Set<TopicPartition> assignment = consumer.assignment();

// Seek to timestamp (e.g., 1 hour ago)
long timestamp = System.currentTimeMillis() - 3600000;
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : assignment) {
    timestampsToSearch.put(partition, timestamp);
}

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
    consumer.offsetsForTimes(timestampsToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}

// Continue consuming from the seeked position
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
}

Pause and Resume

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    Set<TopicPartition> assignment = consumer.assignment();

    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record);
        } catch (BackpressureException e) {
            // BackpressureException is an application-defined exception, not a Kafka class
            // Pause consumption due to backpressure
            System.out.println("Pausing consumption due to backpressure");
            consumer.pause(assignment);

            // Wait and resume
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            consumer.resume(assignment);
            System.out.println("Resuming consumption");
        }
    }
}

Share Consumer with Implicit Acknowledgement

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

// Configure share consumer with implicit acknowledgement (default)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
// Implicit mode: records are auto-acknowledged on next poll or commitSync
props.put("share.acknowledgement.mode", "implicit");

ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
                // No explicit acknowledge needed in implicit mode
            } catch (Exception e) {
                System.err.println("Processing failed: " + e.getMessage());
                // Record will be auto-acknowledged anyway in implicit mode
            }
        }
        // Records are acknowledged on next poll or explicit commitSync
    }
} finally {
    shareConsumer.close();
}

Share Consumer with Explicit Acknowledgement

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

// Configure share consumer with explicit acknowledgement
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
// Explicit mode: must call acknowledge() for each record
props.put("share.acknowledgement.mode", "explicit");

ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
                // ACCEPT: Successfully processed
                shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
            } catch (RetriableException e) {
                // RetriableException stands in for the application's transient-failure type
                // RELEASE: Temporary failure, requeue for retry
                System.err.println("Temporary failure, releasing: " + e.getMessage());
                shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
            } catch (PermanentException e) {
                // PermanentException is an application-defined exception, not a Kafka class
                // REJECT: Permanent failure, send to DLQ or discard
                System.err.println("Permanent failure, rejecting: " + e.getMessage());
                shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
            }
        }
    }
} finally {
    shareConsumer.close();
}

Share Consumer with Delivery Count Tracking

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // Check delivery count for dead-letter pattern
            Optional<Short> deliveryCount = record.deliveryCount();

            if (deliveryCount.isPresent() && deliveryCount.get() > 3) {
                // Send to dead-letter queue after 3 retries
                System.err.println("Max retries exceeded, sending to DLQ: " +
                                   record.key());
                sendToDeadLetterQueue(record);
                shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
            } else {
                try {
                    processRecord(record);
                    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
                } catch (Exception e) {
                    System.err.println("Processing failed (attempt " +
                                       deliveryCount.orElse((short) 1) + "): " +
                                       e.getMessage());
                    shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
                }
            }
        }
    }
} finally {
    shareConsumer.close();
}

Share Consumer with Asynchronous Commit Callback

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicIdPartition;
import java.time.Duration;
import java.util.*;

ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);

// Set acknowledgement commit callback
shareConsumer.setAcknowledgementCommitCallback((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Acknowledgement commit failed: " + exception.getMessage());
        // Handle commit failure (e.g., retry logic)
    } else {
        System.out.println("Successfully committed acknowledgements for " +
                           offsets.size() + " partitions");
    }
});

shareConsumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
            shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
        }
        // Commit asynchronously - callback will be invoked
        shareConsumer.commitAsync();
    }
} finally {
    shareConsumer.close();
}

Thread Safety

WARNING: KafkaConsumer is NOT thread-safe. It should be used by only one thread. To consume from multiple threads, use multiple consumer instances.

Note: ShareConsumer is also NOT thread-safe and should be used by only one thread. However, share groups naturally support multiple consumers from the same partition, so use multiple ShareConsumer instances across threads/processes for parallel consumption.

Multi-threaded Pattern:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

// Create multiple consumer instances, one per thread
ExecutorService executor = Executors.newFixedThreadPool(3);

for (int i = 0; i < 3; i++) {
    final int threadId = i;
    executor.submit(() -> {
        // Each thread has its own consumer instance
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + threadId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Thread " + threadId + " processing: " +
                        record.value());
                }
            }
        } finally {
            consumer.close();
        }
    });
}
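
The same structure applies to share consumers: one KafkaShareConsumer per thread, all in the same share group, so records (rather than partitions) are spread across the threads. A minimal sketch, assuming props carries the share consumer configuration shown earlier and processRecord is the application's processing method:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

ExecutorService shareExecutor = Executors.newFixedThreadPool(3);

for (int i = 0; i < 3; i++) {
    shareExecutor.submit(() -> {
        // Each thread owns its own ShareConsumer instance
        ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
        shareConsumer.subscribe(Collections.singletonList("my-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                    shareConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record); // implicit mode: acknowledged on the next poll
                }
            }
        } finally {
            shareConsumer.close();
        }
    });
}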

Wakeup Pattern:

The only thread-safe method is wakeup(), which can be called from another thread to interrupt a blocked poll():

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;

final Consumer<String, String> consumer = new KafkaConsumer<>(props);

// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down consumer");
    consumer.wakeup(); // Thread-safe call
}));

try {
    consumer.subscribe(Arrays.asList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
    }
} catch (WakeupException e) {
    // Expected on shutdown
    System.out.println("Consumer woken up");
} finally {
    consumer.close();
}

Best Practices

Graceful Shutdown

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulConsumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Consumer<String, String> consumer;

    public GracefulConsumer(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("my-topic"));

            while (!closed.get()) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // Ignore for shutdown
        } finally {
            try {
                consumer.commitSync(); // Final commit
            } finally {
                consumer.close();
            }
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

Error Handling

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.*;
import java.time.Duration;

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // Consumer rebalanced, need to rejoin
        System.err.println("Commit failed, rebalancing: " + e.getMessage());
    } catch (OffsetOutOfRangeException e) {
        // Seek to valid offset
        consumer.seekToBeginning(consumer.assignment());
    } catch (SerializationException e) {
        // Skip malformed record
        System.err.println("Deserialization error: " + e.getMessage());
    } catch (Exception e) {
        System.err.println("Unexpected error: " + e.getMessage());
    }
}

Monitoring

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;

Map<MetricName, ? extends Metric> metrics = consumer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    MetricName name = entry.getKey();
    if (name.name().equals("records-lag-max")) {
        System.out.println("Max lag: " + entry.getValue().metricValue());
    }
    if (name.name().equals("fetch-rate")) {
        System.out.println("Fetch rate: " + entry.getValue().metricValue());
    }
}

Key metrics to monitor:

  • records-lag-max: Maximum lag across all partitions
  • records-lag-avg: Average lag across all partitions
  • fetch-rate: Fetch requests per second
  • fetch-latency-avg: Average fetch latency
  • records-consumed-rate: Records consumed per second
  • commit-latency-avg: Average commit latency
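
For a programmatic view without going through the metrics map, currentLag() (see the Consumer interface above) reports the consumer's locally known lag per assigned partition; a minimal sketch:

import org.apache.kafka.common.TopicPartition;
import java.util.OptionalLong;

for (TopicPartition partition : consumer.assignment()) {
    // Empty when the consumer does not yet know the partition's end offset
    OptionalLong lag = consumer.currentLag(partition);
    lag.ifPresent(value ->
        System.out.println("Lag for " + partition + ": " + value));
}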

Troubleshooting

Common Consumer Issues

Issue: CommitFailedException

Symptoms:

  • CommitFailedException during commitSync()
  • Consumer constantly rebalancing
  • Messages being reprocessed

Causes:

  • Processing takes longer than max.poll.interval.ms
  • Consumer rebalanced before commit
  • Network issues causing session timeout

Solutions:

Properties props = new Properties();

// Increase max poll interval
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10 minutes

// Or process in smaller batches with a shorter interval
// props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // Smaller batches

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record);
        } catch (Exception e) {
            System.err.println("Processing error: " + e.getMessage());
            // Continue processing other records
        }
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // Consumer rebalanced - offsets will be reset to last commit
        System.err.println("Commit failed due to rebalance: " + e.getMessage());
        // No action needed - next poll will trigger rejoin
    }
}

Prevention:

  • Monitor processing time per poll
  • Set max.poll.interval.ms > 2x average processing time
  • Process records in parallel within the same consumer (see the sketch below)
  • Use multiple consumer instances for better parallelism
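
A minimal sketch of parallel processing within one consumer, assuming a single batch in flight at a time and that processRecord is the application's processing method: the batch is handed to a worker thread while the partitions are paused, so poll() keeps the consumer alive in the group without fetching more records.

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.concurrent.*;

ExecutorService worker = Executors.newSingleThreadExecutor();
Future<?> inFlight = null;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        // Stop fetching while the batch is processed; poll() still heartbeats
        consumer.pause(consumer.assignment());
        final ConsumerRecords<String, String> batch = records;
        inFlight = worker.submit(() -> {
            for (ConsumerRecord<String, String> record : batch) {
                processRecord(record);
            }
        });
    }

    if (inFlight != null && inFlight.isDone()) {
        // Batch finished: commit its offsets and start fetching again
        consumer.commitSync();
        consumer.resume(consumer.assignment());
        inFlight = null;
    }
}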

Issue: Frequent Rebalancing

Symptoms:

  • Frequent "Revoke" and "Assign" log messages
  • High consumer lag despite capacity
  • Partition assignment constantly changing

Causes:

  • session.timeout.ms too low
  • Network instability
  • Long processing between polls
  • Consumer crashes or GC pauses

Solutions:

// Increase session timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // 5 seconds

// Use static membership (prevents rebalance on restart)
// (requires java.net.InetAddress; getLocalHost() throws the checked UnknownHostException)
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
    "consumer-" + InetAddress.getLocalHost().getHostName());

// Monitor rebalances
consumer.subscribe(Collections.singletonList("my-topic"),
    new ConsumerRebalanceListener() {
        private long rebalanceCount = 0;
        private long lastRebalanceTime = System.currentTimeMillis();
        
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            rebalanceCount++;
            long timeSinceLastRebalance = System.currentTimeMillis() - lastRebalanceTime;
            System.out.println("Rebalance #" + rebalanceCount + 
                " after " + timeSinceLastRebalance + "ms");
            lastRebalanceTime = System.currentTimeMillis();
            
            if (timeSinceLastRebalance < 60000) {
                System.err.println("WARNING: Frequent rebalancing detected");
            }
            
            consumer.commitSync();
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Assigned partitions: " + partitions);
        }
    });

Prevention:

  • Use static membership for stable consumer groups
  • Tune session and heartbeat timeouts
  • Monitor GC pauses (see the sketch after this list)
  • Ensure network stability
  • Avoid long-running operations between polls
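
For the GC item, pause time can be sampled from inside the consumer process via the standard JMX beans (a minimal sketch; the helper name is illustrative):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

long totalGcTimeMs() {
    long total = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
        long collectionTime = gc.getCollectionTime(); // cumulative ms, -1 if unsupported
        if (collectionTime > 0) {
            total += collectionTime;
        }
    }
    return total;
}

// Compare successive samples around each poll; a large delta indicates GC pauses
// long enough to threaten session.timeout.ms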

Issue: OffsetOutOfRangeException

Symptoms:

  • OffsetOutOfRangeException on poll
  • Consumer stuck not making progress
  • Messages being skipped

Causes:

  • Committed offset older than log retention
  • Log truncation on broker
  • Partition leader failure with data loss
  • Incorrect manual offset seek
  • No reset policy configured (auto.offset.reset=none), so the error surfaces instead of an automatic reset

Solutions:

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitSync();
    } catch (OffsetOutOfRangeException e) {
        System.err.println("Offset out of range: " + e.getMessage());
        // Choose ONE recovery strategy for the affected partitions
        
        // Option 1: Seek to beginning (may reprocess data)
        consumer.seekToBeginning(consumer.assignment());
        
        // Option 2: Seek to end (skips to latest, may lose data)
        // consumer.seekToEnd(consumer.assignment());
        
        // Option 3: Seek to a specific timestamp (e.g., 1 hour ago)
        // long timestamp = System.currentTimeMillis() - 3600000;
        // Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        // for (TopicPartition partition : consumer.assignment()) {
        //     timestampsToSearch.put(partition, timestamp);
        // }
        // Map<TopicPartition, OffsetAndTimestamp> offsets =
        //     consumer.offsetsForTimes(timestampsToSearch);
        // for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
        //     if (entry.getValue() != null) {
        //         consumer.seek(entry.getKey(), entry.getValue().offset());
        //     }
        // }
        
        // Option 4: Equivalent to Option 1, but seeking each partition explicitly
        // Map<TopicPartition, Long> beginningOffsets =
        //     consumer.beginningOffsets(consumer.assignment());
        // for (Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
        //     consumer.seek(entry.getKey(), entry.getValue());
        // }
    }
}

Prevention:

  • Set appropriate log retention on topics
  • Use auto.offset.reset to define the reset policy (see the snippet after this list)
  • Monitor committed offsets vs log boundaries
  • Implement offset validation before committing
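
The reset policy determines what happens when no valid offset exists for a partition: earliest reprocesses from the start of the log, latest skips to the newest records, and none surfaces the exception handled above.

// Choose one:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // reprocess from start of log
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // skip to newest records
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");   // fail fast, handle manually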

Issue: Consumer Lag Growing

Symptoms:

  • records-lag-max metric increasing
  • Consumer falling behind producers
  • End-to-end latency increasing

Causes:

  • Slow processing
  • Insufficient consumer instances
  • Insufficient partitions
  • Consumer group imbalance

Solutions:

// Monitor lag continuously
public class LagMonitor {
    public void checkLag(Consumer<?, ?> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            
            if (name.name().equals("records-lag-max")) {
                double lag = (double) entry.getValue().metricValue();
                System.out.println("Current lag: " + lag + " records");
                
                if (lag > 100000) {
                    System.err.println("CRITICAL: High consumer lag detected");
                    // Actions:
                    // 1. Add more consumer instances
                    // 2. Optimize processing logic
                    // 3. Increase partitions (requires topic recreation)
                    // 4. Use parallel processing within consumer
                }
            }
        }
    }
}

// Parallel processing pattern (same consumer instance, worker threads)
// Note: ordering within a partition is NOT preserved with this approach
ExecutorService executor = Executors.newFixedThreadPool(10);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    List<Future<?>> futures = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        Future<?> future = executor.submit(() -> {
            try {
                processRecord(record);
            } catch (Exception e) {
                System.err.println("Error processing: " + e.getMessage());
            }
        });
        futures.add(future);
    }
    
    // Wait for all processing to complete
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (Exception e) {
            System.err.println("Processing thread error: " + e.getMessage());
        }
    }
    
    // Commit after all records processed
    consumer.commitSync();
}

Prevention:

  • Scale consumer instances to match partition count
  • Optimize processing logic
  • Use async processing where appropriate
  • Monitor lag metrics
  • Set up alerts for lag thresholds

Edge Cases

Null Keys and Values

// Consumers handle null keys and values
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    // Key may be null
    String key = record.key(); // May be null
    if (key == null) {
        System.out.println("Record with null key");
    }
    
    // Value may be null (tombstone in compacted topics)
    String value = record.value(); // May be null
    if (value == null) {
        System.out.println("Tombstone record for key: " + key);
        // Handle deletion
        handleDeletion(key);
    } else {
        // Handle update
        handleUpdate(key, value);
    }
}

Empty Poll Results

// Poll may return empty results
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
    // No records available
    // This is normal and expected
    // Don't assume error or end of stream
} else {
    // Process records
    System.out.println("Received " + records.count() + " records");
}

// Common mistake: Assuming an empty poll means end of stream
// Correct: Keep polling in a loop
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records (may be empty)
}

Duplicate Records

// Consumers may receive duplicate records due to:
// 1. Rebalancing before commit
// 2. Consumer restart after crash
// 3. Network issues during commit

// Implement idempotent processing
Set<String> processedKeys = new HashSet<>();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        String deduplicationKey = record.topic() + "-" + 
            record.partition() + "-" + record.offset();
        
        if (processedKeys.contains(deduplicationKey)) {
            System.out.println("Skipping duplicate: " + deduplicationKey);
            continue;
        }
        
        processRecord(record);
        processedKeys.add(deduplicationKey);
        
        // Cleanup old entries periodically
        if (processedKeys.size() > 100000) {
            processedKeys.clear(); // Or use LRU cache
        }
    }
    
    consumer.commitSync();
}

// Or use external deduplication store
// (database, Redis, etc.) for durability
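
For the purely in-memory variant, a bounded LRU-style set avoids the abrupt clear() used above; a minimal sketch built on LinkedHashMap (the 100,000 capacity is illustrative):

// Keeps the most recently added deduplication keys, evicting the oldest
Set<String> processedKeys = Collections.newSetFromMap(
    new LinkedHashMap<String, Boolean>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > 100_000; // illustrative capacity
        }
    });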

Partition Assignment Changes

// Handle partition assignment/revocation properly
Map<TopicPartition, Long> partitionState = new HashMap<>();

consumer.subscribe(Collections.singletonList("my-topic"),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Revoking partitions: " + partitions);
            
            // Save state before revocation
            for (TopicPartition partition : partitions) {
                Long state = partitionState.get(partition);
                if (state != null) {
                    saveStateToExternalStore(partition, state);
                }
                partitionState.remove(partition);
            }
            
            // Commit offsets
            consumer.commitSync();
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Assigned partitions: " + partitions);
            
            // Load state for new partitions
            for (TopicPartition partition : partitions) {
                Long state = loadStateFromExternalStore(partition);
                if (state != null) {
                    partitionState.put(partition, state);
                }
            }
        }
        
        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            System.out.println("Lost partitions: " + partitions);
            
            // Partitions lost without chance to commit
            // Clean up state
            for (TopicPartition partition : partitions) {
                partitionState.remove(partition);
            }
        }
    });

Consumer Group Rebalance During Processing

// Rebalance can occur during record processing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        // Long processing operation
        processRecordSlowly(record);
        
        // A rebalance may have occurred during processing;
        // check whether this partition is still assigned
        Set<TopicPartition> assignment = consumer.assignment();
        TopicPartition recordPartition = 
            new TopicPartition(record.topic(), record.partition());
        
        if (!assignment.contains(recordPartition)) {
            System.out.println("Partition " + recordPartition + 
                " no longer assigned, stopping this batch");
            break; // Remaining records will be redelivered to the partition's new owner
        }
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // Rebalanced during commit, will rejoin on next poll
        System.err.println("Commit failed: " + e.getMessage());
    }
}

Share Consumer Edge Cases

Record Lock Expiration

// Share consumer records have time-limited locks (default 30 seconds)
// If processing exceeds lock duration, record may be delivered to another consumer

ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        long processingStartTime = System.currentTimeMillis();
        
        try {
            processRecord(record);
            
            long processingDuration = System.currentTimeMillis() - processingStartTime;
            
            // Check if processing might have exceeded lock duration
            if (processingDuration > 25000) { // 25 seconds threshold
                System.err.println("WARNING: Processing took " + processingDuration + 
                    "ms, approaching lock expiration");
            }
            
            shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
            
        } catch (Exception e) {
            long processingDuration = System.currentTimeMillis() - processingStartTime;
            
            if (processingDuration > 30000) {
                // Lock likely expired - record may have been delivered elsewhere
                System.err.println("Lock may have expired after " + 
                    processingDuration + "ms, rejecting to avoid duplicate processing");
                shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
            } else {
                // Release for retry
                shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
            }
        }
    }
}

Prevention:

  • Keep processing time under lock duration (default 30s)
  • Configure broker share.record.lock.duration.ms appropriately
  • Implement timeout detection in processing (see the sketch after this list)
  • Use REJECT for records that took too long
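
One way to implement timeout detection is to run the processing step under a bounded wait and release the record if it cannot finish safely within the lock duration. A sketch of the processing step inside the poll loop above (the 25-second budget and the pool size are assumptions based on the default 30-second lock):

// Created once, outside the poll loop
ExecutorService processingPool = Executors.newFixedThreadPool(4);

// Inside the record loop, replacing the direct processRecord(record) call:
Future<?> work = processingPool.submit(() -> processRecord(record));
try {
    work.get(25, TimeUnit.SECONDS); // stay comfortably under the 30s lock
    shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (TimeoutException e) {
    work.cancel(true); // best effort - processing must tolerate interruption
    shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
} catch (Exception e) {
    shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
}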

Delivery Count Overflow

// Delivery count is Optional<Short> - max value 32,767
ShareConsumer<String, String> shareConsumer = new KafkaShareConsumer<>(props);
shareConsumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        Optional<Short> deliveryCount = record.deliveryCount();
        
        if (deliveryCount.isPresent()) {
            short count = deliveryCount.get();
            
            // Check for unusually high delivery counts
            if (count > 10) {
                System.err.println("High delivery count: " + count + " for record: " + 
                    record.key());
                
                if (count > 100) {
                    // Likely poison pill message - send to DLQ
                    sendToDeadLetterQueue(record);
                    shareConsumer.acknowledge(record, AcknowledgeType.REJECT);
                    continue;
                }
            }
        }
        
        try {
            processRecord(record);
            shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
        }
    }
}
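
sendToDeadLetterQueue(record) above is left undefined; a minimal sketch using the Producer API could forward the record to a dedicated topic (the "my-topic-dlq" topic name, producerProps, and the header names are assumptions):

// Reuse a single producer instance for all DLQ sends
KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(producerProps);

void sendToDeadLetterQueue(ConsumerRecord<String, String> record) {
    ProducerRecord<String, String> dlqRecord =
        new ProducerRecord<>("my-topic-dlq", record.key(), record.value());
    // Preserve provenance for later inspection
    dlqRecord.headers().add("dlq.original.topic",
        record.topic().getBytes(StandardCharsets.UTF_8));
    dlqRecord.headers().add("dlq.original.offset",
        Long.toString(record.offset()).getBytes(StandardCharsets.UTF_8));
    dlqProducer.send(dlqRecord);
}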

Performance Tuning

High Throughput Configuration

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

// Fetch more data per request
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10240); // 10KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms

// Process more records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

// Larger partition fetch size
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2097152); // 2MB

// Async commits for better throughput
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    
    // Async commit doesn't block
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed: " + exception.getMessage());
        }
    });
}

Low Latency Configuration

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-latency-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

// Minimize fetch wait time
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // Fetch immediately
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10); // Max 10ms wait

// Small poll batches
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);

// Frequent commits
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    
    if (!records.isEmpty()) {
        consumer.commitSync(); // Synchronous for immediate persistence
    }
}

Configuration Interdependencies

Heartbeat and Session Timeout Relationship

// heartbeat.interval.ms must be lower than session.timeout.ms,
// and is usually set to no more than 1/3 of it
// Typical configuration:

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 3 seconds (1/15 of session timeout)

// Invalid configuration (rejected when the consumer is created):
// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
// props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // ERROR: not lower than session timeout

// Relationship with max.poll.interval.ms:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes

// max.poll.interval.ms should be:
// - Greater than the maximum expected processing time per poll
// - Several times session.timeout.ms (the defaults are 300s and 45s respectively)

Auto Commit Timing

// With auto-commit enabled, commits happen:
// 1. On each poll() if auto.commit.interval.ms has elapsed
// 2. Before a rebalance, when partitions are revoked
// 3. During close()

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds

// Auto-commit behavior:
consumer.subscribe(Collections.singletonList("my-topic"));

// First poll at T=0s
ConsumerRecords<String, String> records1 = consumer.poll(Duration.ofMillis(100));
// Offsets not committed yet

// Second poll at T=2s
ConsumerRecords<String, String> records2 = consumer.poll(Duration.ofMillis(100));
// Offsets not committed yet (< 5s interval)

// Third poll at T=6s
ConsumerRecords<String, String> records3 = consumer.poll(Duration.ofMillis(100));
// Offsets committed during this poll (>= 5s interval)

// Edge case: Records processed but not committed if crash before next poll
// Solution: Use manual commit for critical applications
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Common Anti-Patterns

Anti-Pattern: Long Processing Between Polls

// DON'T DO THIS - Causes rebalancing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        processRecordVerySlowly(record); // Takes minutes per record
        // max.poll.interval.ms exceeded → rebalance
    }
    
    consumer.commitSync();
}

// DO THIS - Process quickly or use async processing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    // Option 1: Quick processing
    for (ConsumerRecord<String, String> record : records) {
        processRecordQuickly(record);
    }
    consumer.commitSync();
    
    // Option 2 (alternative to Option 1): Hand off to async processing
    // NOTE: committing right after poll gives at-most-once semantics -
    // in-flight records are lost if the consumer crashes before they finish.
    // See the pause/resume sketch after this block for a safer variant.
    for (ConsumerRecord<String, String> record : records) {
        asyncProcessor.submit(() -> processRecordSlowly(record));
    }
    consumer.commitSync(); // Commit immediately after poll
}
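
A middle ground between the two options is to pause the assigned partitions while a batch is processed asynchronously, keep calling poll() so the consumer stays in the group, and commit only after the batch completes. A sketch reusing the asyncProcessor and processRecordSlowly placeholders from above:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) {
        continue;
    }

    // Stop fetching while the batch is processed asynchronously
    consumer.pause(consumer.assignment());

    List<Future<?>> futures = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        futures.add(asyncProcessor.submit(() -> processRecordSlowly(record)));
    }

    // Keep polling so max.poll.interval.ms stays satisfied;
    // paused partitions return no records
    while (!futures.stream().allMatch(Future::isDone)) {
        consumer.poll(Duration.ofMillis(100));
    }

    consumer.commitSync();
    consumer.resume(consumer.assignment());
}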

Anti-Pattern: Creating Consumer Per Message

// DON'T DO THIS - Very inefficient
public void consumeMessage(String topic) {
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topic));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // ... process
    consumer.close(); // Expensive
}

// DO THIS - Long-lived consumer
private final Consumer<String, String> consumer = createConsumer();

public void start() {
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // Process records
    }
}

Anti-Pattern: Not Handling Rebalance

// DON'T DO THIS - Loses uncommitted offsets
consumer.subscribe(Collections.singletonList("my-topic"));
// No rebalance listener

// DO THIS - Commit before rebalance
consumer.subscribe(Collections.singletonList("my-topic"),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            consumer.commitSync(); // Commit before giving up partitions
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Initialize for new partitions
        }
    });

Capacity Planning

Sizing Consumer Group

// Optimal consumer group size:
// Number of consumers = Number of partitions

// Under-provisioned (2 consumers, 10 partitions):
// - Each consumer handles 5 partitions
// - Limited parallelism
// - High lag if consumers are slow

// Optimal (10 consumers, 10 partitions):
// - Each consumer handles 1 partition
// - Maximum parallelism
// - Lowest lag

// Over-provisioned (15 consumers, 10 partitions):
// - 10 consumers active
// - 5 consumers idle (no partitions assigned)
// - Wasted resources

// Calculate required consumers (illustrative back-of-the-envelope):
int partitionCount = getPartitionCount("my-topic");
int avgProcessingTimeMs = 100;  // per record
int msgRatePerSecond = 500;     // incoming rate across the topic

// Each consumer handles roughly 1000 / avgProcessingTimeMs records per second,
// so keeping up requires:
int requiredConsumers = (int) Math.ceil(
    (msgRatePerSecond * avgProcessingTimeMs) / 1000.0
);

// Capped at the partition count - extra consumers would sit idle
System.out.println("Recommended consumers: " + 
    Math.min(requiredConsumers, partitionCount));

Memory Sizing

// Memory required per consumer:
// - max.poll.records * avg_record_size
// - fetch.max.bytes for buffering
// - Application processing buffers

// Example calculation:
int maxPollRecords = 500;
int avgRecordSize = 1024; // 1KB
int fetchMaxBytes = 52428800; // 50MB

long minMemoryRequired = (maxPollRecords * avgRecordSize) + 
    fetchMaxBytes + 
    (100 * 1024 * 1024); // 100MB for application

System.out.println("Minimum memory per consumer: " + 
    (minMemoryRequired / 1024 / 1024) + "MB");

// Configure JVM accordingly:
// -Xms256m -Xmx512m (for above example)

Advanced Consumer Patterns

Pattern: Parallel Processing Within Single Consumer

Process records in parallel while maintaining per-partition ordering:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class ParallelProcessingConsumer {
    private final Consumer<String, String> consumer;
    private final ExecutorService executorService;
    private final Map<TopicPartition, CompletableFuture<Void>> partitionFutures;
    
    public ParallelProcessingConsumer(Properties props, int numThreads) {
        this.consumer = new KafkaConsumer<>(props);
        this.executorService = Executors.newFixedThreadPool(numThreads);
        this.partitionFutures = new ConcurrentHashMap<>();
    }
    
    public void processWithParallelism() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            // Process each partition in parallel
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = 
                    records.records(partition);
                
                // Wait for previous processing of this partition to complete
                CompletableFuture<Void> previousFuture = partitionFutures.get(partition);
                if (previousFuture != null) {
                    try {
                        previousFuture.get(30, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        System.err.println("Previous partition processing failed: " + 
                            e.getMessage());
                    }
                }
                
                // Process partition records in parallel (maintains per-partition order)
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        try {
                            processRecord(record);
                        } catch (Exception e) {
                            System.err.println("Error processing record: " + e.getMessage());
                        }
                    }
                }, executorService);
                
                partitionFutures.put(partition, future);
            }
            
            // Wait for all partitions to finish processing
            List<CompletableFuture<Void>> allFutures = new ArrayList<>(partitionFutures.values());
            CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]))
                .join();
            
            // Commit offsets after all processing completes
            consumer.commitSync();
            partitionFutures.clear();
        }
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        // Simulate processing
        System.out.println(Thread.currentThread().getName() + 
            " processing: " + record.key());
    }
    
    public void close() {
        executorService.shutdown();
        consumer.close();
    }
}

Pattern: Checkpointing Consumer with External State

Consumer that checkpoints state to external store for recovery:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class CheckpointingConsumer {
    private final Consumer<String, String> consumer;
    private final ExternalStateStore stateStore;
    private final Map<TopicPartition, ProcessingState> partitionStates;
    
    public CheckpointingConsumer(Properties props, ExternalStateStore stateStore) {
        this.consumer = new KafkaConsumer<>(props);
        this.stateStore = stateStore;
        this.partitionStates = new ConcurrentHashMap<>();
    }
    
    public void processWithCheckpointing() {
        consumer.subscribe(Arrays.asList("my-topic"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Checkpoint all partition states before rebalance
                    for (TopicPartition partition : partitions) {
                        ProcessingState state = partitionStates.get(partition);
                        if (state != null) {
                            stateStore.save(partition, state);
                            partitionStates.remove(partition);
                        }
                    }
                    consumer.commitSync();
                    System.out.println("Checkpointed state for revoked partitions: " + partitions);
                }
                
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Restore state for newly assigned partitions
                    for (TopicPartition partition : partitions) {
                        ProcessingState state = stateStore.load(partition);
                        if (state != null) {
                            partitionStates.put(partition, state);
                            // Seek to last checkpointed offset
                            consumer.seek(partition, state.getLastOffset() + 1);
                        }
                    }
                    System.out.println("Restored state for assigned partitions: " + partitions);
                }
            });
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                TopicPartition partition = new TopicPartition(
                    record.topic(), record.partition());
                
                // Get or create partition state
                ProcessingState state = partitionStates.computeIfAbsent(
                    partition, k -> new ProcessingState());
                
                // Process record with state
                processWithState(record, state);
                
                // Update state
                state.setLastOffset(record.offset());
                state.incrementProcessedCount();
            }
            
            // Periodically checkpoint (every 1000 records)
            for (Map.Entry<TopicPartition, ProcessingState> entry : partitionStates.entrySet()) {
                ProcessingState state = entry.getValue();
                if (state.getProcessedCount() >= 1000) {
                    stateStore.save(entry.getKey(), state);
                    state.resetProcessedCount();
                }
            }
            
            consumer.commitAsync();
        }
    }
    
    private void processWithState(ConsumerRecord<String, String> record, 
                                  ProcessingState state) {
        // Use state for processing
        System.out.println("Processing with state: " + state.getCustomData());
        state.setCustomData("Last processed: " + record.key());
    }
    
    // State class (public so ExternalStateStore implementations can reference it)
    public static class ProcessingState {
        private long lastOffset;
        private long processedCount;
        private String customData;
        
        public long getLastOffset() { return lastOffset; }
        public void setLastOffset(long offset) { this.lastOffset = offset; }
        public long getProcessedCount() { return processedCount; }
        public void incrementProcessedCount() { this.processedCount++; }
        public void resetProcessedCount() { this.processedCount = 0; }
        public String getCustomData() { return customData; }
        public void setCustomData(String data) { this.customData = data; }
    }
    
    // External state store interface
    public interface ExternalStateStore {
        void save(TopicPartition partition, ProcessingState state);
        ProcessingState load(TopicPartition partition);
    }
}

Pattern: Rate-Limited Consumer

Consumer with configurable rate limiting:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

public class RateLimitedConsumer {
    private final Consumer<String, String> consumer;
    private final int maxRecordsPerSecond;
    private final AtomicLong lastProcessTime;
    private final AtomicLong processedInWindow;
    private final long windowSizeMs = 1000; // 1 second window
    
    public RateLimitedConsumer(Properties props, int maxRecordsPerSecond) {
        this.consumer = new KafkaConsumer<>(props);
        this.maxRecordsPerSecond = maxRecordsPerSecond;
        this.lastProcessTime = new AtomicLong(System.currentTimeMillis());
        this.processedInWindow = new AtomicLong(0);
    }
    
    public void processWithRateLimit() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                // Check rate limit
                if (!allowProcess()) {
                    // Pause consumption until rate limit allows
                    long sleepTime = calculateSleepTime();
                    if (sleepTime > 0) {
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
                
                // Process record
                processRecord(record);
                
                // Update rate limiting counters
                long now = System.currentTimeMillis();
                long lastTime = lastProcessTime.get();
                
                if (now - lastTime >= windowSizeMs) {
                    // New window
                    lastProcessTime.set(now);
                    processedInWindow.set(1);
                } else {
                    processedInWindow.incrementAndGet();
                }
            }
            
            consumer.commitAsync();
        }
    }
    
    private boolean allowProcess() {
        long now = System.currentTimeMillis();
        long lastTime = lastProcessTime.get();
        
        if (now - lastTime >= windowSizeMs) {
            return true; // New window started
        }
        
        return processedInWindow.get() < maxRecordsPerSecond;
    }
    
    private long calculateSleepTime() {
        long now = System.currentTimeMillis();
        long lastTime = lastProcessTime.get();
        long elapsed = now - lastTime;
        
        if (elapsed >= windowSizeMs) {
            return 0;
        }
        
        long remainingTime = windowSizeMs - elapsed;
        long processedCount = processedInWindow.get();
        
        if (processedCount >= maxRecordsPerSecond) {
            return remainingTime;
        }
        
        return 0;
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.println("Processing: " + record.key());
    }
    
    public Map<String, Object> getRateLimitStats() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("maxRecordsPerSecond", maxRecordsPerSecond);
        stats.put("processedInCurrentWindow", processedInWindow.get());
        stats.put("utilizationPercent", 
            (processedInWindow.get() * 100.0) / maxRecordsPerSecond);
        return stats;
    }
    
    public void close() {
        consumer.close();
    }
}

Pattern: Consumer with Automatic Lag Monitoring

Consumer that monitors and alerts on high lag. Since KafkaConsumer is not thread-safe, the lag check runs on the polling thread rather than on a separate scheduler thread:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class LagMonitoringConsumer {
    private final Consumer<String, String> consumer;
    private final long lagThreshold;
    private final Map<TopicPartition, Long> lastCommittedOffsets;
    private long lastLagCheckMs = System.currentTimeMillis();
    
    public LagMonitoringConsumer(Properties props, long lagThreshold) {
        this.consumer = new KafkaConsumer<>(props);
        this.lagThreshold = lagThreshold;
        this.lastCommittedOffsets = new ConcurrentHashMap<>();
        // Lag is checked from the polling thread (KafkaConsumer is not thread-safe),
        // roughly every 30 seconds - see process()
    }
    
    public void process() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            // Process records
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
            
            // Commit and track offsets
            Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = 
                    records.records(partition);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                commitOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                lastCommittedOffsets.put(partition, lastOffset + 1);
            }
            
            if (!commitOffsets.isEmpty()) {
                consumer.commitAsync(commitOffsets, null);
            }
            
            // Periodic lag check from the polling thread
            if (System.currentTimeMillis() - lastLagCheckMs >= 30000) {
                checkLag();
                lastLagCheckMs = System.currentTimeMillis();
            }
        }
    }
    
    private void checkLag() {
        Set<TopicPartition> assignment = consumer.assignment();
        if (assignment.isEmpty()) {
            return;
        }
        
        // Get end offsets
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        
        System.out.println("\n=== Consumer Lag Report ===");
        boolean hasHighLag = false;
        long totalLag = 0;
        
        for (TopicPartition partition : assignment) {
            Long committedOffset = lastCommittedOffsets.get(partition);
            if (committedOffset == null) {
                // Get committed offset from Kafka
                OffsetAndMetadata offsetMetadata = 
                    consumer.committed(Collections.singleton(partition))
                        .get(partition);
                committedOffset = offsetMetadata != null ? offsetMetadata.offset() : 0L;
            }
            
            Long endOffset = endOffsets.get(partition);
            long lag = endOffset - committedOffset;
            totalLag += lag;
            
            String status = lag > lagThreshold ? "⚠️ HIGH" : "✅ OK";
            System.out.println(String.format("Partition %d: lag=%d %s", 
                partition.partition(), lag, status));
            
            if (lag > lagThreshold) {
                hasHighLag = true;
            }
        }
        
        System.out.println(String.format("Total lag: %d records", totalLag));
        System.out.println("===========================\n");
        
        if (hasHighLag) {
            System.err.println("⚠️  WARNING: High lag detected!");
            // In production: Send alert to monitoring system
        }
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.println("Processing: " + record.key());
    }
    
    public Map<String, Long> getCurrentLag() {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        Map<String, Long> lagByPartition = new HashMap<>();
        
        for (TopicPartition partition : assignment) {
            Long committedOffset = lastCommittedOffsets.get(partition);
            if (committedOffset == null) {
                continue;
            }
            Long endOffset = endOffsets.get(partition);
            long lag = endOffset - committedOffset;
            lagByPartition.put(partition.toString(), lag);
        }
        
        return lagByPartition;
    }
    
    public void close() {
        consumer.close();
    }
}

Pattern: Fan-out Consumer Pattern

Single consumer that fans out to multiple processing pipelines:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;

public class FanOutConsumer {
    private final Consumer<String, String> consumer;
    private final List<ProcessingPipeline> pipelines;
    private final ExecutorService executor;
    
    public FanOutConsumer(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
        this.pipelines = new ArrayList<>();
        this.executor = Executors.newCachedThreadPool();
    }
    
    public void addPipeline(String name, Predicate<ConsumerRecord<String, String>> filter,
                          RecordProcessor processor) {
        pipelines.add(new ProcessingPipeline(name, filter, processor));
    }
    
    public void process() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            List<Future<?>> futures = new ArrayList<>();
            
            for (ConsumerRecord<String, String> record : records) {
                // Fan out to all matching pipelines
                for (ProcessingPipeline pipeline : pipelines) {
                    if (pipeline.filter.test(record)) {
                        Future<?> future = executor.submit(() -> {
                            try {
                                pipeline.processor.process(record);
                            } catch (Exception e) {
                                System.err.println("Pipeline " + pipeline.name + 
                                    " error: " + e.getMessage());
                            }
                        });
                        futures.add(future);
                    }
                }
            }
            
            // Wait for all processing to complete
            for (Future<?> future : futures) {
                try {
                    future.get(30, TimeUnit.SECONDS);
                } catch (Exception e) {
                    System.err.println("Processing timeout: " + e.getMessage());
                }
            }
            
            // Commit after all pipelines complete
            consumer.commitAsync();
        }
    }
    
    private static class ProcessingPipeline {
        final String name;
        final Predicate<ConsumerRecord<String, String>> filter;
        final RecordProcessor processor;
        
        ProcessingPipeline(String name, 
                          Predicate<ConsumerRecord<String, String>> filter,
                          RecordProcessor processor) {
            this.name = name;
            this.filter = filter;
            this.processor = processor;
        }
    }
    
    @FunctionalInterface
    public interface RecordProcessor {
        void process(ConsumerRecord<String, String> record);
    }
    
    public void close() {
        executor.shutdown();
        consumer.close();
    }
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        
        FanOutConsumer consumer = new FanOutConsumer(props);
        
        // Add processing pipelines
        consumer.addPipeline("HighPriority",
            record -> record.value().startsWith("HIGH:"),
            record -> System.out.println("High priority: " + record.value()));
        
        consumer.addPipeline("Analytics",
            record -> record.value().contains("analytics"),
            record -> System.out.println("Analytics: " + record.value()));
        
        consumer.addPipeline("Logging",
            record -> true,  // Log everything
            record -> System.out.println("Log: " + record.value()));
        
        consumer.process();
    }
}

Consumer Monitoring Dashboard

Comprehensive monitoring for consumer health. Since KafkaConsumer is not thread-safe, the dashboard is driven from the polling thread instead of a background scheduler:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class ConsumerMetricsDashboard {
    private final Consumer<String, String> consumer;
    private long lastPrintMs = System.currentTimeMillis();
    
    public ConsumerMetricsDashboard(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }
    
    // Call from the thread that polls the consumer (KafkaConsumer is not thread-safe);
    // prints the dashboard at most once every 30 seconds
    public void maybePrintDashboard() {
        if (System.currentTimeMillis() - lastPrintMs >= 30000) {
            lastPrintMs = System.currentTimeMillis();
            printDashboard();
        }
    }
    
    public void printDashboard() {
        System.out.println("\n=== Consumer Metrics Dashboard ===");
        System.out.println("Timestamp: " + new Date());
        System.out.println();
        
        // Get all metrics
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        
        // Consumption Rate
        System.out.println("CONSUMPTION:");
        printMetric(metrics, "records-consumed-rate", "records/sec");
        printMetric(metrics, "bytes-consumed-rate", "bytes/sec");
        printMetric(metrics, "fetch-rate", "fetches/sec");
        System.out.println();
        
        // Lag
        System.out.println("LAG:");
        printMetric(metrics, "records-lag-max", "records");
        printMetric(metrics, "records-lag-avg", "records");
        
        // Calculate lag percentage
        Set<TopicPartition> assignment = consumer.assignment();
        if (!assignment.isEmpty()) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
            long totalLag = 0;
            long totalEndOffset = 0;
            
            for (TopicPartition partition : assignment) {
                long position = consumer.position(partition);
                long endOffset = endOffsets.get(partition);
                totalLag += (endOffset - position);
                totalEndOffset += endOffset;
            }
            
            double lagPercent = totalEndOffset > 0 ? 
                (totalLag * 100.0 / totalEndOffset) : 0;
            System.out.println(String.format("  Lag Percentage: %.2f%%", lagPercent));
        }
        System.out.println();
        
        // Latency
        System.out.println("LATENCY:");
        printMetric(metrics, "fetch-latency-avg", "ms");
        printMetric(metrics, "fetch-latency-max", "ms");
        printMetric(metrics, "commit-latency-avg", "ms");
        System.out.println();
        
        // Partition Assignment
        System.out.println("ASSIGNMENT:");
        System.out.println("  Assigned Partitions: " + assignment.size());
        for (TopicPartition partition : assignment) {
            long position = consumer.position(partition);
            System.out.println(String.format("    %s: position=%d",
                partition, position));
        }
        System.out.println();
        
        // Rebalancing
        System.out.println("REBALANCING:");
        printMetric(metrics, "rebalance-total", "count");
        printMetric(metrics, "rebalance-latency-avg", "ms");
        System.out.println();
        
        // Health Alerts
        System.out.println("HEALTH ALERTS:");
        checkHealthAlerts(metrics);
        
        System.out.println("===================================\n");
    }
    
    private void printMetric(Map<MetricName, ? extends Metric> metrics, 
                            String metricName, String unit) {
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            if (entry.getKey().name().equals(metricName)) {
                Object value = entry.getValue().metricValue();
                if (value instanceof Number) {
                    System.out.println(String.format("  %s: %.2f %s",
                        metricName, ((Number) value).doubleValue(), unit));
                }
                return;
            }
        }
    }
    
    private void checkHealthAlerts(Map<MetricName, ? extends Metric> metrics) {
        boolean healthy = true;
        
        // Check lag
        double maxLag = getMetricValue(metrics, "records-lag-max");
        if (maxLag > 10000) {
            System.out.println("  ⚠️  HIGH LAG: " + maxLag + " records");
            healthy = false;
        }
        
        // Check fetch latency
        double fetchLatency = getMetricValue(metrics, "fetch-latency-avg");
        if (fetchLatency > 1000) {
            System.out.println("  ⚠️  HIGH FETCH LATENCY: " + fetchLatency + " ms");
            healthy = false;
        }
        
        // Check commit failures
        double commitRate = getMetricValue(metrics, "commit-rate");
        if (commitRate == 0) {
            System.out.println("  ⚠️  NO COMMITS: Check auto-commit or manual commit logic");
            healthy = false;
        }
        
        if (healthy) {
            System.out.println("  ✅ All metrics healthy");
        }
    }
    
    private double getMetricValue(Map<MetricName, ? extends Metric> metrics, 
                                  String metricName) {
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            if (entry.getKey().name().equals(metricName)) {
                Object value = entry.getValue().metricValue();
                if (value instanceof Number) {
                    return ((Number) value).doubleValue();
                }
            }
        }
        return 0.0;
    }
    
    public void close() {
        // Nothing to release - the consumer is owned and closed by the caller
    }
}
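
A typical way to drive the dashboard is from the poll loop itself, using the maybePrintDashboard() helper shown above:

Consumer<String, String> consumer = new KafkaConsumer<>(props);
ConsumerMetricsDashboard dashboard = new ConsumerMetricsDashboard(consumer);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitAsync();
    
    // Same thread as poll(), so no thread-safety issues
    dashboard.maybePrintDashboard();
}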