Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once processing guarantees
The FlinkKafkaConsumer010 provides comprehensive functionality for consuming data from Apache Kafka 0.10.x topics with exactly-once processing guarantees, flexible subscription patterns, and advanced features like rate limiting and dynamic partition discovery.
Creates a consumer for a single Kafka topic with value-only deserialization.
```java
/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 * @param topic The name of the topic that should be consumed
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x with key-value deserialization
 * @param topic The name of the topic that should be consumed
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
```

Usage Examples:
```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Simple string deserialization
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new SimpleStringSchema(),
    props
);

// Custom key-value deserialization
FlinkKafkaConsumer010<MyEvent> eventConsumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new KafkaDeserializationSchema<MyEvent>() {
        @Override
        public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            return MyEvent.fromJson(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(MyEvent nextElement) {
            return false;
        }

        @Override
        public TypeInformation<MyEvent> getProducedType() {
            return TypeInformation.of(MyEvent.class);
        }
    },
    props
);
```

Creates a consumer for multiple Kafka topics specified as a list.
```java
/**
 * Creates a new Kafka streaming source consumer for multiple topics
 * @param topics The Kafka topics to read from
 * @param deserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for multiple topics with key-value deserialization
 * @param topics The Kafka topics to read from
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
```

Usage Examples:
```java
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;

// Multiple topics with simple deserialization
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
FlinkKafkaConsumer010<String> multiTopicConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new SimpleStringSchema(),
    props
);

// Multiple topics with JSON deserialization
// (JsonDeserializationSchema here stands in for a user-provided
// DeserializationSchema<JsonNode> implementation)
FlinkKafkaConsumer010<JsonNode> jsonConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new JsonDeserializationSchema(),
    props
);
```

Creates a consumer that subscribes to topics matching a regular expression pattern, with automatic discovery of new matching topics.
```java
/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription with key-value deserialization
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
```

Usage Examples:
```java
import java.util.regex.Pattern;

// Subscribe to all topics starting with "logs-"
Pattern logTopicsPattern = Pattern.compile("logs-.*");
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    logTopicsPattern,
    new SimpleStringSchema(),
    props
);

// Subscribe to topics matching an environment-specific pattern.
// The pattern must match the full topic name, e.g.:
//   "prod-events-clicks" matches, "events-clicks" does not
Pattern envPattern = Pattern.compile("(prod|staging)-events-.*");
FlinkKafkaConsumer010<String> envConsumer = new FlinkKafkaConsumer010<>(
    envPattern,
    new SimpleStringSchema(),
    props
);
```

Configure rate limiting to throttle the number of bytes read from Kafka per second.
```java
/**
 * Set rate limiter for throttling bytes read from Kafka
 * @param kafkaRateLimiter Rate limiter implementation to control consumption rate
 */
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

/**
 * Get the configured rate limiter
 * @return The currently configured rate limiter, or null if none is set
 */
public FlinkConnectorRateLimiter getRateLimiter();
```

Usage Examples:
```java
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    props
);

// Limit consumption to 1 MB per second
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024); // bytes per second
consumer.setRateLimiter(rateLimiter);

// Check the current rate limiter
FlinkConnectorRateLimiter currentLimiter = consumer.getRateLimiter();
```

```java
// Configuration key for the polling timeout
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
// Default polling timeout in milliseconds
public static final long DEFAULT_POLL_TIMEOUT = 100L;
```

Required properties:
- `bootstrap.servers`: comma-separated list of Kafka broker addresses
- `group.id`: the id of the consumer group

Optional properties:
- `flink.poll-timeout`: how long (in milliseconds) a poll waits for data before returning; defaults to 100
- `flink.partition-discovery.interval-millis`: interval at which new partitions and topics are discovered; discovery is disabled when unset
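These keys are ordinary entries in the `Properties` object passed to the constructor. As a quick plain-Java illustration (no Flink runtime required; the class name `KafkaProps` is just for the sketch), note how a fallback mirrors the 100 ms default of `DEFAULT_POLL_TIMEOUT`:

```java
import java.util.Properties;

public class KafkaProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Required connection settings
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-consumer-group");
        // Optional: override the default 100 ms poll timeout
        props.setProperty("flink.poll-timeout", "250");

        // The consumer reads the key with the default as fallback
        long pollTimeout = Long.parseLong(
            props.getProperty("flink.poll-timeout", "100"));
        System.out.println(pollTimeout); // prints 250
    }
}
```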
Enable automatic discovery of new partitions by setting the partition discovery interval:

```java
// Enable partition discovery every 30 seconds
props.setProperty("flink.partition-discovery.interval-millis", "30000");
```

The FlinkKafkaConsumer010 integrates with Flink's checkpointing mechanism to provide exactly-once processing guarantees:
```java
// Configure for exactly-once processing
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");

// In the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
```

If the job fails, the consumer recovers by resetting to the offsets stored in the last completed checkpoint, so records are neither lost nor double-counted in Flink state.
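One way to picture checkpoint-based recovery is this plain-Java sketch (no Flink APIs; the class `OffsetReplay` is hypothetical). Offsets and results are snapshotted together, so a record processed after the checkpoint is replayed on recovery but still appears exactly once:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetReplay {
    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c", "d");
        List<String> results = new ArrayList<>();

        int checkpointedOffset = 0;
        int pos = checkpointedOffset;

        // Process two records, then take a checkpoint of offset + state
        results.add(partition.get(pos++));
        results.add(partition.get(pos++));
        checkpointedOffset = pos;
        List<String> snapshot = new ArrayList<>(results);

        // "c" is processed, then the job fails before the next checkpoint
        results.add(partition.get(pos++));

        // Recovery: restore state and offset from the checkpoint together
        results = snapshot;
        pos = checkpointedOffset;

        // Resume: "c" is re-read, yet contributes to the result exactly once
        while (pos < partition.size()) {
            results.add(partition.get(pos++));
        }
        System.out.println(results); // prints [a, b, c, d]
    }
}
```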
The consumer supports event time processing with watermark extraction:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;

FlinkKafkaConsumer010<MyEvent> consumer = new FlinkKafkaConsumer010<>(...);

// Assign watermarks and timestamps
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
```

Install with Tessl CLI
```
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12
```