# Apache Flink Kafka 0.8 Connector

The Apache Flink Kafka 0.8 connector enables high-performance streaming integration between Apache Flink and Apache Kafka 0.8.x clusters. It provides exactly-once processing guarantees through checkpointing, supports parallel consumption and production, and offers comprehensive offset management with ZooKeeper integration.

## Package Information

- **Package Name**: flink-connector-kafka-0.8_2.10
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-kafka-0.8_2.10
- **Installation**: Include as a Maven dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Set up the Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");

// Create the consumer
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
);

// Add the consumer as a source
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create the producer and add it as a sink
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(producer);

// Execute the job
env.execute("Kafka Streaming Job");
```

## Architecture

The Flink Kafka 0.8 connector is built around several key components:

- **Consumer Architecture**: `FlinkKafkaConsumer08` extends `FlinkKafkaConsumerBase` and uses `Kafka08Fetcher` for message retrieval with ZooKeeper-based offset management
- **Producer Architecture**: `FlinkKafkaProducer08` extends `FlinkKafkaProducerBase` for message publishing with configurable partitioning
- **Checkpointing Integration**: Seamless integration with Flink's distributed snapshots for exactly-once processing guarantees
- **Table API Integration**: Table sources and sinks for SQL API usage with JSON and Avro format support
- **Offset Management**: ZooKeeper-based offset storage with periodic commits and recovery support
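
The exactly-once guarantee described above only takes effect when checkpointing is enabled on the execution environment; a minimal sketch (the 5-second interval is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 seconds; on failure and recovery the consumer rewinds
// to the offsets stored in the last completed checkpoint, so each record is
// processed exactly once within the Flink pipeline.
env.enableCheckpointing(5000);
```

Without checkpointing enabled, the consumer falls back to the periodic ZooKeeper commits, which give at-least-once behavior at best.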

## Capabilities

### Kafka Consumer

Primary consumer for reading from Kafka 0.8.x topics with exactly-once processing guarantees and checkpointing support.

```java { .api }
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
}
```

[Kafka Consumer](./kafka-consumer.md)
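
The `List<String>` constructors let a single consumer instance subscribe to several topics at once; a sketch, with placeholder topic names and connection settings:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "flink-consumer");

// One consumer subscribed to two topics; the partitions of both topics
// are distributed across the parallel source instances.
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("topic-a", "topic-b"),
    new SimpleStringSchema(),
    props
);
```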

### Kafka Producer

Producer for writing to Kafka 0.8.x topics with configurable partitioning and serialization support.

```java { .api }
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
```

[Kafka Producer](./kafka-producer.md)
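
A custom partitioner can be supplied through the third constructor; the sketch below assumes the `FlinkKafkaPartitioner` contract shown in the Types section and the import path used by Flink 1.3:

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// Route each record by the hash of its value, so records with equal
// values always land on the same Kafka partition.
FlinkKafkaPartitioner<String> partitioner = new FlinkKafkaPartitioner<String>() {
    @Override
    public int partition(String record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        return partitions[Math.abs(record.hashCode() % partitions.length)];
    }
};

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic", new SimpleStringSchema(), props, partitioner);
```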

### Table API Integration

Table sources and sinks for integrating Kafka with Flink's SQL API, supporting JSON and Avro formats.

```java { .api }
public class Kafka08TableSource extends KafkaTableSource {
    public Kafka08TableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
}
```

[Table API Integration](./table-api.md)
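
A `Kafka08JsonTableSource` might be wired up as follows; the topic, field names, and the use of `RowTypeInfo` to build the `TypeInformation<Row>` are illustrative assumptions, not mandated by the connector:

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.types.Row;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "flink-table-consumer");

// Row schema for JSON records shaped like {"user": "...", "clicks": 42}
TypeInformation<Row> schema = new RowTypeInfo(
    new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO},
    new String[]{"user", "clicks"});

Kafka08JsonTableSource source = new Kafka08JsonTableSource("clicks-topic", props, schema);
```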

### Offset Management

ZooKeeper-based offset management utilities for handling consumer offset storage and retrieval.

```java { .api }
public class ZookeeperOffsetHandler {
    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset);
    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition);
}
```

[Offset Management](./offset-management.md)
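
Assuming a Curator client pointed at the same ZooKeeper ensemble the consumer group uses (and the connector's internal package for `ZookeeperOffsetHandler`), the handler might be used like this:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Connect to ZooKeeper with an exponential-backoff retry policy
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
curator.start();

// Read the committed offset for partition 0 of the topic, then advance it
Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
    curator, "flink-consumer", "input-topic", 0);
if (offset != null) {
    ZookeeperOffsetHandler.setOffsetInZooKeeper(
        curator, "flink-consumer", "input-topic", 0, offset + 100);
}

curator.close();
```

Manually rewriting offsets this way only affects where a non-checkpointed consumer resumes; a checkpointed job restores from its snapshot instead.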

## Error Handling

The connector can throw several kinds of exceptions that callers should handle:

- **IllegalArgumentException**: invalid configuration parameters
- **RuntimeException**: connection or serialization errors
- **Exception**: general Kafka or ZooKeeper connectivity issues

Error handling should include retry logic for transient failures and graceful degradation for persistent ones.
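
One possible shape for such retry logic, as a hypothetical standalone helper (the class name, attempt count, and linear backoff policy are illustrative, not part of the connector):

```java
import java.util.concurrent.Callable;

public class RetryUtil {

    // Runs the action up to maxAttempts times, sleeping between attempts;
    // rethrows the last exception once all attempts are exhausted.
    public static <T> T retry(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs * attempt); // linear backoff
                }
            }
        }
        throw last; // persistent failure: give up after maxAttempts
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then succeeds -- simulates a transient connection error
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient");
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```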

## Configuration Properties

Key Kafka properties for consumer configuration:

- `bootstrap.servers`: Kafka broker addresses
- `zookeeper.connect`: ZooKeeper connection string
- `group.id`: Consumer group identifier
- `auto.offset.reset`: Offset reset strategy

Key Kafka properties for producer configuration:

- `bootstrap.servers`: Kafka broker addresses
- `key.serializer`: Key serialization class
- `value.serializer`: Value serialization class
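
A hypothetical helper that assembles the consumer properties above; note that Kafka 0.8 expects `smallest`/`largest` for `auto.offset.reset` rather than the newer `earliest`/`latest`:

```java
import java.util.Properties;

public class KafkaConfig {

    // Builds consumer properties; the broker and ZooKeeper addresses
    // are placeholders for a real deployment.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "flink-consumer");
        // Start from the oldest available offset when no committed offset exists
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```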

## Types

```java { .api }
/**
 * Interface for deserializing Kafka message values only
 */
public interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for deserializing Kafka messages with key, value, topic, partition, and offset
 */
public interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for serializing objects to Kafka message values only
 */
public interface SerializationSchema<T> {
    byte[] serialize(T element);
}

/**
 * Interface for serializing objects to Kafka messages with keys and values
 */
public interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

/**
 * Interface for custom partitioning logic
 */
public interface FlinkKafkaPartitioner<T> {
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
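
A hypothetical implementation of `DeserializationSchema` that decodes UTF-8 strings and uses `isEndOfStream` to terminate on a sentinel record (the sentinel value is an illustrative choice):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class StringWithSentinelSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) throws IOException {
        // Decode the raw message bytes as UTF-8 text
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Treat the literal record "EOS" as end-of-stream
        return "EOS".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```

An instance of this schema can be passed to any `FlinkKafkaConsumer08` constructor in place of `SimpleStringSchema`.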