Apache Flink connector for integrating with Apache Kafka 0.8.x message broker systems
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-11@1.10.0

A comprehensive streaming connector for integrating Apache Flink with Apache Kafka 0.8.x, providing both source and sink capabilities, exactly-once processing guarantees for consumers, and Table API integration.
Maven Coordinates:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.10.3</version>
</dependency>

Note: The artifact ID includes the Scala binary version suffix (e.g., _2.11 for Scala 2.11, _2.12 for Scala 2.12).
Java Version: Java 8+
Kafka Compatibility: Apache Kafka 0.8.x
Main Package: org.apache.flink.streaming.connectors.kafka
// Core Consumer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
// Core Producer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
// Serialization
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Table API (Internal)
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;
// Required Dependencies
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
properties
);
consumer.setStartFromEarliest();
env.addSource(consumer).print();
env.execute("Kafka Consumer Job");import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);
env.execute("Kafka Producer Job");

The Flink Kafka Consumer 0.8 is built on a multi-layered architecture:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Core consumer setup with custom deserialization
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
@Override
public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
// Custom deserialization logic
return MyEvent.fromBytes(record.value());
}
@Override
public boolean isEndOfStream(MyEvent nextElement) {
return false; // Never end stream
}
@Override
public TypeInformation<MyEvent> getProducedType() {
return TypeInformation.of(MyEvent.class);
}
};

The producer architecture focuses on reliable message delivery within Kafka 0.8 constraints:
// Custom partitioner example
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
@Override
public int partition(MyEvent record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// Custom partitioning logic based on record content
return partitions[Math.abs(record.getCustomerId().hashCode() % partitions.length)];
}
};
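The partitioner can then be handed to the producer; a sketch assuming the constructor overload that takes a FlinkKafkaPartitioner as its fourth argument, with MyEvent.toBytes() as a hypothetical serialization method:

SerializationSchema<MyEvent> valueSchema = new SerializationSchema<MyEvent>() {
    @Override
    public byte[] serialize(MyEvent element) {
        return element.toBytes(); // hypothetical byte serialization of MyEvent
    }
};

Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<MyEvent> partitionedProducer = new FlinkKafkaProducer08<>(
    "my-topic",
    valueSchema,
    producerProps,
    partitioner
);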
Properties kafkaProps = new Properties();

// Required for Consumer
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("group.id", "my-group");
// Required for Producer
kafkaProps.setProperty("metadata.broker.list", "localhost:9092");
// Optional Consumer Properties
kafkaProps.setProperty("auto.offset.reset", "earliest"); // or "latest"
kafkaProps.setProperty("fetch.message.max.bytes", "1048576");
kafkaProps.setProperty("socket.timeout.ms", "30000");
kafkaProps.setProperty("auto.commit.enable", "false"); // Recommended for exactly-once
// Flink-specific Properties
kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
kafkaProps.setProperty("flink.disable-metrics", "false");FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic", new SimpleStringSchema(), properties);
// Start from earliest available messages
consumer.setStartFromEarliest();
// Start from latest messages (skip existing)
consumer.setStartFromLatest();
// Start from consumer group's committed offsets (default)
consumer.setStartFromGroupOffsets();
// Start from specific offsets per partition
// Requires java.util.HashMap and org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(specificOffsets);

Note: these start position settings only apply when the job starts without checkpoint state; when restoring from a checkpoint or savepoint, the offsets stored in that state take precedence.

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
"my-topic", myDeserializer, properties);
// Assign periodic watermarks
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
private long currentMaxTimestamp = Long.MIN_VALUE + 5000; // avoid overflow in getCurrentWatermark() before the first element arrives
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - 5000); // 5 second tolerance
}
});
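As an alternative to periodic watermarks, an event-driven assigner can emit a watermark per qualifying element; a sketch assuming MyEvent exposes a hypothetical isEndOfBatch() marker:

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPunctuatedWatermarks;

consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTimestamp();
    }
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // Emit a watermark only when an element marks the end of a batch; returning null emits nothing
        return lastElement.isEndOfBatch() ? new Watermark(extractedTimestamp) : null;
    }
});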
// Consumer reliability configuration
consumer.setCommitOffsetsOnCheckpoints(true); // Commit offsets back to ZooKeeper when checkpoints complete (default when checkpointing is enabled)
// Producer error handling
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic", new SimpleStringSchema(), properties);
producer.setLogFailuresOnly(false); // Fail on errors (default)
producer.setFlushOnCheckpoint(true); // Flush data on checkpoint
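The checkpoint-related settings above only take effect when checkpointing is enabled on the execution environment, which is also what drives the consumer's exactly-once guarantees:

// Snapshot consumed offsets into Flink state (and commit them to ZooKeeper on completion)
env.enableCheckpointing(5000); // checkpoint every 5 seconds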
For detailed API documentation of specific components, see the classes in the org.apache.flink.streaming.connectors.kafka package.

Deprecated classes:
FlinkKafkaConsumer081 - Use FlinkKafkaConsumer08 instead
FlinkKafkaConsumer082 - Use FlinkKafkaConsumer08 instead
FlinkKafkaProducer - Use FlinkKafkaProducer08 instead

The Table API classes (Kafka08TableSource, Kafka08TableSink, Kafka08TableSourceSinkFactory) are annotated @Internal and are not intended for direct use.

When upgrading from Kafka 0.8 to newer versions, consider:
flink-connector-kafka-0.9+ for better reliability guarantees