Streaming data source functionality for consuming from Kafka 0.9.x topics with exactly-once processing guarantees, configurable deserialization, automatic offset management, and fault tolerance through Flink's checkpointing mechanism.
Main Kafka consumer class for streaming data from Kafka 0.9.x topics into Flink data streams.
/**
* Kafka consumer for streaming data from Apache Kafka 0.9.x topics.
* Supports exactly-once processing, fault tolerance, and parallel consumption.
*
* @param <T> The type of records consumed from Kafka
*/
@PublicEvolving
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka consumer for a single topic with value-only deserialization.
*
* @param topic The name of the topic to consume from
* @param valueDeserializer The deserializer for converting byte messages to objects
* @param props Kafka consumer properties and configuration
*/
public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
/**
* Creates a new Kafka consumer for a single topic with key/value deserialization.
*
* @param topic The name of the topic to consume from
* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
* @param props Kafka consumer properties and configuration
*/
public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
/**
* Creates a new Kafka consumer for multiple topics with value-only deserialization.
*
* @param topics The list of topic names to consume from
* @param deserializer The deserializer for converting byte messages to objects
* @param props Kafka consumer properties and configuration
*/
public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
/**
* Creates a new Kafka consumer for multiple topics with key/value deserialization.
*
* @param topics The list of topic names to consume from
* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
* @param props Kafka consumer properties and configuration
*/
public FlinkKafkaConsumer09(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
/**
* Creates a new Kafka consumer for topics matching a pattern with value-only deserialization.
 * Dynamic topic discovery is enabled if a partition discovery interval is configured
 * via the flink.partition-discovery.interval-millis property.
*
* @param subscriptionPattern Regular expression pattern for topic names to subscribe to
* @param valueDeserializer The deserializer for converting byte messages to objects
* @param props Kafka consumer properties and configuration
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
/**
* Creates a new Kafka consumer for topics matching a pattern with key/value deserialization.
 * Dynamic topic discovery is enabled if a partition discovery interval is configured
 * via the flink.partition-discovery.interval-millis property.
*
* @param subscriptionPattern Regular expression pattern for topic names to subscribe to
* @param deserializer The keyed deserializer for reading key/value pairs, offsets, and topic names
* @param props Kafka consumer properties and configuration
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
/**
* Sets a rate limiter to throttle bytes read from Kafka.
*
* @param kafkaRateLimiter The rate limiter to control consumption rate
*/
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
/**
* Gets the currently configured rate limiter.
*
* @return The configured rate limiter, or null if none is set
*/
public FlinkConnectorRateLimiter getRateLimiter();
/**
* Assigns watermarks and timestamp extractors using punctuated watermarks.
*
* @param assigner The punctuated watermark assigner
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
/**
* Assigns watermarks and timestamp extractors using periodic watermarks.
*
* @param assigner The periodic watermark assigner
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
/**
* Configures whether to commit consumed offsets back to Kafka on checkpoints.
*
* @param commitOnCheckpoints True to enable offset commits on checkpoints
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
/**
* Configures the consumer to start reading from the earliest available offsets.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> setStartFromEarliest();
/**
* Configures the consumer to start reading from the latest available offsets.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> setStartFromLatest();
/**
* Configures the consumer to start reading from the consumer group's committed offsets.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> setStartFromGroupOffsets();
/**
* Configures the consumer to start reading from specific partition offsets.
*
* @param specificStartupOffsets Map of partitions to their starting offsets
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
/**
 * Disables filtering of restored partition state against the currently subscribed topics,
 * so that all partitions in the restored state are consumed even if their topics are no
 * longer part of the subscription.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumer09<T> disableFilterRestoredPartitionsWithSubscribedTopics();
}

/** Configuration key to change the polling timeout */
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
/** Default time in milliseconds spent waiting in poll if data is not available */
public static final long DEFAULT_POLL_TIMEOUT = 100L;
/** The maximum number of pending non-committed checkpoints to track */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
/** Sentinel value for the partition discovery interval indicating that discovery is disabled (the default) */
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
/** Boolean configuration key to disable metrics tracking */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/** Configuration key to define the consumer's partition discovery interval, in milliseconds */
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
"my-topic",
new SimpleStringSchema(),
properties
);
env.addSource(consumer)
.print();
env.execute("Basic Kafka Consumer");import java.util.Arrays;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "multi-topic-group");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
Arrays.asList("topic-1", "topic-2", "topic-3"),
new SimpleStringSchema(),
properties
);
env.addSource(consumer)
.map(value -> "Processed: " + value)
    .print();

import java.util.regex.Pattern;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "pattern-group");
// Enable dynamic topic discovery
properties.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
Pattern.compile("log-topic-.*"),
new SimpleStringSchema(),
properties
);
env.addSource(consumer)
.filter(value -> value.contains("ERROR"))
    .print();

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Custom keyed deserializer for accessing keys, values, and metadata
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
"keyed-topic",
new KafkaDeserializationSchema<String>() {
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
String key = record.key() != null ? new String(record.key()) : "null";
String value = record.value() != null ? new String(record.value()) : "null";
return String.format("Key: %s, Value: %s, Partition: %d, Offset: %d",
key, value, record.partition(), record.offset());
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
},
properties
);

import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
"high-volume-topic",
new SimpleStringSchema(),
properties
);
// Limit consumption to 1000 bytes per second
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1000L);
consumer.setRateLimiter(rateLimiter);
env.addSource(consumer)
    .print();
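None of the examples above exercise the start-position setters (setStartFromEarliest, setStartFromLatest, setStartFromSpecificOffsets). The following is a minimal sketch; the topic name "offset-topic", the partition numbers, and the offsets are illustrative, and the KafkaTopicPartition import path is assumed from the connector's internals package.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "offset-topic",
    new SimpleStringSchema(),
    properties
);

// Replay the topic from the beginning, ignoring any committed group offsets
consumer.setStartFromEarliest();   // alternatives: setStartFromLatest(), setStartFromGroupOffsets()

// Or pin individual partitions to explicit starting offsets (the last start-position call wins)
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("offset-topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("offset-topic", 1), 31L);
consumer.setStartFromSpecificOffsets(specificStartOffsets);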
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

Properties properties = new Properties();
// Required Kafka settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
properties.setProperty("group.id", "advanced-consumer-group");
// Offset management
properties.setProperty("auto.offset.reset", "earliest"); // or "latest"
properties.setProperty("enable.auto.commit", "false"); // Flink manages commits
// Performance tuning
properties.setProperty("fetch.min.bytes", "1024");
properties.setProperty("fetch.max.wait.ms", "500");
properties.setProperty("max.partition.fetch.bytes", "1048576");
// Custom polling timeout
properties.setProperty("flink.poll-timeout", "100");
// Enable partition discovery for dynamic topics
properties.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
"configured-topic",
new SimpleStringSchema(),
properties
);
// Configure watermark generation for event time processing using the
// AssignerWithPeriodicWatermarks overload listed above
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            return System.currentTimeMillis();
        }
    }
);

The FlinkKafkaConsumer09 integrates with Flink's fault tolerance mechanisms: when checkpointing is enabled, the consumer stores the current partition offsets in Flink's checkpointed state, so that after a failure processing resumes from the last completed checkpoint and the exactly-once guarantees described above are preserved.
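A minimal sketch of that checkpointing setup follows; the 5-second checkpoint interval and the topic name "checkpointed-topic" are illustrative, and setCommitOffsetsOnCheckpoints and setStartFromGroupOffsets are the methods listed in the class reference above.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds; the consumer's offsets become part of each checkpoint
env.enableCheckpointing(5000);

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "checkpointed-topic",
    new SimpleStringSchema(),
    properties
);
// Also commit the checkpointed offsets back to Kafka, e.g. for external monitoring tools
consumer.setCommitOffsetsOnCheckpoints(true);
// With no checkpoint to restore from, begin at the consumer group's committed offsets
consumer.setStartFromGroupOffsets();

env.addSource(consumer)
    .print();
env.execute("Checkpointed Kafka Consumer");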
Common exceptions:
- IllegalArgumentException: Invalid configuration or parameters
- IOException: Network or serialization issues
- RuntimeException: Kafka client errors or partition assignment failures