Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once consumer processing guarantees
The Apache Flink Kafka 0.10 connector provides streaming data integration between Apache Flink and Apache Kafka 0.10.x brokers. It supports both consuming from and producing to Kafka topics, offering exactly-once processing semantics on the consumer side (via Flink checkpointing), at-least-once delivery on the producer side, dynamic partition discovery, and comprehensive error handling.
Add the connector to your Maven build:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.11.6</version>
</dependency>

Core classes and interfaces:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

Consuming from Kafka:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Create Kafka consumer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");
// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
"my-topic",
new SimpleStringSchema(),
properties
);
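Optionally, the consumer's start position can be set before it is wired into the topology. These setters are inherited from FlinkKafkaConsumerBase; the choices below are a sketch:

// Where to begin reading when the job starts without restored checkpoint state
consumer.setStartFromGroupOffsets();   // default: committed group offsets in Kafka
// consumer.setStartFromEarliest();    // alternative: earliest available offset
// consumer.setStartFromLatest();      // alternative: only records produced from now on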
// Add to Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer)
    .print();
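Exactly-once consumer semantics depend on Flink checkpointing being enabled. A minimal sketch against the environment created above (the 5-second interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;

// Checkpoint every 5 seconds in exactly-once mode; Kafka offsets are tracked
// in Flink state and rewound consistently on failure.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);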
Producing to Kafka:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Create Kafka producer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Create producer
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
"output-topic",
new SimpleStringSchema(),
properties
);
// Add to an existing streaming pipeline; dataStream is assumed to be a
// DataStream<String> produced earlier in the job
dataStream.addSink(producer);
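For at-least-once delivery, the producer should flush in-flight records at checkpoint barriers. A sketch using setters inherited from FlinkKafkaProducerBase, configured before the job is submitted:

// Wait for pending records at checkpoint time, and fail the job on send
// errors instead of only logging them.
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);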
The Flink Kafka 0.10 connector is built around several key components:

- FlinkKafkaConsumer010 for reading data from Kafka topics
- FlinkKafkaProducer010 for writing data to Kafka topics

FlinkKafkaConsumer010 provides the core consumer functionality for reading data from Kafka 0.10.x topics, with exactly-once processing guarantees (when checkpointing is enabled) and flexible topic subscription patterns.
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    // Single-topic constructors
    public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Multiple-topic constructors
    public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Pattern-based subscription constructors
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Rate-limiting methods
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
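For instance, a pattern-based subscription combined with dynamic partition discovery, reusing the properties and imports from the consumer example above (the topic regex and discovery interval are illustrative):

import java.util.regex.Pattern;

// Check for new matching topics and partitions every 10 seconds.
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    properties
);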
FlinkKafkaProducer010 provides the core producer functionality for writing data to Kafka 0.10.x topics, with at-least-once delivery guarantees and custom partitioning support.

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
    // Value-only serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

    // Key-value serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

    // Configuration methods
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}
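Because Kafka 0.10 is the first broker version whose messages carry timestamps, the producer can propagate Flink record timestamps into Kafka. A one-line sketch using the setter from the listing above:

// Attach each record's Flink timestamp to the outgoing Kafka message.
producer.setWriteTimestampToKafka(true);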
SQL and Table API integration enables declarative stream processing with Kafka sources and sinks, supporting both legacy and dynamic table factories.

// Legacy table factory
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
}

// Dynamic table factory
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    public static final String IDENTIFIER = "kafka-0.10";
}public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
// Generic type T represents the output data type after deserialization
}public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
// Generic type T represents the input data type before serialization
}// Simple value deserialization
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
T deserialize(byte[] message) throws IOException;
boolean isEndOfStream(T nextElement);
}
// Key-value deserialization
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
boolean isEndOfStream(T nextElement);
}
// Simple value serialization
public interface SerializationSchema<T> extends Serializable {
byte[] serialize(T element);
}
// Key-value serialization
public interface KeyedSerializationSchema<T> extends Serializable {
byte[] serializeKey(T element);
byte[] serializeValue(T element);
String getTargetTopic(T element);
}public abstract class FlinkKafkaPartitioner<T> implements Serializable {
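As an illustration, a hypothetical value-only schema implementing DeserializationSchema; getProducedType() is required by the ResultTypeQueryable supertype:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Decodes every Kafka record value as a UTF-8 string.
public class Utf8StringSchema implements DeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // Kafka topics are unbounded; never end the stream
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}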
Custom partitioning:

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    public void open(int parallelInstanceId, int parallelInstances);
}
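A hypothetical partitioner sketch that routes records by key hash, passed to the producer through the customPartitioner constructor argument:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Records with the same key land on the same partition; keyless records go to
// the first partition in the list.
public class KeyHashPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0];
        }
        int hash = java.util.Arrays.hashCode(key) & 0x7fffffff; // force non-negative
        return partitions[hash % partitions.length];
    }
}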
Rate limiting:

public interface FlinkConnectorRateLimiter extends Serializable {
    void open(RuntimeContext runtimeContext) throws Exception;
    void acquire(long bytes);
    void close() throws Exception;
}
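For example, throttling the consumer with the Guava-based limiter bundled with Flink; this assumes the limiter's setRate(long) configuration method, and the 1 MiB/s figure is illustrative:

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

// Cap consumption at roughly 1 MiB/s, spread across the consumer's subtasks.
GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024);
consumer.setRateLimiter(rateLimiter);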