Kafka Consumer API

Comprehensive API reference for Kafka 0.8 consumer classes in the Apache Flink Kafka Connector.

FlinkKafkaConsumer08<T>

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: FlinkKafkaConsumerBase<T>
Description: The main Kafka consumer for Apache Kafka 0.8.x, providing streaming data source capabilities with exactly-once processing guarantees through Flink's checkpointing mechanism.

Class Declaration

@PublicEvolving
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>

Constants

// Serialization identifier (from FlinkKafkaConsumer08)
private static final long serialVersionUID = -6272159445203409112L;

// Configuration keys (from FlinkKafkaConsumer08)
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

// Inherited constants from FlinkKafkaConsumerBase
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
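
The public configuration keys can be referenced by constant rather than by string literal when assembling the consumer Properties; the values below are illustrative:

Properties props = new Properties();

// Retry partition-metadata queries up to 5 times instead of the default 3
props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "5");

// Keep forwarding of Kafka connector metrics enabled
props.setProperty(FlinkKafkaConsumerBase.KEY_DISABLE_METRICS, "false");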

Constructors

Single Topic Constructors

/**
 * Creates a consumer for a single topic with DeserializationSchema.
 *
 * @param topic The Kafka topic to read from
 * @param valueDeserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(String topic, 
                           DeserializationSchema<T> valueDeserializer, 
                           Properties props)

/**
 * Creates a consumer for a single topic with KafkaDeserializationSchema.
 *
 * @param topic The Kafka topic to read from
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(String topic, 
                           KafkaDeserializationSchema<T> deserializer, 
                           Properties props)

Multi-Topic Constructors

/**
 * Creates a consumer for multiple topics with DeserializationSchema.
 *
 * @param topics List of Kafka topics to read from
 * @param deserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(List<String> topics, 
                           DeserializationSchema<T> deserializer, 
                           Properties props)

/**
 * Creates a consumer for multiple topics with KafkaDeserializationSchema.
 *
 * @param topics List of Kafka topics to read from  
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
public FlinkKafkaConsumer08(List<String> topics, 
                           KafkaDeserializationSchema<T> deserializer, 
                           Properties props)

Pattern Subscription Constructors

/**
 * Creates a consumer with pattern-based topic subscription and DeserializationSchema.
 * Enables dynamic discovery of topics matching the pattern.
 *
 * @param subscriptionPattern Regex pattern for topic names to subscribe to
 * @param valueDeserializer The deserializer used to convert raw bytes to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, 
                           DeserializationSchema<T> valueDeserializer, 
                           Properties props)

/**
 * Creates a consumer with pattern-based topic subscription and KafkaDeserializationSchema.
 * Enables dynamic discovery of topics matching the pattern.
 *
 * @param subscriptionPattern Regex pattern for topic names to subscribe to
 * @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
 * @param props Configuration properties for Kafka consumer and ZooKeeper client
 */
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, 
                           KafkaDeserializationSchema<T> deserializer, 
                           Properties props)
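
Discovery of topics that are created after the job starts is only active when a partition-discovery interval is configured; a minimal sketch, assuming standard connector behavior (interval value illustrative):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "pattern-consumer");

// Without this setting, only topics matching the pattern at job start are read;
// with it, newly created matching topics (and new partitions) are picked up at runtime
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props);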

Configuration Methods (Inherited from FlinkKafkaConsumerBase)

Startup Position Configuration

/**
 * Configures the consumer to start reading from the earliest available offset.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromEarliest()

/**
 * Configures the consumer to start reading from the latest available offset.
 *
 * @return This consumer instance for method chaining  
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest()

/**
 * Configures the consumer to start reading from the consumer group's committed offsets.
 * This is the default behavior.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets()

/**
 * Configures the consumer to start reading from specific offsets per partition.
 *
 * @param specificStartupOffsets Map of partition to offset mappings
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
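
The startup-position methods are mutually exclusive (the last call before execution wins), and they only apply when the job is not restoring offsets from a checkpoint or savepoint. A brief sketch, assuming `props` holds the required properties:

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), props);

// Replay the topic from the earliest available offsets, ignoring committed group offsets
consumer.setStartFromEarliest();

// ...or resume from the offsets committed under "group.id" (the default behavior);
// "auto.offset.reset" decides where to start when no committed offset exists
consumer.setStartFromGroupOffsets();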

Watermark and Timestamp Assignment

/**
 * Assigns a timestamp assigner and watermark generator for punctuated watermarks.
 *
 * @param assigner The assigner that generates punctuated watermarks
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)

/**
 * Assigns a timestamp assigner and watermark generator for periodic watermarks.
 *
 * @param assigner The assigner that generates periodic watermarks
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner)
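
A periodic-watermark example appears in the usage section below; the following is a minimal sketch of the punctuated variant, where MyEvent, getTimestamp(), isWatermarkMarker(), and MyEventDeserializer are hypothetical placeholders for an element type that flags watermark-bearing records:

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    "events", new MyEventDeserializer(), props);  // placeholder deserialization schema

consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // Emit a watermark only for elements explicitly marked as watermark carriers
        return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
});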

Checkpointing and Offset Management

/**
 * Configures whether the consumer commits offsets back to Kafka/ZooKeeper on completed checkpoints.
 * This setting only takes effect when checkpointing is enabled for the job. The committed offsets
 * are used for monitoring and for the group-offsets startup mode; Flink's processing guarantees
 * rely on the offsets stored in its own checkpoints, not on the offsets committed to Kafka.
 *
 * @param commitOnCheckpoints Whether to commit offsets on checkpoints (default: true)
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints)

/**
 * Disables the filtering of restored partitions based on subscribed topics.
 * By default, only partitions of subscribed topics are restored.
 *
 * @return This consumer instance for method chaining
 */
public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics()
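
Committing on checkpoints only has an effect when checkpointing is enabled on the execution environment; a brief sketch (interval value illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 10 seconds; with checkpointing enabled, offsets are committed
// to ZooKeeper when each checkpoint completes (shown explicitly, though it is the default)
env.enableCheckpointing(10_000);

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), props);
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer).print();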

Lifecycle Methods (Inherited from FlinkKafkaConsumerBase)

/**
 * Opens the consumer and initializes resources.
 *
 * @param configuration The task configuration
 * @throws Exception If initialization fails
 */
public void open(Configuration configuration) throws Exception

/**
 * Main execution method that reads from Kafka and emits records.
 *
 * @param sourceContext The source context for emitting records
 * @throws Exception If execution fails
 */
public void run(SourceContext<T> sourceContext) throws Exception

/**
 * Cancels the consumer operation and stops reading.
 */
public void cancel()

/**
 * Closes the consumer and cleans up resources.
 *
 * @throws Exception If cleanup fails
 */
public void close() throws Exception

/**
 * Returns the type information for the produced data type.
 *
 * @return Type information for T
 */
public TypeInformation<T> getProducedType()

Checkpointing Interface Methods (Inherited)

/**
 * Initializes the state of the function from a checkpoint.
 *
 * @param context The initialization context
 * @throws Exception If state initialization fails
 */
public final void initializeState(FunctionInitializationContext context) throws Exception

/**
 * Snapshots the function's state during checkpointing.
 *
 * @param context The snapshot context
 * @throws Exception If state snapshotting fails
 */
public final void snapshotState(FunctionSnapshotContext context) throws Exception

/**
 * Notifies the function that a checkpoint has been completed.
 *
 * @param checkpointId The ID of the completed checkpoint
 * @throws Exception If notification processing fails
 */
public final void notifyCheckpointComplete(long checkpointId) throws Exception

Static Validation Methods

/**
 * Validate the ZooKeeper configuration, checking for required parameters.
 * 
 * @param props Properties to check
 * @throws IllegalArgumentException if required properties are missing or invalid
 */
protected static void validateZooKeeperConfig(Properties props)
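
In the upstream connector this validation runs when the consumer is constructed; a sketch of the failure mode under that assumption, using the property names from the Configuration Properties section below:

Properties incomplete = new Properties();
incomplete.setProperty("group.id", "my-consumer-group");
// "zookeeper.connect" is deliberately missing

try {
    new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), incomplete);
} catch (IllegalArgumentException e) {
    // The misconfiguration is rejected before the job is ever submitted
    System.err.println("Invalid Kafka 0.8 consumer configuration: " + e.getMessage());
}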

Usage Examples

Basic Consumer Setup

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer).print();

Multi-Topic Consumer with Custom Deserialization

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Arrays;
import java.util.List;

List<String> topics = Arrays.asList("topic1", "topic2", "topic3");

KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return MyEvent.fromJson(new String(record.value()));
    }
    
    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement.isEndMarker();
    }
    
    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    topics, schema, properties);

Pattern-Based Topic Subscription with Watermarks

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.regex.Pattern;

Pattern topicPattern = Pattern.compile("metrics-.*");

FlinkKafkaConsumer08<MyMetric> consumer = new FlinkKafkaConsumer08<>(
    topicPattern,
    new MyMetricDeserializer(),
    properties
);

consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyMetric>() {
    private final long maxOutOfOrderness = 5000; // allow events up to 5 seconds late
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

    @Override
    public long extractTimestamp(MyMetric element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen so far by the allowed lateness
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
});

consumer.setStartFromLatest()
        .setCommitOffsetsOnCheckpoints(true);

Specific Offset Configuration

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;

Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 2), 54321L);

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);

consumer.setStartFromSpecificOffsets(specificStartupOffsets)
        .setCommitOffsetsOnCheckpoints(true);

Deprecated Consumer Classes

FlinkKafkaConsumer081<T>

@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T>

/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated  
public FlinkKafkaConsumer081(String topic, 
                            DeserializationSchema<T> valueDeserializer, 
                            Properties props)

FlinkKafkaConsumer082<T>

@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T>

/**
 * @deprecated Use FlinkKafkaConsumer08 instead  
 */
@Deprecated
public FlinkKafkaConsumer082(String topic, 
                            DeserializationSchema<T> valueDeserializer, 
                            Properties props)
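
Migrating away from the deprecated classes is a drop-in change, since the constructor signatures are identical:

// Before (deprecated)
FlinkKafkaConsumer082<String> deprecatedConsumer = new FlinkKafkaConsumer082<>(
    "my-topic", new SimpleStringSchema(), properties);

// After
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);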

KafkaDeserializationSchema<T> Interface

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: Serializable, ResultTypeQueryable<T>

@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    
    /**
     * Method to decide whether the element signals the end of the stream.
     * If this returns true, the element won't be emitted.
     *
     * @param nextElement The element to test for end-of-stream signal
     * @return True if the element signals end of stream, false otherwise
     */
    boolean isEndOfStream(T nextElement);
    
    /**
     * Deserializes the Kafka record.
     *
     * @param record The ConsumerRecord from Kafka to deserialize
     * @return The deserialized message as an object of type T
     * @throws Exception If deserialization fails
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}

Custom KafkaDeserializationSchema Example

public class CustomEventDeserializer implements KafkaDeserializationSchema<CustomEvent> {
    
    @Override
    public CustomEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Access all ConsumerRecord information
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        long timestamp = record.timestamp();
        byte[] key = record.key();
        byte[] value = record.value();
        
        // Custom deserialization logic
        CustomEvent event = parseEvent(value);
        event.setMetadata(topic, partition, offset, timestamp);
        
        return event;
    }
    
    @Override
    public boolean isEndOfStream(CustomEvent nextElement) {
        // Check for end-of-stream marker
        return nextElement != null && nextElement.getEventType().equals("END_STREAM");
    }
    
    @Override
    public TypeInformation<CustomEvent> getProducedType() {
        return TypeInformation.of(CustomEvent.class);
    }
    
    private CustomEvent parseEvent(byte[] data) throws Exception {
        // Implementation specific parsing logic
        return CustomEvent.fromBytes(data);
    }
}

Configuration Properties

Required Properties

Properties props = new Properties();

// Required: Kafka broker list used for fetching partition metadata
props.setProperty("bootstrap.servers", "localhost:9092");

// Required: ZooKeeper quorum, used for offset commits and metadata (Kafka 0.8 only)
props.setProperty("zookeeper.connect", "localhost:2181");

// Required: Consumer group ID
props.setProperty("group.id", "my-consumer-group");

Optional Consumer Properties

// Offset reset behavior when no committed offset exists
props.setProperty("auto.offset.reset", "earliest"); // or "latest"

// Socket and network settings
props.setProperty("socket.timeout.ms", "30000");
props.setProperty("socket.receive.buffer.bytes", "65536");

// Fetch settings
props.setProperty("fetch.message.max.bytes", "1048576");
props.setProperty("fetch.wait.max.ms", "100");

// ZooKeeper settings
props.setProperty("zookeeper.session.timeout.ms", "6000");
props.setProperty("zookeeper.connection.timeout.ms", "6000");

// Auto-commit settings (only used when Flink checkpointing is disabled;
// with checkpointing enabled, offsets are committed on completed checkpoints instead)
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.commit.interval.ms", "60000");

Flink-Specific Properties

// Partition discovery interval (milliseconds) 
props.setProperty("flink.partition-discovery.interval-millis", "30000");

// Forwarding of Kafka connector metrics ("true" disables them; default "false")
props.setProperty("flink.disable-metrics", "false");

// Retry configuration for partition discovery
props.setProperty("flink.get-partitions.retry", "3");