tessl/maven-org-apache-flink--flink-connector-kafka-base_2-11

Base classes and utilities for Apache Flink Kafka connectors providing common functionality for stream processing with exactly-once guarantees

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-kafka-base_2.11@1.5.x

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-base_2-11@1.5.0
```

# Flink Kafka Connector Base


A foundational library providing base classes and common functionality for Apache Flink's Kafka connectors. This library enables building version-specific Kafka connectors (0.8, 0.9, 0.10, etc.) while sharing core streaming connector functionality including state management, offset tracking, checkpointing, and fault tolerance for exactly-once processing guarantees.


## Package Information

- **Package Name**: flink-connector-kafka-base_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-base_2.11</artifactId>
  <version>1.5.1</version>
</dependency>
```


## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
```


## Basic Usage

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Arrays;
import java.util.Properties;

// Environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// FlinkKafkaConsumerBase is abstract; extend it for a specific Kafka version.
// This shows the pattern for extending the base consumer.
public class MyKafkaConsumer extends FlinkKafkaConsumerBase<String> {
    public MyKafkaConsumer(String topic, DeserializationSchema<String> schema, Properties props) {
        super(Arrays.asList(topic), null, new KeyedDeserializationSchemaWrapper<>(schema),
                PARTITION_DISCOVERY_DISABLED, false);
    }

    // Implement the remaining abstract methods for the specific Kafka version
    // ...
}

// Use the consumer
DataStream<String> stream = env.addSource(
    new MyKafkaConsumer("my-topic", new SimpleStringSchema(), properties)
        .setStartFromEarliest()
        .setCommitOffsetsOnCheckpoints(true)
);
```


## Architecture

The Flink Kafka Connector Base library is organized around several key architectural patterns:

- **Abstract Base Classes**: `FlinkKafkaConsumerBase` and `FlinkKafkaProducerBase` provide version-agnostic functionality that concrete implementations extend for specific Kafka versions
- **Serialization Abstraction**: `KeyedDeserializationSchema` and `KeyedSerializationSchema` interfaces handle message serialization with key-value semantics and metadata access
- **Partition Management**: Internal classes manage partition discovery, offset tracking, and state management for fault tolerance
- **Table API Integration**: Table sources and sinks provide SQL layer integration with automatic schema inference and connector factory support
- **Exactly-Once Semantics**: Built-in support for checkpointing, offset commits, and transaction coordination for guaranteed delivery

## Capabilities

### Consumer Base Classes

Abstract base implementations for Kafka consumers providing common functionality across all Kafka versions, including offset management, checkpointing, and watermark assignment.

```java { .api }
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase<T> setStartFromEarliest();
    public FlinkKafkaConsumerBase<T> setStartFromLatest();
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
}
```

[Consumer Base Classes](./consumer-base.md)

### Producer Base Classes

Abstract base implementations for Kafka producers providing common functionality, including serialization, partitioning, and exactly-once delivery semantics.

```java { .api }
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    public void setLogFailuresOnly(boolean logFailuresOnly);
    public void setFlushOnCheckpoint(boolean flush);
    public static Properties getPropertiesFromBrokerList(String brokerList);
}
```

[Producer Base Classes](./producer-base.md)
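To make the broker-list helper concrete, here is a sketch of the behavior such a method typically has: validate each `host:port` entry and wrap the list in producer `Properties`. This is an illustrative reimplementation under a hypothetical class name, not the library source.

```java
import java.util.Properties;

// Illustrative sketch (hypothetical class): turn a comma-separated
// "host:port,host:port" broker list into Kafka producer Properties,
// rejecting entries that are not of the form host:port.
class BrokerListProps {
    static Properties fromBrokerList(String brokerList) {
        for (String broker : brokerList.split(",")) {
            if (broker.split(":").length != 2) {
                throw new IllegalArgumentException("Invalid broker address: " + broker);
            }
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }
}
```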

### Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety.

```java { .api }
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
}

public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```

[Serialization Schemas](./serialization.md)
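As a sketch of how `KeyedSerializationSchema` is typically implemented, the snippet below splits `String` records of the (hypothetical) form `key:value` into a Kafka message key and value. The interface is restated locally so the example compiles without Flink on the classpath; real code would implement `org.apache.flink.streaming.util.serialization.KeyedSerializationSchema` instead.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local restatement of the interface so this sketch is self-contained;
// the real one lives in org.apache.flink.streaming.util.serialization.
interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical schema: records look like "key:value". Everything before the
// first ':' becomes the Kafka message key, the rest the message value.
class ColonSplitSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        int i = element.indexOf(':');
        String key = i < 0 ? element : element.substring(0, i);
        return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        int i = element.indexOf(':');
        String value = i < 0 ? "" : element.substring(i + 1);
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null tells the producer to use its default topic
    }
}
```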

### Partitioners

Custom partitioning logic for determining target Kafka partitions when producing messages, including fixed partitioning and delegation to Kafka's default partitioner.

```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

[Partitioners](./partitioners.md)
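A common custom partitioner routes by key hash so that all records with the same key land on the same partition. The sketch below shows that pattern, with the base class restated locally so it compiles without Flink; `KeyHashPartitioner` is a hypothetical name, and it assumes `partitions` is non-empty (Flink passes the topic's actual partition array).

```java
import java.io.Serializable;
import java.util.Arrays;

// Local restatement of the base class so the sketch is self-contained; the
// real one is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner.
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances) {}
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical partitioner: pick a partition from the hash of the serialized
// key, so identical keys always map to the same partition.
class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first partition
        }
        int h = Arrays.hashCode(key) & 0x7fffffff; // force non-negative
        return partitions[h % partitions.length];
    }
}
```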

### Table API Integration

Table sources and sinks for SQL layer integration supporting various data formats (JSON, Avro) with automatic schema inference and connector descriptors.

```java { .api }
public abstract class KafkaTableSource implements StreamTableSource<Row>,
        DefinedProctimeAttribute, DefinedRowtimeAttributes, FilterableTableSource<Row> {
    // Abstract methods implemented by concrete table sources
}

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
    // Abstract methods implemented by concrete table sinks
}
```

[Table API Integration](./table-api.md)

## Types

### Core Types

```java { .api }
public final class KafkaTopicPartition implements Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}
```
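`KafkaTopicPartition` is mainly used as a map key, for example when building the offsets map passed to `setStartFromSpecificOffsets(...)`. The sketch below shows that usage with the class restated locally (value-based `equals`/`hashCode`) so it runs without Flink; the topic names and offsets are made up.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Local restatement of KafkaTopicPartition so the sketch is self-contained;
// like the real class, equality is by topic and partition, which is what
// makes it usable as a HashMap key.
final class KafkaTopicPartition implements Serializable {
    private final String topic;
    private final int partition;

    KafkaTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() { return topic; }
    int getPartition() { return partition; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KafkaTopicPartition)) return false;
        KafkaTopicPartition other = (KafkaTopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

// Build a startup-offset map of the shape setStartFromSpecificOffsets expects.
class SpecificOffsets {
    static Map<KafkaTopicPartition, Long> example() {
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
        offsets.put(new KafkaTopicPartition("my-topic", 1), 31L);
        return offsets;
    }
}
```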

### Configuration Constants

```java { .api }
public class FlinkKafkaConsumerBase<T> {
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
}
```
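To illustrate how a bound like `MAX_NUM_PENDING_CHECKPOINTS` comes into play, here is a simplified sketch (not the library source) of the consumer's offset bookkeeping: offsets are queued per checkpoint, the queue is capped, and offsets are only committed to Kafka once the corresponding checkpoint completes, keeping commits consistent with Flink's checkpoints. The map keys here are placeholder `"topic-partition"` strings for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of pending-checkpoint bookkeeping: snapshot offsets are
// queued per checkpoint id, the oldest entries are dropped past the cap, and
// offsets are released for committing only when that checkpoint completes.
class PendingCheckpointOffsets {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

    void onSnapshot(long checkpointId, Map<String, Long> offsets) {
        pending.put(checkpointId, offsets);
        // Bound the number of in-flight checkpoints by evicting the oldest.
        while (pending.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            Long oldest = pending.keySet().iterator().next();
            pending.remove(oldest);
        }
    }

    // Returns the offsets to commit for a completed checkpoint, or null if
    // that checkpoint was already evicted or never seen.
    Map<String, Long> onCheckpointComplete(long checkpointId) {
        return pending.remove(checkpointId);
    }
}
```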