Streaming Consumer

FlinkKafkaConsumer011 reads from Kafka 0.11.x topics as a Flink streaming source. It participates in Flink's checkpointing mechanism, restoring offsets from the last completed checkpoint on failure and thus giving exactly-once consistency for Flink state, and it supports flexible startup modes and dynamic topic and partition discovery.
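
The exactly-once guarantee depends on checkpointing being enabled on the job. The following is a minimal sketch of wiring the consumer into a checkpointed job; the broker address, group id, topic name, and checkpoint interval are illustrative placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpointing must be enabled so the consumer can rewind to checkpointed offsets on failure
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
props.setProperty("group.id", "my-consumer-group");       // illustrative group id

FlinkKafkaConsumer011<String> source = new FlinkKafkaConsumer011<>(
    "user-events", new SimpleStringSchema(), props);

env.addSource(source)
   .print();

env.execute("Kafka 0.11 consumer job");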

Capabilities

FlinkKafkaConsumer011 Class

Main consumer class for reading from Kafka 0.11.x topics with comprehensive configuration options.

/**
 * Kafka consumer for Kafka 0.11.x supporting exactly-once semantics and checkpointing
 * Extends FlinkKafkaConsumer010 with additional 0.11.x-specific features
 */
@PublicEvolving
class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {

    // Single topic constructors
    FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    // Multiple topics constructors  
    FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    // Pattern-based topic subscription (for dynamic topic discovery)
    @PublicEvolving
    FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    @PublicEvolving  
    FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
}

Usage Examples:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Single topic consumption
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "user-events",
    new SimpleStringSchema(),
    props
);

// Multiple topics consumption
List<String> topics = Arrays.asList("orders", "payments", "shipments");
FlinkKafkaConsumer011<String> multiTopicConsumer = new FlinkKafkaConsumer011<>(
    topics,
    new SimpleStringSchema(), 
    props
);

// Pattern-based subscription: topics matching the pattern at job start are consumed;
// new matching topics are discovered only if "flink.partition-discovery.interval-millis" is set in props
Pattern topicPattern = Pattern.compile("events-.*");
FlinkKafkaConsumer011<String> patternConsumer = new FlinkKafkaConsumer011<>(
    topicPattern,
    new SimpleStringSchema(),
    props
);

Startup Mode Configuration

Control how the consumer starts reading from Kafka topics.

// Configure startup behavior (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumer011<T> setStartFromEarliest();
FlinkKafkaConsumer011<T> setStartFromLatest(); 
FlinkKafkaConsumer011<T> setStartFromGroupOffsets();
FlinkKafkaConsumer011<T> setStartFromTimestamp(long startupOffsetsTimestamp);
FlinkKafkaConsumer011<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);

Usage Examples:

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing messages)
consumer.setStartFromLatest();

// Start from committed offsets (default behavior)
consumer.setStartFromGroupOffsets();

// Start from specific timestamp
long timestamp = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24 hours ago
consumer.setStartFromTimestamp(timestamp);

// Start from specific partition offsets
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("my-topic", 0), 1000L);
offsets.put(new KafkaTopicPartition("my-topic", 1), 2000L);
consumer.setStartFromSpecificOffsets(offsets);

Consumer Configuration

Essential configuration options for optimal consumer behavior.

// Key configuration constants (inherited from FlinkKafkaConsumerBase)
static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

Usage Examples:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

// Kafka consumer configuration
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("enable.auto.commit", "false"); // Managed by Flink
props.setProperty("max.poll.records", "500");

// Flink-specific configuration
props.setProperty("flink.disable-metrics", "false");
props.setProperty("flink.partition-discovery.interval-millis", "30000"); // 30 seconds

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Partition discovery for newly added partitions is enabled by setting
// "flink.partition-discovery.interval-millis" in the Properties passed to the
// constructor (as above); there is no setter for it on the consumer itself.

Advanced Consumer Features

Additional configuration methods for specialized use cases.

// Commit mode configuration (inherited from FlinkKafkaConsumerBase)
FlinkKafkaConsumer011<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);

// Timestamp and watermark assignment for event-time processing
FlinkKafkaConsumer011<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);

Usage Examples:

// Control offset committing behavior
consumer.setCommitOffsetsOnCheckpoints(true); // Commit offsets to Kafka on checkpoint

// Configure watermark generation for event time processing
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> extractTimestamp(event)); // extractTimestamp is an application-defined helper

consumer.assignTimestampsAndWatermarks(watermarkStrategy);

// Integration with Flink DataStream
DataStream<String> kafkaStream = env
    .addSource(consumer)
    .name("Kafka Source");

Error Handling

The consumer integrates with the Flink Kafka exception hierarchy for comprehensive error management.

// Consumer may throw FlinkKafka011Exception for configuration or runtime errors
// Exception handling is typically done at the Flink job level through restart strategies
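
Runtime failures in the source (for example broker unavailability or deserialization errors) surface as job failures, so recovery is typically configured through a restart strategy on the execution environment. A minimal sketch, with illustrative attempt count and delay:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Restart the job up to 3 times, waiting 10 seconds between attempts (values are illustrative);
// on each restart the consumer resumes from the offsets in the last completed checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));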

Configuration for resilience:

// Configure consumer for resilient operation
props.setProperty("session.timeout.ms", "30000");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("max.poll.interval.ms", "300000");
props.setProperty("connections.max.idle.ms", "540000");
props.setProperty("request.timeout.ms", "60000");