# Apache Flink Kafka 0.11 SQL Connector

Apache Flink SQL connector for Apache Kafka 0.11.x. It integrates with both the DataStream API and the Table/SQL API, and its producer supports Kafka transactions for exactly-once delivery. The artifact is shaded: it bundles and relocates all Kafka client dependencies to prevent classpath conflicts in Flink deployments.

## Package Information

- **Package Name**: flink-sql-connector-kafka-0.11_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-sql-connector-kafka-0.11_2.11
- **Version**: 1.11.6
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka-0.11_2.11</artifactId>
    <version>1.11.6</version>
</dependency>
```

## Core Imports

For streaming DataStream programs:

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
```

For Table/SQL API integration:

```java
import org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
```

For serialization and partitioning:

```java
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
```
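
## Basic Usage

### Streaming Consumer

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka client settings: broker address and consumer group
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");

// Read each record value from "my-topic" as a UTF-8 string
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.addSource(consumer);
```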

### Streaming Producer with Exactly-Once

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 15 minutes; must not exceed the broker's transaction.max.timeout.ms
properties.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    properties,
    Semantic.EXACTLY_ONCE
);

// `stream` is the DataStream<String> from the consumer example
stream.addSink(producer);
```
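
The producer example sets `transaction.timeout.ms` to 900000 because a transactional producer is rejected when its timeout exceeds the broker's `transaction.max.timeout.ms`, which defaults to 15 minutes on Kafka brokers. The following plain-Java sketch shows one way to guard that setting before building the producer. `ExactlyOnceProps` and `forExactlyOnce` are hypothetical names for illustration, not part of the connector, and the broker limit is passed in as an assumption rather than read from the cluster.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the connector): builds producer properties for EXACTLY_ONCE. */
final class ExactlyOnceProps {

    /** Kafka broker default for transaction.max.timeout.ms (15 minutes). */
    static final long BROKER_DEFAULT_MAX_TIMEOUT_MS = 15 * 60 * 1000L;

    /**
     * Returns producer properties, failing fast if the requested transaction
     * timeout is larger than the limit the broker is assumed to enforce.
     */
    static Properties forExactlyOnce(String bootstrapServers,
                                     long transactionTimeoutMs,
                                     long brokerMaxTimeoutMs) {
        if (transactionTimeoutMs <= 0 || transactionTimeoutMs > brokerMaxTimeoutMs) {
            throw new IllegalArgumentException(
                "transaction.timeout.ms=" + transactionTimeoutMs
                    + " must be in (0, " + brokerMaxTimeoutMs + "]");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props =
            forExactlyOnce("localhost:9092", 900_000L, BROKER_DEFAULT_MAX_TIMEOUT_MS);
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 900000
    }
}
```

The returned `Properties` object can be passed as the `producerConfig` argument of the producer constructor. If the brokers are configured with a larger `transaction.max.timeout.ms`, pass that value instead of the default.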

### SQL Table Definition

```sql
CREATE TABLE kafka_table (
    id INT,
    name STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'my-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```

## Architecture

The Flink Kafka 0.11 connector is built around several key components:

- **Streaming Consumer**: `FlinkKafkaConsumer011` for reading from Kafka topics with checkpoint integration
- **Streaming Producer**: `FlinkKafkaProducer011` supporting transactional writes and exactly-once semantics
- **Table API Integration**: Dynamic table factories for SQL DDL support
- **Shaded Dependencies**: All Kafka client libraries relocated to prevent conflicts
- **Transaction Support**: Two-phase commit protocol for exactly-once guarantees
- **Configuration System**: Extensive Properties-based configuration with Flink-specific extensions

## Capabilities

### Streaming Consumer API

Provides `FlinkKafkaConsumer011` for consuming from Kafka topics with support for multiple deserialization patterns, startup modes, and partition discovery.

```java { .api }
// Primary constructors for different use cases
FlinkKafkaConsumer011<T>(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer011<T>(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer011<T>(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
```

[Streaming Consumer](./streaming-consumer.md)
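
The `Pattern`-based constructor subscribes to topics by name rather than by an explicit list. The plain-Java sketch below shows which names a given pattern would select, assuming the whole topic name must match the regular expression (`Matcher.matches()` semantics). The class name, method name, and topic names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustration only: previews which topic names a subscription pattern would select. */
final class TopicPatternDemo {

    /** Returns the topics whose full name matches the pattern (assumed matching rule). */
    static List<String> matchingTopics(Pattern subscriptionPattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (subscriptionPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders-[0-9]+");
        List<String> topics =
            Arrays.asList("orders-1", "orders-22", "orders-eu", "payments-1");
        System.out.println(matchingTopics(pattern, topics)); // prints [orders-1, orders-22]
    }
}
```

Checking a pattern this way before deploying helps avoid subscribing to more topics than intended, for example when a loose pattern such as `orders-.*` would also pick up `orders-eu`.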

### Streaming Producer API

Provides `FlinkKafkaProducer011` with transactional support, multiple delivery semantics, and flexible partitioning options.

```java { .api }
// Core producer constructors
FlinkKafkaProducer011<IN>(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic)
FlinkKafkaProducer011<IN>(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize)

// Delivery semantics
enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE,
    NONE
}
```

[Streaming Producer](./streaming-producer.md)
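
To make the idea behind `Semantic.EXACTLY_ONCE` more concrete, here is a toy model of a two-phase commit sink in plain Java. It is a deliberately simplified sketch, not the connector's implementation: it tracks a single pending transaction, keeps records in memory instead of using Kafka transactions, and every name in it (`ToyTwoPhaseCommitSink`, `preCommit`, `commit`, `abort`) is chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy two-phase-commit sink: records become visible only after commit(). */
final class ToyTwoPhaseCommitSink {

    private final List<String> committed = new ArrayList<>(); // visible to readers
    private List<String> open = new ArrayList<>();            // current transaction
    private List<String> preCommitted = null;                 // sealed, awaiting commit

    /** Buffers a record in the open transaction. */
    void write(String record) {
        open.add(record);
    }

    /** Phase 1 (checkpoint starts): seal the open transaction and begin a new one. */
    void preCommit() {
        if (preCommitted != null) {
            throw new IllegalStateException("previous transaction is still pending");
        }
        preCommitted = open;
        open = new ArrayList<>();
    }

    /** Phase 2 (checkpoint completed): publish the sealed transaction. */
    void commit() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** Failure before commit: drop everything that has not been published. */
    void abort() {
        open = new ArrayList<>();
        preCommitted = null;
    }

    List<String> committedRecords() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        ToyTwoPhaseCommitSink sink = new ToyTwoPhaseCommitSink();
        sink.write("a");
        sink.write("b");
        sink.preCommit();
        System.out.println(sink.committedRecords()); // prints []
        sink.commit();
        System.out.println(sink.committedRecords()); // prints [a, b]
        sink.write("c");
        sink.abort();                                // "c" is discarded
        System.out.println(sink.committedRecords()); // prints [a, b]
    }
}
```

The point of the model is the ordering: output written between checkpoints stays invisible until the checkpoint that covers it has completed, so a failure in between leaves no partial results behind.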

### Table/SQL API Integration

Provides factory classes for creating Kafka table sources and sinks in Flink's Table API and SQL, supporting both legacy and modern dynamic table APIs.

```java { .api }
// Dynamic table factory for SQL DDL
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    String factoryIdentifier() // Returns "kafka-0.11"
}

// Dynamic table source and sink
class Kafka011DynamicSource extends KafkaDynamicSourceBase
class Kafka011DynamicSink extends KafkaDynamicSinkBase
```

[Table API](./table-api.md)

### Serialization and Deserialization

Base interfaces and implementations for converting between Flink data types and Kafka record formats, with access to Kafka metadata.

```java { .api }
// Core serialization interfaces
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception;
}

interface KafkaSerializationSchema<T> extends Serializable {
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
```

[Serialization](./serialization.md)
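
A `KafkaDeserializationSchema` receives the raw key and value bytes together with record metadata. The sketch below isolates the decoding step in plain Java so it runs without Flink or Kafka on the classpath. `KeyValueDecoding`, `KeyedEvent`, and `decode` are hypothetical names, the parameters stand in for the corresponding `ConsumerRecord` fields, and UTF-8 text payloads are an assumption.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: the decoding logic a deserialize(...) implementation might delegate to. */
final class KeyValueDecoding {

    /** Hypothetical target type carrying payload plus Kafka metadata. */
    static final class KeyedEvent {
        final String topic;
        final long offset;
        final String key;   // null when the Kafka record has no key
        final String value;

        KeyedEvent(String topic, long offset, String key, String value) {
            this.topic = topic;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Decodes UTF-8 key/value bytes; Kafka records may carry a null key. */
    static KeyedEvent decode(String topic, long offset, byte[] key, byte[] value) {
        String decodedKey = (key == null) ? null : new String(key, StandardCharsets.UTF_8);
        String decodedValue = new String(value, StandardCharsets.UTF_8);
        return new KeyedEvent(topic, offset, decodedKey, decodedValue);
    }

    public static void main(String[] args) {
        KeyedEvent event = decode(
            "my-topic",
            7L,
            "user-42".getBytes(StandardCharsets.UTF_8),
            "clicked".getBytes(StandardCharsets.UTF_8));
        System.out.println(event.key + "=" + event.value); // prints user-42=clicked
    }
}
```

Keeping the decoding in a small static method like this makes it easy to unit test separately from the schema class that wraps it.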

### Configuration and Partitioning

Configuration options, startup modes, and custom partitioning strategies for fine-tuning connector behavior.

```java { .api }
// Startup mode options
enum StartupMode {
    GROUP_OFFSETS,
    EARLIEST,
    LATEST,
    TIMESTAMP,
    SPECIFIC_OFFSETS
}

// Custom partitioner base class
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

[Configuration](./configuration.md)
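
As an example of a custom partitioning strategy, the sketch below routes records by a hash of the serialized key so that equal keys always land in the same partition. It is written as a standalone static method so it runs without Flink. In a real job the same logic would go into the `partition` override of a `FlinkKafkaPartitioner<T>` subclass. The class name `KeyHashPartitioning` and the choice to send keyless records to the first partition are assumptions made for illustration.

```java
import java.util.Arrays;

/** Illustration only: key-hash partition selection, independent of Flink classes. */
final class KeyHashPartitioning {

    /**
     * Picks a partition id from the available ones.
     *
     * @param key        serialized record key, may be null
     * @param partitions partition ids of the target topic
     */
    static int partition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("target topic has no partitions");
        }
        if (key == null) {
            return partitions[0]; // assumed fallback for records without a key
        }
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(Arrays.hashCode(key), partitions.length);
        return partitions[index];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        byte[] key = {1, 2, 3};
        // Equal key bytes map to the same partition
        System.out.println(partition(key, partitions) == partition(key.clone(), partitions)); // prints true
    }
}
```

`Arrays.hashCode` is content-based, which is what makes the mapping stable across separate byte arrays holding the same key.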

## Types

### Core Exception Types

```java { .api }
class FlinkKafka011Exception extends FlinkException {
    FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message);
    FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause);
    FlinkKafka011ErrorCode getErrorCode();
}

enum FlinkKafka011ErrorCode {
    PRODUCERS_POOL_EMPTY,
    EXTERNAL_ERROR
}
```