Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once processing guarantees
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12@1.11.0

Apache Flink Kafka 0.10 connector provides streaming data integration between Apache Flink and Apache Kafka 0.10.x message brokers. It enables consuming from Kafka topics with exactly-once processing guarantees and producing to Kafka topics with at-least-once delivery guarantees, along with dynamic partition discovery and comprehensive error handling.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
  <version>1.11.6</version>
</dependency>

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Create Kafka consumer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");
// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
"my-topic",
new SimpleStringSchema(),
properties
);
// Add to Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer)
.print();

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Create Kafka producer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Create producer
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
"output-topic",
new SimpleStringSchema(),
properties
);
// Add to streaming pipeline
dataStream.addSink(producer);

The Flink Kafka 0.10 connector is built around several key components:
- FlinkKafkaConsumer010 for reading data from Kafka topics
- FlinkKafkaProducer010 for writing data to Kafka topics

Core consumer functionality for reading data from Kafka 0.10.x topics with exactly-once processing guarantees and flexible topic subscription patterns.
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
// Single topic constructors
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
// Multiple topics constructors
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
// Pattern-based subscription constructors
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
// Rate limiting methods
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
public FlinkConnectorRateLimiter getRateLimiter();
}

Core producer functionality for writing data to Kafka 0.10.x topics with at-least-once delivery guarantees (Kafka 0.10 has no transactional producer, so exactly-once writes are not available) and custom partitioning support.
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
// Value-only serialization constructors
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
// Key-value serialization constructors
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
// Configuration methods
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}

SQL and Table API integration for declarative stream processing with Kafka sources and sinks, supporting both legacy and dynamic table factories.
// Legacy table factory
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
protected String kafkaVersion();
protected boolean supportsKafkaTimestamps();
}
// Dynamic table factory
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
public static final String IDENTIFIER = "kafka-0.10";
}

public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
// Generic type T represents the output data type after deserialization
}

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
// Generic type T represents the input data type before serialization
}

// Simple value deserialization
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
T deserialize(byte[] message) throws IOException;
boolean isEndOfStream(T nextElement);
}
// Key-value deserialization
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
boolean isEndOfStream(T nextElement);
}
// Simple value serialization
public interface SerializationSchema<T> extends Serializable {
byte[] serialize(T element);
}
// Key-value serialization
public interface KeyedSerializationSchema<T> extends Serializable {
byte[] serializeKey(T element);
byte[] serializeValue(T element);
String getTargetTopic(T element);
}

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
public void open(int parallelInstanceId, int parallelInstances);
}

public interface FlinkConnectorRateLimiter extends Serializable {
void open(RuntimeContext runtimeContext) throws Exception;
void acquire(long bytes);
void close() throws Exception;
}