# Apache Flink Kafka 0.10 Connector

Apache Flink Kafka 0.10 connector provides streaming data integration between Apache Flink and Apache Kafka 0.10.x message brokers. It enables consuming from Kafka topics with exactly-once processing guarantees (backed by Flink checkpointing), producing to Kafka topics with at-least-once delivery guarantees, dynamic partition discovery, and comprehensive error handling.

## Package Information

- **Package Name**: flink-connector-kafka-0.10_2.12
- **Package Type**: maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-kafka-0.10_2.12
- **Language**: Java
- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.12</artifactId><version>1.11.6</version></dependency>`

## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
```

## Basic Usage

### Consumer Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

// Create Kafka consumer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");

// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// Add to Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer)
    .print();
```

### Producer Example

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

// Create Kafka producer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// Create producer
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

// Add to streaming pipeline
dataStream.addSink(producer);
```

## Architecture

The Flink Kafka 0.10 connector is built around several key components:

- **Consumer Classes**: `FlinkKafkaConsumer010` for reading data from Kafka topics
- **Producer Classes**: `FlinkKafkaProducer010` for writing data to Kafka topics
- **Table API Integration**: Legacy and dynamic table factory support for SQL/Table API
- **Internal Components**: Fetcher, partition discoverer, and consumer thread management
- **Serialization Support**: Both simple value serialization and key-value serialization schemas

## Capabilities

### Data Stream Consumer

Core consumer functionality for reading data from Kafka 0.10.x topics with exactly-once processing guarantees (when Flink checkpointing is enabled) and flexible topic subscription patterns.

```java { .api }
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    // Single topic constructors
    public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Multiple topics constructors
    public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Pattern-based subscription constructors
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

    // Rate limiting methods
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
```

[Data Stream Consumer](./consumer.md)

### Data Stream Producer

Core producer functionality for writing data to Kafka 0.10.x topics with at-least-once delivery guarantees and custom partitioning support. (Exactly-once, transactional writes require Kafka 0.11+ and the corresponding connector version.)

```java { .api }
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
    // Value-only serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

    // Key-value serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

    // Configuration methods
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}
```

[Data Stream Producer](./producer.md)

### Table API Integration

SQL and Table API integration for declarative stream processing with Kafka sources and sinks, supporting both legacy and dynamic table factories.

```java { .api }
// Legacy table factory
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
}

// Dynamic table factory
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    public static final String IDENTIFIER = "kafka-0.10";
}
```

[Table API Integration](./table-api.md)

## Configuration Properties

### Consumer Configuration Constants

```java { .api }
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;
```

### Common Configuration Properties

- **bootstrap.servers**: Kafka broker addresses (required)
- **group.id**: Consumer group identifier
- **flink.poll-timeout**: Consumer polling timeout in milliseconds (default: 100)
- **enable.auto.commit**: Automatic offset commit (managed by Flink)
- **auto.offset.reset**: Initial offset behavior when no committed offset exists

## Types

### Core Consumer Type

```java { .api }
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    // Generic type T represents the output data type after deserialization
}
```

### Core Producer Type

```java { .api }
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
    // Generic type T represents the input data type before serialization
}
```

### Serialization Interfaces

```java { .api }
// Simple value deserialization
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
}

// Key-value deserialization
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    boolean isEndOfStream(T nextElement);
}

// Simple value serialization
public interface SerializationSchema<T> extends Serializable {
    byte[] serialize(T element);
}

// Key-value serialization
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```

### Partitioning Interface

```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    public void open(int parallelInstanceId, int parallelInstances);
}
```

### Rate Limiting Interface

```java { .api }
public interface FlinkConnectorRateLimiter extends Serializable {
    void open(RuntimeContext runtimeContext) throws Exception;
    void acquire(long bytes);
    void close() throws Exception;
}
```