# Apache Flink Kafka Connector 0.9

Apache Flink Kafka Connector 0.9 provides streaming data integration between Apache Flink and Kafka 0.9.x message brokers. The connector enables real-time data processing pipelines with fault tolerance and high-throughput capabilities for both consuming from and producing to Kafka topics, offering exactly-once processing guarantees on the consumer side (the Kafka 0.9 producer provides at-least-once delivery).

## Package Information

- **Package Name**: flink-connector-kafka-0.9_2.12
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-kafka-0.9_2.12
- **Version**: 1.10.3
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.10.3</version>
</dependency>
```
## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
```
## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");

// Create Kafka consumer
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Create Kafka producer
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Build streaming pipeline
env.addSource(consumer)
    .map(value -> value.toUpperCase())
    .addSink(producer);

// Execute the job
env.execute("Kafka Streaming Job");
```
## Architecture

Apache Flink Kafka Connector 0.9 is built around several key components:

- **Consumer API**: `FlinkKafkaConsumer09` for reading data from Kafka topics with configurable parallelism and offset management
- **Producer API**: `FlinkKafkaProducer09` for writing data to Kafka topics with partitioning and serialization control
- **Table Integration**: Table API factories for declarative SQL-based processing
- **Internal Engine**: Kafka 0.9-specific implementation handling partition discovery, consumer threading, and offset coordination
- **Fault Tolerance**: Integration with Flink's checkpointing mechanism for exactly-once processing guarantees on the source side
## Capabilities

### Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with configurable deserialization, offset management, and fault tolerance.

```java { .api }
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
```

[Data Consumption](./data-consumption.md)
97
98
### Data Production
99
100
Streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees.
101
102
```java { .api }
103
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
104
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
105
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
106
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
107
}
108
```
109
110
[Data Production](./data-production.md)
111
112
### Table API Integration
113
114
Table API and SQL integration for declarative stream processing with Kafka sources and sinks through factory-based configuration.
115
116
```java { .api }
117
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
118
protected String kafkaVersion();
119
protected boolean supportsKafkaTimestamps();
120
protected KafkaTableSourceBase createKafkaTableSource(...);
121
protected KafkaTableSinkBase createKafkaTableSink(...);
122
}
123
```
124
125
[Table API Integration](./table-api-integration.md)
126
127
## Common Types and Interfaces
128
129
```java { .api }
130
// Kafka consumer configuration key constants
131
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
132
public static final long DEFAULT_POLL_TIMEOUT = 100L;
133
134
// Deserialization interfaces for data conversion
135
interface DeserializationSchema<T> {
136
T deserialize(byte[] message) throws IOException;
137
boolean isEndOfStream(T nextElement);
138
TypeInformation<T> getProducedType();
139
}
140
141
interface KafkaDeserializationSchema<T> {
142
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
143
boolean isEndOfStream(T nextElement);
144
TypeInformation<T> getProducedType();
145
}
146
147
// Serialization interfaces for data conversion
148
interface SerializationSchema<T> {
149
byte[] serialize(T element);
150
}
151
152
interface KeyedSerializationSchema<T> {
153
byte[] serializeKey(T element);
154
byte[] serializeValue(T element);
155
String getTargetTopic(T element);
156
}
157
158
// Partitioning interface for custom distribution logic
159
interface FlinkKafkaPartitioner<T> extends Serializable {
160
int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
161
}
162
163
// Rate limiting interface for consumption throttling
164
interface FlinkConnectorRateLimiter {
165
void open(RuntimeContext runtimeContext) throws Exception;
166
void acquire(long permits);
167
void close() throws Exception;
168
}
169
170
// Kafka topic partition representation
171
class KafkaTopicPartition implements Comparable<KafkaTopicPartition>, Serializable {
172
public KafkaTopicPartition(String topic, int partition);
173
public String getTopic();
174
public int getPartition();
175
public String toString();
176
public boolean equals(Object o);
177
public int hashCode();
178
public int compareTo(KafkaTopicPartition other);
179
}
180
181
// Startup mode enumeration for consumers
182
enum StartupMode {
183
EARLIEST,
184
LATEST,
185
GROUP_OFFSETS,
186
SPECIFIC_OFFSETS,
187
TIMESTAMP
188
}
189
190
// Watermark assignment interfaces for time-based processing
191
interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
192
Watermark getCurrentWatermark();
193
}
194
195
interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
196
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
197
}
198
```