Apache Flink Kafka Connector for streaming data integration with Kafka 0.9.x versions
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-9_2-12@1.10.0

Apache Flink Kafka Connector 0.9 provides streaming data integration between Apache Flink and Kafka 0.9.x message brokers. The connector enables real-time data processing pipelines with high throughput and fault tolerance: consumers participate in Flink's checkpointing for exactly-once state semantics, and producers provide at-least-once delivery when flush-on-checkpoint is enabled.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.12</artifactId>
<version>1.10.3</version>
</dependency>

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");
// Create Kafka consumer
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
"my-input-topic",
new SimpleStringSchema(),
kafkaProps
);
// Create Kafka producer
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
"my-output-topic",
new SimpleStringSchema(),
kafkaProps
);
// Build streaming pipeline
env.addSource(consumer)
.map(value -> value.toUpperCase())
.addSink(producer);
// Execute the job
env.execute("Kafka Streaming Job");Apache Flink Kafka Connector 0.9 is built around several key components:
FlinkKafkaConsumer09 for reading data from Kafka topics with configurable parallelism and offset managementFlinkKafkaProducer09 for writing data to Kafka topics with partitioning and serialization controlStreaming data source functionality for consuming from Kafka 0.9.x topics with configurable deserialization, offset management, and fault tolerance.
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
public FlinkConnectorRateLimiter getRateLimiter();
}
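As a brief sketch of the constructor variants listed above, the snippet below subscribes one consumer to a fixed list of topics and another to every topic matching a regular expression. Topic names, broker address, and group id are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// Subscribe to a fixed list of topics
FlinkKafkaConsumer09<String> listConsumer = new FlinkKafkaConsumer09<>(
    Arrays.asList("orders", "payments"),
    new SimpleStringSchema(),
    props);

// Subscribe to every topic whose name matches the pattern; partitions of newly
// created matching topics are only picked up when partition discovery is
// enabled via flink.partition-discovery.interval-millis
FlinkKafkaConsumer09<String> patternConsumer = new FlinkKafkaConsumer09<>(
    Pattern.compile("metrics-.*"),
    new SimpleStringSchema(),
    props);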
FlinkKafkaProducer09 provides streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees.

public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
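To illustrate the partitioner-aware constructor, the sketch below wires in FlinkFixedPartitioner (shipped with the connector) and the reliability switches inherited from FlinkKafkaProducerBase. Topic name and broker address are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// With FlinkFixedPartitioner each sink subtask writes to exactly one Kafka partition
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    producerProps,
    new FlinkFixedPartitioner<>());

// At-least-once delivery: fail the job on send errors instead of only logging
// them, and flush pending records whenever a checkpoint is taken
producer.setLogFailuresOnly(false);
producer.setFlushOnCheckpoint(true);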
Kafka09TableSourceSinkFactory provides Table API and SQL integration for declarative stream processing with Kafka sources and sinks through factory-based configuration.

public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
protected String kafkaVersion();
protected boolean supportsKafkaTimestamps();
protected KafkaTableSourceBase createKafkaTableSource(...);
protected KafkaTableSinkBase createKafkaTableSink(...);
}
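The factory is picked up via Java SPI when a table declaration requests Kafka version 0.9. The sketch below uses the Flink 1.10 connect/descriptor API and assumes the Table API bridge and the flink-json format module are on the classpath; the topic, field names, and group id are illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Declare a Kafka-backed table; Kafka09TableSourceSinkFactory is selected
// because the descriptor asks for version "0.9"
tableEnv.connect(
        new Kafka()
            .version("0.9")
            .topic("orders")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "table-group")
            .startFromEarliest())
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("order_id", DataTypes.STRING())
        .field("amount", DataTypes.DOUBLE()))
    .inAppendMode()
    .createTemporaryTable("KafkaOrders");

Table largeOrders = tableEnv.sqlQuery("SELECT order_id, amount FROM KafkaOrders WHERE amount > 100");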
// Kafka consumer configuration key constants
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;
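The poll timeout is read from the consumer Properties under the key above. A minimal sketch; broker address and group id are placeholders:

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// Block for at most 250 ms in each KafkaConsumer.poll() call instead of the
// 100 ms default (DEFAULT_POLL_TIMEOUT) while waiting for new records
props.setProperty("flink.poll-timeout", "250");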
// Deserialization interfaces for data conversion
interface DeserializationSchema<T> {
T deserialize(byte[] message) throws IOException;
boolean isEndOfStream(T nextElement);
TypeInformation<T> getProducedType();
}
interface KafkaDeserializationSchema<T> {
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
boolean isEndOfStream(T nextElement);
TypeInformation<T> getProducedType();
}
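To consume something richer than plain strings, one of these interfaces is implemented. Below is a minimal sketch of a DeserializationSchema for a hypothetical "userId|action" text format; the POJO and class names are illustrative. KafkaDeserializationSchema works the same way but additionally hands you the full ConsumerRecord (key, partition, offset).

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical event POJO produced by the schema below
class UserAction {
    public String userId;
    public String action;

    public UserAction() {}

    public UserAction(String userId, String action) {
        this.userId = userId;
        this.action = action;
    }
}

public class UserActionSchema implements DeserializationSchema<UserAction> {

    @Override
    public UserAction deserialize(byte[] message) throws IOException {
        // Records are assumed to be UTF-8 text of the form "userId|action"
        String[] parts = new String(message, StandardCharsets.UTF_8).split("\\|", 2);
        return new UserAction(parts[0], parts.length > 1 ? parts[1] : "");
    }

    @Override
    public boolean isEndOfStream(UserAction nextElement) {
        return false; // Kafka topics are unbounded; never end the stream
    }

    @Override
    public TypeInformation<UserAction> getProducedType() {
        return TypeInformation.of(UserAction.class);
    }
}

The schema plugs straight into the FlinkKafkaConsumer09 constructors listed earlier, for example new FlinkKafkaConsumer09<>("user-actions", new UserActionSchema(), props).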
// Serialization interfaces for data conversion
interface SerializationSchema<T> {
byte[] serialize(T element);
}
interface KeyedSerializationSchema<T> {
byte[] serializeKey(T element);
byte[] serializeValue(T element);
String getTargetTopic(T element);
}
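A sketch of a KeyedSerializationSchema that derives the Kafka message key from the record; the "userId|payload" layout and class name are hypothetical, and the import path is the one used by this connector line in Flink 1.10. Returning null from getTargetTopic keeps the default topic passed to the producer.

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

public class UserKeyedSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        // The user id before the '|' becomes the Kafka message key, so all
        // records for one user hash to the same partition
        return element.split("\\|", 2)[0].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null keeps the producer's default topic
    }
}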
// Partitioning base class for custom distribution logic
abstract class FlinkKafkaPartitioner<T> implements Serializable {
abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
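Custom partitioners extend this class. The sketch below (class name is illustrative) hashes the serialized key so that all records with the same key land in the same partition of the target topic; it is passed to the FlinkKafkaProducer09 constructor that accepts a FlinkKafkaPartitioner.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for target topic " + targetTopic);
        }
        if (key == null) {
            // Records without a key all go to the first known partition
            return partitions[0];
        }
        // Stable key hash: the same key always maps to the same partition
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}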
// Rate limiting interface for consumption throttling
interface FlinkConnectorRateLimiter {
void open(RuntimeContext runtimeContext) throws Exception;
void acquire(long permits);
void close() throws Exception;
}
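Rate limiting is opt-in on the consumer via setRateLimiter. The sketch below assumes GuavaFlinkConnectorRateLimiter, Flink's Guava-backed implementation, where the configured rate is a bytes-per-second budget shared across the consumer's parallel subtasks; broker address, group id, and topic are placeholders.

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic", new SimpleStringSchema(), props);

// Cap consumption at roughly 1 MB/s overall for this consumer
GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024);
consumer.setRateLimiter(rateLimiter);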
// Kafka topic partition representation
class KafkaTopicPartition implements Comparable<KafkaTopicPartition>, Serializable {
public KafkaTopicPartition(String topic, int partition);
public String getTopic();
public int getPartition();
public String toString();
public boolean equals(Object o);
public int hashCode();
public int compareTo(KafkaTopicPartition other);
}
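KafkaTopicPartition is the key type used when starting a consumer from explicit per-partition offsets. A sketch; topic name, partition numbers, and offsets are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// Resume partitions 0-2 of the topic from explicit offsets; partitions not
// listed here fall back to the group-offsets behaviour
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-input-topic", 0), 23L);
specificOffsets.put(new KafkaTopicPartition("my-input-topic", 1), 31L);
specificOffsets.put(new KafkaTopicPartition("my-input-topic", 2), 43L);

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic", new SimpleStringSchema(), props);
consumer.setStartFromSpecificOffsets(specificOffsets);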
// Startup mode enumeration for consumers
enum StartupMode {
EARLIEST,
LATEST,
GROUP_OFFSETS,
SPECIFIC_OFFSETS,
TIMESTAMP
}
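Startup modes are selected through the setStartFrom* methods on the consumer; the chosen mode only applies when the job starts without restored checkpoint or savepoint state, since restored offsets always take precedence. Timestamp-based startup relies on message timestamps introduced in Kafka 0.10 and is generally not usable with 0.9 brokers. A brief sketch; broker address, group id, and topic are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic", new SimpleStringSchema(), props);

// Pick exactly one startup position
consumer.setStartFromEarliest();        // StartupMode.EARLIEST
// consumer.setStartFromLatest();       // StartupMode.LATEST
// consumer.setStartFromGroupOffsets(); // StartupMode.GROUP_OFFSETS (default)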
// Watermark assignment interfaces for time-based processing
interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
Watermark getCurrentWatermark();
}
interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
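For event-time processing, an assigner can be attached directly to the consumer so that watermarks are generated per Kafka partition inside the source. The sketch below uses BoundedOutOfOrdernessTimestampExtractor, a periodic assigner shipped with Flink, and assumes a hypothetical record layout with a leading epoch-millis timestamp ("timestampMillis|payload"); broker address, group id, and topic are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic", new SimpleStringSchema(), props);

// Extract event time from the record and tolerate up to 5 seconds of
// out-of-order arrival before the watermark passes an element's timestamp
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            return Long.parseLong(element.split("\\|", 2)[0]);
        }
    });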