Apache Flink SQL connector for Apache Kafka 0.11 with shaded dependencies providing streaming and table API integration
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11_2-11@1.11.0

Apache Flink SQL connector for Apache Kafka 0.11.x that provides both streaming and Table/SQL API integration with transaction support and exactly-once semantics. This shaded connector packages all Kafka client dependencies to prevent classpath conflicts in Flink deployments.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka-0.11_2.11</artifactId>
<version>1.11.6</version>
</dependency>

For streaming DataStream programs:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

For Table/SQL API integration:
import org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;

For serialization and partitioning:
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

Consuming from a Kafka topic:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
"my-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> stream = env.addSource(consumer);

Writing to Kafka with exactly-once semantics:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("transaction.timeout.ms", "900000");
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
"output-topic",
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // wraps the value-only schema as a KeyedSerializationSchema for the transactional producer
properties,
Semantic.EXACTLY_ONCE
);
stream.addSink(producer);

Defining a Kafka-backed table in SQL DDL:

CREATE TABLE kafka_table (
id INT,
name STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

The Flink Kafka 0.11 connector is built around several key components:
- FlinkKafkaConsumer011 for reading from Kafka topics with checkpoint integration
- FlinkKafkaProducer011 supporting transactional writes and exactly-once semantics

The connector provides FlinkKafkaConsumer011 for consuming from Kafka topics, with support for multiple deserialization patterns, startup modes, and partition discovery.
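For example, startup position and partition discovery can be configured on a consumer built with the constructors listed below. A minimal sketch reusing the topic and group names from the quick-start snippet; the 30-second discovery interval is an illustrative value:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// Discover newly created partitions of the subscribed topics every 30 seconds.
props.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Startup mode: begin from the earliest offset instead of the committed group offsets.
consumer.setStartFromEarliest();
// Other options: setStartFromLatest(), setStartFromGroupOffsets(),
// setStartFromTimestamp(long), setStartFromSpecificOffsets(Map).

// The configured consumer is attached with env.addSource(consumer) as in the quick-start example.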
// Primary constructors for different use cases
FlinkKafkaConsumer011<T>(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer011<T>(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer011<T>(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)

The connector provides FlinkKafkaProducer011 with transactional support, multiple delivery semantics, and flexible partitioning options.
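As a sketch of the fully parameterized constructor listed below, the following combines the bundled FlinkFixedPartitioner, exactly-once semantics, and an explicit transactional-producer pool size; the topic name and the pool size of 5 are assumptions for the example:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",                                                  // default target topic
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // value-only serialization
    props,
    Optional.of(new FlinkFixedPartitioner<String>()),                // custom partitioning strategy
    Semantic.EXACTLY_ONCE,                                           // transactional writes
    5                                                                // size of the transactional producer pool
);

FlinkFixedPartitioner assigns each parallel sink subtask to a single Kafka partition, which keeps per-subtask ordering but can skew load across partitions.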
// Core producer constructors
FlinkKafkaProducer011<IN>(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic)
FlinkKafkaProducer011<IN>(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize)
// Delivery semantics
enum Semantic {
EXACTLY_ONCE,
AT_LEAST_ONCE,
NONE
}

The connector provides factory classes for creating Kafka table sources and sinks in Flink's Table API and SQL, supporting both the legacy and the modern dynamic table APIs.
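The dynamic factory is looked up by its 'kafka-0.11' identifier when a table declares that connector. A sketch of registering and querying such a table from a StreamTableEnvironment; the table name, schema, and option values mirror the DDL example above and are illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// The 'kafka-0.11' connector identifier routes this DDL to Kafka011DynamicTableFactory.
tableEnv.executeSql(
    "CREATE TABLE kafka_table (" +
    "  id INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kafka-0.11'," +
    "  'topic' = 'my-topic'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'my-group'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

// Queries against the table are backed by Kafka011DynamicSource at runtime.
Table result = tableEnv.sqlQuery("SELECT id, name FROM kafka_table");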
// Dynamic table factory for SQL DDL
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
String factoryIdentifier() // Returns "kafka-0.11"
}
// Dynamic table source and sink
class Kafka011DynamicSource extends KafkaDynamicSourceBase
class Kafka011DynamicSink extends KafkaDynamicSinkBase

Base interfaces and implementations handle the conversion between Flink data types and Kafka record formats, with access to Kafka metadata.
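For example, a custom KafkaDeserializationSchema can surface record metadata alongside the payload. The class below is a hypothetical sketch (its name is made up) that tags every value with topic, partition, and offset, implementing the single-record deserialize variant of the interface listed below:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class OffsetTaggingDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Combine Kafka metadata (topic, partition, offset) with the record value.
        String value = record.value() == null
                ? ""
                : new String(record.value(), StandardCharsets.UTF_8);
        return record.topic() + "/" + record.partition() + "@" + record.offset() + ": " + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

An instance can be passed to the FlinkKafkaConsumer011 constructor that accepts a KafkaDeserializationSchema.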
// Core serialization interfaces
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception;
}
interface KafkaSerializationSchema<T> extends Serializable {
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}

Configuration options, startup modes, and custom partitioning strategies allow fine-tuning of the connector's behavior.
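A custom partitioner only needs to implement the partition method of the FlinkKafkaPartitioner base class listed below; the class here is a hypothetical example that spreads records across partitions by the hash of their serialized key:

import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of target topic " + targetTopic + " are empty.");
        }
        // Records without a key all go to the first partition of the target topic.
        if (key == null) {
            return partitions[0];
        }
        // Spread keyed records across the topic's partitions by key hash.
        return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}

It can be plugged into the producer through the Optional<FlinkKafkaPartitioner<IN>> constructor parameter shown earlier.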
// Startup mode options
enum StartupMode {
GROUP_OFFSETS,
EARLIEST,
LATEST,
TIMESTAMP,
SPECIFIC_OFFSETS
}
// Custom partitioner base class
abstract class FlinkKafkaPartitioner<T> implements Serializable {
abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

Connector-specific failures are reported through FlinkKafka011Exception and its error codes:

class FlinkKafka011Exception extends FlinkException {
FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message);
FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause);
FlinkKafka011ErrorCode getErrorCode();
}
enum FlinkKafka011ErrorCode {
PRODUCERS_POOL_EMPTY,
EXTERNAL_ERROR
}