Apache Kafka 0.8.x connector for Apache Flink streaming data processing
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-10@1.3.0
```

The Apache Flink Kafka 0.8 connector enables high-performance streaming integration between Apache Flink and Apache Kafka 0.8.x clusters. It provides exactly-once processing guarantees through checkpointing, supports parallel consumption and production, and offers comprehensive offset management with ZooKeeper integration.
Alternatively, add the Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

Core imports:

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
```

A complete example that reads from one Kafka topic and writes to another:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
// Set up Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");
// Create consumer
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
);
// Add consumer as source
DataStream<String> stream = env.addSource(consumer);
// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Create producer and add as sink
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    producerProps
);
stream.addSink(producer);
// Execute
env.execute("Kafka Streaming Job");
```

The Flink Kafka 0.8 connector is built around several key components:
- `FlinkKafkaConsumer08` extends `FlinkKafkaConsumerBase` and uses `Kafka08Fetcher` for message retrieval with ZooKeeper-based offset management
- `FlinkKafkaProducer08` extends `FlinkKafkaProducerBase` for message publishing with configurable partitioning

`FlinkKafkaConsumer08` is the primary consumer for reading from Kafka 0.8.x topics, with exactly-once processing guarantees and checkpointing support.

```java
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
}
```
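As the `List<String>` constructors above show, a single consumer instance can subscribe to several topics at once. A minimal sketch, assuming illustrative topic names and group id:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "multi-topic-consumer"); // illustrative group id

// One consumer instance reading from two topics ("orders" and "payments" are illustrative)
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("orders", "payments"),
    new SimpleStringSchema(),
    props
);
```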
`FlinkKafkaProducer08` is the producer for writing to Kafka 0.8.x topics, with configurable partitioning and serialization support.

```java
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
```
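The partitioner-accepting constructor can be used as follows. A minimal sketch, assuming `FlinkFixedPartitioner` (which ships with the connector) and an illustrative broker address:

```java
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// Pin each parallel sink subtask to a single Kafka partition
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    props,
    new FlinkFixedPartitioner<String>()
);
```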
Table sources and sinks integrate Kafka with Flink's SQL API, supporting JSON and Avro formats.

```java
public class Kafka08TableSource extends KafkaTableSource {
    public Kafka08TableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
}
```
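A sketch of registering a JSON table source with the Table API (topic, field names, types, and connection settings are illustrative, and the Table API surface varies between Flink versions):

```java
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "table-consumer");

// Row schema (user: String, clicks: Long) -- illustrative field names and types
TypeInformation<?>[] types = { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO };
String[] names = { "user", "clicks" };
RowTypeInfo rowType = new RowTypeInfo(types, names);

Kafka08JsonTableSource source = new Kafka08JsonTableSource("clicks", props, rowType);
tableEnv.registerTableSource("Clicks", source);
```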
`ZookeeperOffsetHandler` provides ZooKeeper-based offset management utilities for consumer offset storage and retrieval.

```java
public class ZookeeperOffsetHandler {
    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset);
    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition);
}
```
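A sketch of inspecting and rewinding a group's committed offsets with Apache Curator (the connection string, group id, and topic name are illustrative):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Connect to the same ZooKeeper ensemble the consumer group uses
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
curator.start();

// Read the committed offset for partition 0 (null if nothing has been committed yet)
Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
    curator, "flink-consumer", "input-topic", 0);

// Rewind the group to the beginning of partition 0
ZookeeperOffsetHandler.setOffsetInZooKeeper(
    curator, "flink-consumer", "input-topic", 0, 0L);

curator.close();
```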
The connector surfaces failures (for example, unreachable brokers or deserialization errors) as runtime exceptions that the application should handle. Proper error handling includes retry logic for transient failures and graceful degradation for persistent issues.
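In Flink, retry logic for transient failures is typically expressed as a restart strategy on the execution environment. A minimal sketch:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Restart the job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, Time.of(10, TimeUnit.SECONDS)));
```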
Key Kafka properties for consumer configuration:
- `bootstrap.servers`: Kafka broker addresses
- `zookeeper.connect`: ZooKeeper connection string
- `group.id`: Consumer group identifier
- `auto.offset.reset`: Offset reset strategy
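For example (addresses are illustrative; note that Kafka 0.8 uses the values `smallest`/`largest` rather than the later `earliest`/`latest`):

```java
import java.util.Properties;

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");
// Start from the earliest available offset when no committed offset exists
consumerProps.setProperty("auto.offset.reset", "smallest");
```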
Key Kafka properties for producer configuration:

- `bootstrap.servers`: Kafka broker addresses
- `key.serializer`: Key serialization class
- `value.serializer`: Value serialization class
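A minimal sketch (the connector writes byte arrays to Kafka and `FlinkKafkaProducerBase` normally configures byte-array serializers itself, so the serializer entries are shown for completeness):

```java
import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Byte-array serializers match the connector's internal handling
producerProps.setProperty("key.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.setProperty("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
```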
The serialization and deserialization contracts used by the consumer and producer:

```java
/**
 * Interface for deserializing Kafka message values only
 */
public interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for deserializing Kafka messages with key, value, topic, partition, and offset
 */
public interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for serializing objects to Kafka message values only
 */
public interface SerializationSchema<T> {
    byte[] serialize(T element);
}

/**
 * Interface for serializing objects to Kafka messages with keys and values
 */
public interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

/**
 * Base class for custom partitioning logic
 */
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
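A sketch of implementing these contracts: a hypothetical `KeyValueStringSchema` that splits `"key:value"` strings into key and value bytes, and a hypothetical `KeyHashPartitioner` that routes records by a hash of the key (neither class is part of the connector):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical schema: serializes "key:value" strings into separate key and value bytes
public class KeyValueStringSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.split(":", 2)[1].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null means "use the topic given to the producer"
    }
}

// Hypothetical partitioner: the same key bytes always map to the same partition
public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first partition
        }
        return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}
```

The producer also exposes constructor variants taking a `KeyedSerializationSchema` (not listed above), mirroring the `SerializationSchema` signatures, so both classes can be passed together when constructing a `FlinkKafkaProducer08`.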