# Flink Kafka Connector Base

A foundational library providing base classes and common functionality for Apache Flink's Kafka connectors. It enables building version-specific Kafka connectors (0.8, 0.9, 0.10, and so on) while sharing core streaming functionality: state management, offset tracking, checkpointing, and fault tolerance for exactly-once processing guarantees.

## Package Information

- **Package Name**: flink-connector-kafka-base_2.11
- **Package Type**: Maven
- **Language**: Java/Scala
- **Installation**: Add to your Maven `pom.xml`:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-base_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
```

## Basic Usage

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// FlinkKafkaConsumerBase is abstract: extend it for a specific Kafka version.
// This class shows the constructor pattern; the version-specific abstract
// methods still have to be implemented.
public class MyKafkaConsumer extends FlinkKafkaConsumerBase<String> {

    public MyKafkaConsumer(String topic, DeserializationSchema<String> schema, Properties props) {
        super(Arrays.asList(topic), null, new KeyedDeserializationSchemaWrapper<>(schema),
                PARTITION_DISCOVERY_DISABLED, false);
    }

    // Implement the abstract methods for the target Kafka version
    // ...
}

// Environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// Use the consumer
DataStream<String> stream = env.addSource(
    new MyKafkaConsumer("my-topic", new SimpleStringSchema(), properties)
        .setStartFromEarliest()
        .setCommitOffsetsOnCheckpoints(true)
);
```

## Architecture

The Flink Kafka Connector Base library is organized around several key architectural patterns:

- **Abstract Base Classes**: `FlinkKafkaConsumerBase` and `FlinkKafkaProducerBase` provide version-agnostic functionality that concrete implementations extend for specific Kafka versions.
- **Serialization Abstraction**: the `KeyedDeserializationSchema` and `KeyedSerializationSchema` interfaces handle message serialization with key-value semantics and access to record metadata.
- **Partition Management**: internal classes manage partition discovery, offset tracking, and state management for fault tolerance.
- **Table API Integration**: table sources and sinks provide SQL-layer integration with automatic schema inference and connector factory support.
- **Exactly-Once Semantics**: built-in support for checkpointing, offset commits, and transaction coordination for guaranteed delivery.
## Capabilities

### Consumer Base Classes

Abstract base implementations for Kafka consumers, providing functionality common to all Kafka versions: offset management, checkpointing, and watermark assignment.
```java { .api }
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase<T> setStartFromEarliest();
    public FlinkKafkaConsumerBase<T> setStartFromLatest();
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
}
```
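
With `setCommitOffsetsOnCheckpoints(true)`, the consumer snapshots its offsets on each checkpoint and commits them back to Kafka only once that checkpoint completes, keeping at most `MAX_NUM_PENDING_CHECKPOINTS` snapshots in flight. The sketch below illustrates that bookkeeping pattern; the class, method names, and map layout are invented here for illustration and simplify what Flink's internals actually do:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for the pending-checkpoint bookkeeping described
// above. MAX_PENDING mirrors FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS.
public class PendingOffsetTracker {
    static final int MAX_PENDING = 100;

    // checkpointId -> (partition -> offset), kept in checkpoint order
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

    // Called when a checkpoint snapshot is taken.
    public void snapshot(long checkpointId, Map<String, Long> offsets) {
        pending.put(checkpointId, offsets);
        // Drop the oldest snapshot if too many checkpoints are still in flight.
        if (pending.size() > MAX_PENDING) {
            Long oldest = pending.keySet().iterator().next();
            pending.remove(oldest);
        }
    }

    // Called when a checkpoint completes: hand back its offsets for the
    // Kafka commit and forget it along with any older pending snapshots.
    public Map<String, Long> complete(long checkpointId) {
        Map<String, Long> toCommit = pending.remove(checkpointId);
        pending.keySet().removeIf(id -> id < checkpointId);
        return toCommit; // null if the checkpoint was unknown or already dropped
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingOffsetTracker t = new PendingOffsetTracker();
        t.snapshot(1L, Map.of("topic-0", 42L));
        t.snapshot(2L, Map.of("topic-0", 99L));
        Map<String, Long> committed = t.complete(2L);
        System.out.println(committed.get("topic-0")); // 99
        System.out.println(t.pendingCount());         // 0 (checkpoint 1 subsumed)
    }
}
```

Committing only on completed checkpoints is what keeps the Kafka-side group offsets consistent with Flink's restored state after a failure.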

[Consumer Base Classes](./consumer-base.md)

### Producer Base Classes

Abstract base implementations for Kafka producers, providing common functionality including serialization, partitioning, and delivery guarantees (at-least-once when flush-on-checkpoint is enabled).
```java { .api }
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    public void setLogFailuresOnly(boolean logFailuresOnly);
    public void setFlushOnCheckpoint(boolean flush);
    public static Properties getPropertiesFromBrokerList(String brokerList);
}
```
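
`getPropertiesFromBrokerList` is a small convenience: roughly, it checks each entry of a comma-separated `host:port` list and returns a `Properties` object with `bootstrap.servers` set. The re-creation below is illustrative only — the class name and validation details are assumptions, not Flink's exact code:

```java
import java.util.Properties;

public class BrokerListUtil {
    // Hypothetical re-creation of the helper's behavior: validate each
    // "host:port" entry, then set bootstrap.servers to the full list.
    public static Properties propertiesFromBrokerList(String brokerList) {
        for (String broker : brokerList.split(",")) {
            int colon = broker.lastIndexOf(':');
            if (colon <= 0 || colon == broker.length() - 1) {
                throw new IllegalArgumentException("Invalid broker address: " + broker);
            }
            int port = Integer.parseInt(broker.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Invalid port in broker address: " + broker);
            }
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    public static void main(String[] args) {
        Properties p = propertiesFromBrokerList("broker1:9092,broker2:9092");
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```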

[Producer Base Classes](./producer-base.md)

### Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, access to record metadata, and type safety.
```java { .api }
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
}

public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```
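
To make the key/value split concrete, here is a minimal sketch implementing both interfaces for `"key:value"` strings. The interfaces are redeclared locally (matching the signatures above, minus the Flink-specific `ResultTypeQueryable` parent) so the example compiles without Flink on the classpath; a real schema would implement the imported Flink interfaces instead:

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local redeclarations of the interfaces shown above, trimmed of
// ResultTypeQueryable so this sketch is standalone.
interface KeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
}

interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Treats "key:value" strings as Kafka records: the part before ':' becomes
// the record key, the part after becomes the value.
public class ColonSplitSchema implements KeyedSerializationSchema<String>, KeyedDeserializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        return element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.split(":", 2)[1].getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null means: use the producer's default topic
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new String(messageKey, StandardCharsets.UTF_8)
                + ":" + new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream, never signal end
    }
}
```

Note how `deserialize` also receives the topic, partition, and offset — that metadata access is the main reason to use the keyed schemas over a plain `DeserializationSchema`.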

[Serialization Schemas](./serialization.md)

### Partitioners

Custom partitioning logic for choosing the target Kafka partition when producing messages, including fixed partitioning and delegation to Kafka's default partitioner.
```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
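
The typical concrete implementation is fixed partitioning: each parallel producer instance writes to exactly one partition, chosen by instance index modulo the partition count (the pattern Flink's `FlinkFixedPartitioner` follows). A standalone sketch, with the abstract class redeclared locally so it compiles without the Flink dependency:

```java
import java.io.Serializable;

// Local redeclaration of the abstract class shown above, so the sketch
// is standalone; a real partitioner would extend Flink's class instead.
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances) {}
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Fixed partitioning: each parallel sink instance is pinned to one
// partition, chosen by instance index modulo the partition count.
public class FixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // open() gives the partitioner its position in the parallel topology
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}
```

Pinning each instance to one partition preserves per-instance ordering in the output topic at the cost of potentially uneven partition load.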

[Partitioners](./partitioners.md)

### Table API Integration

Table sources and sinks for SQL-layer integration, supporting data formats such as JSON and Avro with automatic schema inference and connector descriptors.
```java { .api }
public abstract class KafkaTableSource implements StreamTableSource<Row>,
        DefinedProctimeAttribute, DefinedRowtimeAttributes, FilterableTableSource<Row> {
    // Abstract methods implemented by concrete table sources
}

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
    // Abstract methods implemented by concrete table sinks
}
```

[Table API Integration](./table-api.md)

## Types

### Core Types
```java { .api }
public final class KafkaTopicPartition implements Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}
```
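
`KafkaTopicPartition` overrides `equals` and `hashCode` precisely so it can serve as a map key, for example in the offset map passed to `setStartFromSpecificOffsets`. A local stand-in (named `TopicPartitionKey` here to avoid suggesting it is the real class) demonstrating that usage:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for KafkaTopicPartition, showing why equals and
// hashCode matter: instances are looked up as keys in offset maps.
public final class TopicPartitionKey {
    private final String topic;
    private final int partition;

    public TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    public String getTopic() { return topic; }
    public int getPartition() { return partition; }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) obj;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    public static void main(String[] args) {
        // Startup offsets keyed by (topic, partition), mirroring the map
        // consumed by setStartFromSpecificOffsets in the real API.
        Map<TopicPartitionKey, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartitionKey("my-topic", 0), 23L);
        // A distinct-but-equal instance finds the same entry.
        System.out.println(offsets.get(new TopicPartitionKey("my-topic", 0))); // 23
    }
}
```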

### Configuration Constants

```java { .api }
public class FlinkKafkaConsumerBase<T> {
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
}
```