Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with exactly-once processing guarantees, configurable deserialization, automatic offset management, and fault tolerance through Flink's checkpointing mechanism.

Capabilities

FlinkKafkaConsumer09 Class

Main Kafka consumer class for streaming data from Kafka 0.9.x topics into Flink data streams.

/**
 * Kafka consumer for streaming data from Apache Kafka 0.9.x topics.
 * Supports exactly-once processing, fault tolerance, and parallel consumption.
 * 
 * @param <T> The type of records consumed from Kafka
 */
@PublicEvolving
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    
    /**
     * Creates a new Kafka consumer for a single topic with value-only deserialization.
     * 
     * @param topic The name of the topic to consume from
     * @param valueDeserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    
    /**
     * Creates a new Kafka consumer for a single topic with key/value deserialization.
     * 
     * @param topic The name of the topic to consume from
     * @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Creates a new Kafka consumer for multiple topics with value-only deserialization.
     * 
     * @param topics The list of topic names to consume from
     * @param deserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Creates a new Kafka consumer for multiple topics with key/value deserialization.
     * 
     * @param topics The list of topic names to consume from
     * @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    public FlinkKafkaConsumer09(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Creates a new Kafka consumer for topics matching a pattern with value-only deserialization.
     * Dynamic topic discovery is enabled if a partition discovery interval is configured.
     * 
     * @param subscriptionPattern Regular expression pattern for topic names to subscribe to
     * @param valueDeserializer The deserializer for converting byte messages to objects
     * @param props Kafka consumer properties and configuration
     */
    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    
    /**
     * Creates a new Kafka consumer for topics matching a pattern with key/value deserialization.
     * Dynamic topic discovery is enabled if a partition discovery interval is configured.
     * 
     * @param subscriptionPattern Regular expression pattern for topic names to subscribe to
     * @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
     * @param props Kafka consumer properties and configuration
     */
    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    /**
     * Sets a rate limiter to throttle bytes read from Kafka.
     * 
     * @param kafkaRateLimiter The rate limiter to control consumption rate
     */
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    
    /**
     * Gets the currently configured rate limiter.
     * 
     * @return The configured rate limiter, or null if none is set
     */
    public FlinkConnectorRateLimiter getRateLimiter();
    
    /**
     * Assigns watermarks and timestamp extractors using punctuated watermarks.
     * 
     * @param assigner The punctuated watermark assigner
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
    
    /**
     * Assigns watermarks and timestamp extractors using periodic watermarks.
     * 
     * @param assigner The periodic watermark assigner
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
    
    /**
     * Configures whether to commit consumed offsets back to Kafka on checkpoints.
     * 
     * @param commitOnCheckpoints True to enable offset commits on checkpoints
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
    
    /**
     * Configures the consumer to start reading from the earliest available offsets.
     * 
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> setStartFromEarliest();
    
    /**
     * Configures the consumer to start reading from the latest available offsets.
     * 
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> setStartFromLatest();
    
    /**
     * Configures the consumer to start reading from the consumer group's committed offsets.
     * 
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> setStartFromGroupOffsets();
    
    /**
     * Configures the consumer to start reading from specific partition offsets.
     * 
     * @param specificStartupOffsets Map of partitions to their starting offsets
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
    
    /**
     * Disables filtering of restored partitions with currently subscribed topics.
     * 
     * @return This consumer instance for method chaining
     */
    public FlinkKafkaConsumer09<T> disableFilterRestoredPartitionsWithSubscribedTopics();
}

Configuration Constants

/** Configuration key to change the polling timeout */
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

/** Default time in milliseconds spent waiting in poll if data is not available */
public static final long DEFAULT_POLL_TIMEOUT = 100L;

/** The maximum number of pending non-committed checkpoints to track */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

/** The default interval to execute partition discovery (disabled by default) */
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;

/** Boolean configuration key to disable metrics tracking */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/** Configuration key to define the consumer's partition discovery interval, in milliseconds */
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
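
These keys can be referenced through the consumer class rather than hard-coded as strings. A minimal sketch, assuming the constants are accessed via FlinkKafkaConsumer09 as listed above (the values shown are arbitrary):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");

// Wait up to 250 ms per poll when no data is available (default: 100 ms)
properties.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");

// Check for newly created partitions every 30 seconds (discovery is disabled by default)
properties.setProperty(FlinkKafkaConsumer09.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");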

Usage Examples

Basic Single Topic Consumption

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
   .print();

env.execute("Basic Kafka Consumer");

Multiple Topics Consumption

import java.util.Arrays;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "multi-topic-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Arrays.asList("topic-1", "topic-2", "topic-3"),
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
   .map(value -> "Processed: " + value)
   .print();

Pattern-Based Topic Subscription

import java.util.regex.Pattern;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "pattern-group");
// Enable dynamic topic discovery
properties.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Pattern.compile("log-topic-.*"),
    new SimpleStringSchema(),
    properties
);

env.addSource(consumer)
   .filter(value -> value.contains("ERROR"))
   .print();

Key/Value Deserialization

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Custom keyed deserializer for accessing keys, values, and metadata
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "keyed-topic",
    new KafkaDeserializationSchema<String>() {
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            String key = record.key() != null ? new String(record.key()) : "null";
            String value = record.value() != null ? new String(record.value()) : "null";
            return String.format("Key: %s, Value: %s, Partition: %d, Offset: %d", 
                key, value, record.partition(), record.offset());
        }
        
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    },
    properties
);

Rate Limited Consumption

import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    properties
);

// Limit consumption to 1000 bytes per second
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1000L);
consumer.setRateLimiter(rateLimiter);

env.addSource(consumer)
   .print();

Advanced Configuration

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

Properties properties = new Properties();
// Required Kafka settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
properties.setProperty("group.id", "advanced-consumer-group");

// Offset management
properties.setProperty("auto.offset.reset", "earliest"); // or "latest"
properties.setProperty("enable.auto.commit", "false"); // Flink manages commits

// Performance tuning
properties.setProperty("fetch.min.bytes", "1024");
properties.setProperty("fetch.max.wait.ms", "500");
properties.setProperty("max.partition.fetch.bytes", "1048576");

// Custom polling timeout
properties.setProperty("flink.poll-timeout", "100");

// Enable partition discovery for dynamic topics
properties.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "configured-topic",
    new SimpleStringSchema(),
    properties
);

// Configure watermark generation for event time processing using the
// periodic-watermark overload documented above
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            // In a real job, extract the event timestamp from the record itself
            return System.currentTimeMillis();
        }
    }
);
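
Startup Position Configuration

The startup-mode setters documented under Capabilities control where reading begins when a job starts fresh; when restoring from a checkpoint, the checkpointed offsets take precedence. A minimal sketch, assuming a topic "my-topic" with partitions 0 and 1, and assuming KafkaTopicPartition comes from the connector's internals package:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.HashMap;
import java.util.Map;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "startup-position-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// Start from the consumer group's committed offsets (falling back to
// auto.offset.reset if none exist) and commit offsets on checkpoints.
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);

// Alternatively, start from explicit per-partition offsets (values are arbitrary).
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);
consumer.setStartFromSpecificOffsets(specificOffsets);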

Error Handling

The FlinkKafkaConsumer09 integrates with Flink's fault tolerance mechanisms:

  • Checkpointing: Automatically saves consumer offsets during checkpoints
  • Recovery: Restores from last successful checkpoint on failure
  • Exactly-once: Guarantees no data loss or duplication when checkpointing is enabled (see the sketch after this list)
  • Parallelism: Each parallel instance consumes from assigned partitions independently
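
The exactly-once guarantee only applies when checkpointing is enabled on the execution environment. A minimal sketch, assuming a previously constructed consumer (the 5-second interval is an arbitrary choice):

import org.apache.flink.streaming.api.CheckpointingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 5 seconds; consumer offsets are snapshotted as part
// of each checkpoint and committed back to Kafka once the checkpoint completes.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

env.addSource(consumer)
   .print();

env.execute("Checkpointed Kafka Consumer");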

Common exceptions:

  • IllegalArgumentException: Invalid configuration or parameters
  • IOException: Network or serialization issues
  • RuntimeException: Kafka client errors or partition assignment failures