Kafka Consumer

The Flink Kafka consumer provides reliable message consumption from Kafka 0.8.x topics with exactly-once processing guarantees through Flink's checkpointing mechanism.

Capabilities

FlinkKafkaConsumer08

The primary consumer class for Kafka 0.8.x integration with comprehensive configuration options.

/**
 * Kafka consumer for Apache Kafka 0.8.x with exactly-once processing guarantees
 */
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    
    /** Configuration key for partition retrieval retries */
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
    
    /** Default number of partition retrieval retries */
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
    
    /**
     * Creates consumer for single topic with value-only deserialization
     * @param topic Kafka topic name
     * @param valueDeserializer Deserialization schema for message values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    
    /**
     * Creates consumer for single topic with key-value deserialization
     * @param topic Kafka topic name
     * @param deserializer Keyed deserialization schema for both keys and values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Creates consumer for multiple topics with value-only deserialization
     * @param topics List of Kafka topic names
     * @param deserializer Deserialization schema for message values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Creates consumer for multiple topics with key-value deserialization
     * @param topics List of Kafka topic names
     * @param deserializer Keyed deserialization schema for both keys and values
     * @param props Kafka consumer properties
     */
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Gets partition information for specified topics
     * @param topics List of topic names to analyze
     * @param properties Kafka connection properties
     * @return List of partition leaders for the topics
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties);
}

Usage Examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

// ObjectNode is the record type produced by JSONKeyValueDeserializationSchema
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Basic single topic consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);

// Multi-topic consumer with JSON deserialization
FlinkKafkaConsumer08<ObjectNode> jsonConsumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("topic1", "topic2", "topic3"),
    new JSONKeyValueDeserializationSchema(false), // false = do not include message metadata (offset, partition, topic)
    props
);

DataStream<ObjectNode> jsonStream = env.addSource(jsonConsumer);

// Consumer with custom offset reset
Properties customProps = new Properties();
customProps.setProperty("bootstrap.servers", "localhost:9092");
customProps.setProperty("zookeeper.connect", "localhost:2181");
customProps.setProperty("group.id", "custom-group");
customProps.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer08<String> earliestConsumer = new FlinkKafkaConsumer08<>(
    "events-topic",
    new SimpleStringSchema(),
    customProps
);
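
The static getPartitionsForTopic helper can be used to inspect partition metadata before a job is wired up, for example to verify connectivity or log the partition layout. A minimal sketch (the accessors getTopicPartition() and getLeader() reflect the connector's KafkaTopicPartitionLeader class; verify them against the connector version in use):

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;

// Fetch the current partition leaders for the topics of interest
List<KafkaTopicPartitionLeader> partitions =
    FlinkKafkaConsumer08.getPartitionsForTopic(
        Arrays.asList("my-topic"),
        props
    );

// Log the discovered layout before starting the job
for (KafkaTopicPartitionLeader partition : partitions) {
    System.out.println("partition: " + partition.getTopicPartition()
        + ", leader: " + partition.getLeader());
}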

FlinkKafkaConsumer081 (Deprecated)

Deprecated alias for FlinkKafkaConsumer08, kept for backward compatibility.

/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {
    /**
     * @deprecated Use FlinkKafkaConsumer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}

FlinkKafkaConsumer082 (Deprecated)

Deprecated alias for FlinkKafkaConsumer08, kept for backward compatibility.

/**
 * @deprecated Use FlinkKafkaConsumer08 instead
 */
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {
    /**
     * @deprecated Use FlinkKafkaConsumer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}

Configuration

Required Properties

  • bootstrap.servers: Comma-separated list of Kafka broker addresses
  • zookeeper.connect: ZooKeeper connection string for offset management
  • group.id: Consumer group identifier for offset coordination

Optional Properties

  • auto.offset.reset: Reset strategy when no committed offset exists; Kafka 0.8 natively uses smallest/largest, and the Flink consumer also accepts earliest/latest
  • auto.commit.enable: Whether to periodically commit offsets to ZooKeeper (with Flink checkpointing enabled, offsets are instead committed when checkpoints complete)
  • auto.commit.interval.ms: Interval between periodic offset commits
  • socket.timeout.ms: Socket timeout for broker connections
  • fetch.message.max.bytes: Maximum bytes fetched per partition and request
  • flink.get-partitions.retry: Number of retries for partition discovery (default: 3)
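
Putting the required and optional settings together, a typical configuration might look like the following sketch (broker addresses, group name, topic name, and tuning values are placeholders, not recommendations):

Properties config = new Properties();

// Required connection settings
config.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
config.setProperty("zookeeper.connect", "zk1:2181,zk2:2181");
config.setProperty("group.id", "analytics-group");

// Optional tuning
config.setProperty("auto.offset.reset", "smallest");
config.setProperty("socket.timeout.ms", "30000");
config.setProperty("fetch.message.max.bytes", "1048576");
config.setProperty("flink.get-partitions.retry", "5");

FlinkKafkaConsumer08<String> configuredConsumer = new FlinkKafkaConsumer08<>(
    "metrics-topic",
    new SimpleStringSchema(),
    config
);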

Checkpointing and Fault Tolerance

The consumer integrates with Flink's checkpointing mechanism:

// Enable checkpointing for exactly-once guarantees
env.enableCheckpointing(5000); // checkpoint every 5 seconds

// Consumer automatically participates in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Starting position can be configured
consumer.setStartFromGroupOffsets(); // start from committed group offsets in ZooKeeper (default)
consumer.setStartFromEarliest();     // start from earliest available offsets
consumer.setStartFromLatest();       // start from latest offsets

Timestamp-based start positions are not available for Kafka 0.8, since the broker does not support offset lookup by timestamp.
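
With checkpointing enabled, the consumer commits offsets back to ZooKeeper as part of completed checkpoints rather than on a timer, so a recovered job resumes from a consistent position. A sketch of a complete setup (the CheckpointingMode argument is already the default and is spelled out only for emphasis):

import org.apache.flink.streaming.api.CheckpointingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Exactly-once is the default checkpointing mode; stated explicitly for clarity
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

FlinkKafkaConsumer08<String> checkpointedConsumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Offsets are committed to ZooKeeper when a checkpoint completes,
// so a restart resumes from the last checkpointed offsets
env.addSource(checkpointedConsumer).print();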

Error Handling

Common exceptions and handling strategies:

  • IllegalArgumentException: Invalid topic names or properties
  • RuntimeException: Kafka connection or ZooKeeper issues
  • SerializationException: Deserialization failures

// Proper error handling setup
try {
    FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic",
        new SimpleStringSchema(),
        props
    );
    
    env.addSource(consumer);
    env.execute("Kafka Consumer Job");
} catch (Exception e) {
    // Handle setup or job submission failures ('logger' is any SLF4J/Log4j logger in scope)
    logger.error("Kafka consumer failed", e);
}