# Apache Flink Kafka Connector 0.8

A comprehensive streaming connector for integrating Apache Flink with Apache Kafka 0.8.x. It provides source and sink capabilities, exactly-once processing guarantees on the consumer side, and Table API integration.

## Package Information

**Maven Coordinates:**

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.10.3</version>
</dependency>
```

*Note: The artifact ID includes the Scala binary version suffix (e.g., `_2.11` for Scala 2.11, `_2.12` for Scala 2.12).*

**Java Version:** Java 8+

**Kafka Compatibility:** Apache Kafka 0.8.x

**Main Package:** `org.apache.flink.streaming.connectors.kafka`

## Core Imports

```java { .api }
// Core consumer classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

// Core producer classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

// Serialization
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Table API (internal)
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

// Supporting JDK types
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
```

## Key Features

### Consumer Features
- **Exactly-once processing guarantees** through Flink's checkpointing mechanism
- **Dynamic partition discovery** with pattern-based topic subscription
- **Multiple startup modes**: earliest, latest, group offsets, or specific offsets
- **Watermark support** for event-time processing
- **Fault tolerance** with automatic offset recovery
- **ZooKeeper integration** for Kafka 0.8 metadata management
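
Pattern-based subscription is driven by a `java.util.regex.Pattern` over topic names (the connector also exposes `Pattern`-accepting constructors). A minimal plain-Java sketch of the matching involved — no Flink dependency, and the topic names here are made up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicMatching {
    public static void main(String[] args) {
        // A pattern like this can also be passed to the Pattern-based
        // FlinkKafkaConsumer08 constructor to subscribe to all matching topics.
        Pattern pattern = Pattern.compile("orders-.*");
        List<String> topics = Arrays.asList("orders-eu", "orders-us", "payments");

        // Keep only the topics whose full name matches the pattern.
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        System.out.println(matched); // [orders-eu, orders-us]
    }
}
```

When `flink.partition-discovery.interval-millis` is set (see Configuration below), newly created topics that match the pattern are picked up at runtime.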

### Producer Features
- **Custom partitioning** strategies with `FlinkKafkaPartitioner`
- **Key/value serialization** support
- **Checkpointing integration** with flush capabilities
- **Broker-list or `Properties`-based configuration**
- **Note**: the Kafka 0.8 producer provides no reliability guarantees

### Table API Integration
- **Schema evolution** support with field mapping
- **Processing-time and rowtime attributes**
- **Configurable startup modes and offset management**
- **Factory-based configuration** for SQL/Table API
72
73
## Quick Start
74
75
### Basic Consumer Example
76
77
```java { .api }
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic",
        new SimpleStringSchema(),
        properties
);

consumer.setStartFromEarliest();
env.addSource(consumer).print();
env.execute("Kafka Consumer Job");
```

### Basic Producer Example

```java { .api }
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
        "my-topic",
        new SimpleStringSchema(),
        properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);
env.execute("Kafka Producer Job");
```

## Architecture

### Consumer Architecture

The Flink Kafka Consumer 0.8 is built on a multi-layered architecture:

1. **FlinkKafkaConsumer08** - main consumer class extending `FlinkKafkaConsumerBase`
2. **AbstractFetcher** - handles partition fetching and watermark generation
3. **AbstractPartitionDiscoverer** - manages partition discovery and metadata
4. **KafkaDeserializationSchema** - converts Kafka `ConsumerRecord`s to Flink data types
```java { .api }
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Core consumer setup with custom deserialization (MyEvent is an example type)
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Custom deserialization logic
        return MyEvent.fromBytes(record.value());
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Never end the stream
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};
```
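
`MyEvent` and its `fromBytes` method above are placeholders. As a plain-Java sketch of what such a decoder might look like — assuming a hypothetical UTF-8, comma-delimited `"id,timestamp"` payload, which is not anything the connector prescribes:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical event type for illustration only.
public class MyEvent {
    final String id;
    final long timestamp;

    MyEvent(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    // Decodes the raw record value (what ConsumerRecord.value() returns).
    static MyEvent fromBytes(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",", 2);
        return new MyEvent(parts[0], Long.parseLong(parts[1]));
    }

    public static void main(String[] args) {
        MyEvent e = fromBytes("order-1,1700000000000".getBytes(StandardCharsets.UTF_8));
        System.out.println(e.id + " @ " + e.timestamp); // order-1 @ 1700000000000
    }
}
```

In practice the payload format (Avro, JSON, a custom binary layout) is whatever your producers write; only the `byte[] -> T` contract is fixed by the schema interface.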

### Producer Architecture

The producer architecture focuses on best-effort message delivery within Kafka 0.8's constraints (the 0.8 producer offers no delivery guarantees):

1. **FlinkKafkaProducer08** - main producer extending `FlinkKafkaProducerBase`
2. **FlinkKafkaPartitioner** - custom partitioning logic (optional)
3. **SerializationSchema/KeyedSerializationSchema** - message serialization
```java { .api }
// Custom partitioner example (MyEvent is an example type)
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Custom partitioning logic based on record content. floorMod keeps
        // the result non-negative even for negative hash codes, where
        // Math.abs-based variants can fail for Integer.MIN_VALUE.
        return Math.floorMod(record.getCustomerId().hashCode(), partitions.length);
    }
};
```
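
The reason for preferring `Math.floorMod` over an `abs`-then-mod combination can be checked in plain Java, with no Flink dependency:

```java
public class PartitionMath {
    // Maps an arbitrary hash code onto a valid partition index in [0, numPartitions).
    static int pick(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // floorMod stays non-negative even for Integer.MIN_VALUE...
        System.out.println(pick(Integer.MIN_VALUE, 3)); // 1
        // ...whereas Math.abs(Integer.MIN_VALUE) overflows and stays negative,
        // so abs-then-mod can yield a negative "partition index":
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3); // -2
    }
}
```

A negative index would make the producer fail at runtime when addressing the partition array, so this edge case matters even though it only occurs for one hash value.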

## Configuration

### Required Kafka 0.8 Properties

```java { .api }
Properties kafkaProps = new Properties();

// Required for the consumer
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("group.id", "my-group");

// Required for the producer
kafkaProps.setProperty("metadata.broker.list", "localhost:9092");

// Optional consumer properties. Note: Kafka 0.8 uses "smallest"/"largest"
// for auto.offset.reset, not the "earliest"/"latest" of later versions.
kafkaProps.setProperty("auto.offset.reset", "smallest"); // or "largest"
kafkaProps.setProperty("fetch.message.max.bytes", "1048576");
kafkaProps.setProperty("socket.timeout.ms", "30000");
kafkaProps.setProperty("auto.commit.enable", "false"); // Recommended for exactly-once

// Flink-specific properties
kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
kafkaProps.setProperty("flink.disable-metrics", "false");
```
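
Missing required properties only surface when the job starts, so it can help to fail fast at configuration time. A small plain-Java helper — hypothetical, not part of the connector API:

```java
import java.util.Properties;

public class KafkaPropsCheck {
    // Throws if any of the given keys is absent; returns the Properties for chaining.
    static Properties requireKeys(Properties props, String... keys) {
        for (String key : keys) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing required Kafka property: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "my-group");
        // Passes: both consumer-side required keys are present.
        requireKeys(props, "zookeeper.connect", "group.id");
        System.out.println("ok"); // ok
    }
}
```

Run such a check before constructing the consumer or producer, using the required-key lists from the block above.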

## Startup Mode Configuration

```java { .api }
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
        "my-topic", new SimpleStringSchema(), properties);

// Start from the earliest available messages
consumer.setStartFromEarliest();

// Start from the latest messages (skip existing)
consumer.setStartFromLatest();

// Start from the consumer group's committed offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets per partition
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(specificOffsets);
```

## Watermark and Timestamp Assignment

```java { .api }
import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
        "my-topic", myDeserializer, properties);

// Assign periodic watermarks
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 5000); // 5-second tolerance
    }
});
```
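
The watermark arithmetic above (maximum event timestamp seen so far, lagged by a fixed tolerance) can be checked in plain Java, with no Flink dependency:

```java
public class WatermarkMath {
    static final long TOLERANCE_MS = 5_000;
    long currentMax = Long.MIN_VALUE;

    // Mirrors extractTimestamp: track the maximum event timestamp seen so far.
    void onEvent(long timestamp) {
        currentMax = Math.max(timestamp, currentMax);
    }

    // Mirrors getCurrentWatermark: lag the maximum by the tolerance.
    long watermark() {
        return currentMax - TOLERANCE_MS;
    }

    public static void main(String[] args) {
        WatermarkMath w = new WatermarkMath();
        w.onEvent(10_000);
        w.onEvent(7_000); // an out-of-order event does not move the maximum back
        System.out.println(w.watermark()); // 5000
    }
}
```

The watermark asserts that no event with a timestamp below `currentMax - 5000` is still expected; events arriving later than that tolerance are treated as late by downstream event-time operators.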

## Error Handling and Reliability

```java { .api }
// Commit offsets back to ZooKeeper on checkpoints. Note that Flink's
// exactly-once guarantee comes from offsets stored in checkpoints, not from
// these commits, which mainly make progress visible to external tools.
consumer.setCommitOffsetsOnCheckpoints(true);

// Producer error handling
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
        "my-topic", new SimpleStringSchema(), properties);

producer.setLogFailuresOnly(false); // Fail the job on send errors (default)
producer.setFlushOnCheckpoint(true); // Flush pending records on checkpoint
```

## API Documentation

For detailed API documentation of specific components:

- **[Kafka Consumer API](kafka-consumer.md)** - Complete FlinkKafkaConsumer08 API reference
- **[Kafka Producer API](kafka-producer.md)** - Complete FlinkKafkaProducer08 API reference
- **[Table API Integration](table-api.md)** - Kafka08TableSource, Kafka08TableSink, and factory classes (*Note: these are `@Internal` APIs*)

## Version Notes

### Kafka 0.8 Limitations
- **No transactional support**: producers cannot provide exactly-once guarantees
- **ZooKeeper dependency**: consumers require ZooKeeper for metadata operations
- **No timestamp support**: offsets cannot be fetched by timestamp
- **Limited reliability**: producers may lose messages on failures

### Deprecated Classes
- `FlinkKafkaConsumer081` - use `FlinkKafkaConsumer08` instead
- `FlinkKafkaConsumer082` - use `FlinkKafkaConsumer08` instead
- `FlinkKafkaProducer` - use `FlinkKafkaProducer08` instead

### Internal APIs
- **Table API classes** (`Kafka08TableSource`, `Kafka08TableSink`, `Kafka08TableSourceSinkFactory`) are marked `@Internal`
- They may change without notice and are not part of the official public API
- Use them through Flink's Table API framework rather than directly

### Migration Path
When upgrading from Kafka 0.8 to newer broker versions, consider:
1. Switching to a newer connector (`flink-connector-kafka-0.9` or later) for stronger reliability guarantees
2. Replacing ZooKeeper-based configuration (`zookeeper.connect`) with `bootstrap.servers`
3. Adopting transactional producers (available from Kafka 0.11) for exactly-once sink semantics