# Apache Flink Kafka 0.11 SQL Connector

Apache Flink SQL connector for Apache Kafka 0.11.x, providing both DataStream (streaming) and Table/SQL API integration with transaction support and exactly-once semantics. The connector is shaded: all Kafka client dependencies are relocated into the jar to prevent classpath conflicts in Flink deployments.

## Package Information

- **Package Name**: flink-sql-connector-kafka-0.11_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-sql-connector-kafka-0.11_2.11
- **Version**: 1.11.6
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka-0.11_2.11</artifactId>
  <version>1.11.6</version>
</dependency>
```

## Core Imports

For streaming DataStream programs:

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
```

For Table/SQL API integration:

```java
import org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
```

For serialization and partitioning:

```java
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
```

## Basic Usage

### Streaming Consumer

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.addSource(consumer);
```

### Streaming Producer with Exactly-Once

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 900000 ms (15 min) matches the broker's default transaction.max.timeout.ms ceiling
properties.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    properties,
    Semantic.EXACTLY_ONCE
);

stream.addSink(producer);
```

### SQL Table Definition

```sql
CREATE TABLE kafka_table (
  id INT,
  name STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```
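
Once declared, the table behaves like any other Flink SQL table. A brief, hypothetical continuation of the DDL above (`some_other_table` is an assumed source table, not something this package provides):

```sql
-- Read from the Kafka-backed table
SELECT id, name FROM kafka_table WHERE id > 100;

-- Write into it (the Kafka connector sink is append-only)
INSERT INTO kafka_table
SELECT id, name, timestamp_col FROM some_other_table;
```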

## Architecture

The Flink Kafka 0.11 connector is built around several key components:

- **Streaming Consumer**: `FlinkKafkaConsumer011` for reading from Kafka topics with checkpoint integration
- **Streaming Producer**: `FlinkKafkaProducer011` supporting transactional writes and exactly-once semantics
- **Table API Integration**: Dynamic table factories for SQL DDL support
- **Shaded Dependencies**: All Kafka client libraries relocated to prevent conflicts
- **Transaction Support**: Two-phase commit protocol for exactly-once guarantees
- **Configuration System**: Extensive Properties-based configuration with Flink-specific extensions
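
The two-phase commit flow can be pictured without any Kafka machinery: on a checkpoint barrier the open transaction is pre-committed and set aside, and its records only become visible to consumers once Flink signals checkpoint completion. A toy, dependency-free sketch of that control flow (all names are invented for illustration; this is not the connector's implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    // Toy stand-in for a Kafka transaction: records are invisible until commit().
    static class Txn {
        final List<String> pending = new ArrayList<>();
        boolean committed = false;
        void send(String record) { pending.add(record); }
        void commit() { committed = true; }
    }

    final List<String> visible = new ArrayList<>(); // what downstream consumers can read
    Txn current = new Txn();

    void write(String record) { current.send(record); }

    // Phase 1, on checkpoint barrier: set the open transaction aside
    // and immediately start the next one for subsequent records.
    Txn preCommit() {
        Txn preCommitted = current;
        current = new Txn();
        return preCommitted;
    }

    // Phase 2, on checkpoint-complete notification: make the records visible.
    void commit(Txn txn) {
        txn.commit();
        visible.addAll(txn.pending);
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        Txn t = sink.preCommit();          // checkpoint n triggered
        sink.write("b");                   // lands in the next transaction
        System.out.println(sink.visible);  // [] - nothing visible before commit
        sink.commit(t);                    // checkpoint n completed
        System.out.println(sink.visible);  // [a]
    }
}
```

If a failure happens between pre-commit and commit, Flink restores the pre-committed transactions from checkpoint state and commits them on recovery, which is why the transaction timeout must outlast the expected recovery time.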

## Capabilities

### Streaming Consumer API

Provides `FlinkKafkaConsumer011` for consuming from Kafka topics with support for multiple deserialization patterns, startup modes, and partition discovery.

```java { .api }
// Primary constructors for different use cases
FlinkKafkaConsumer011<T>(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer011<T>(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer011<T>(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
```

[Streaming Consumer](./streaming-consumer.md)
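
The `Pattern`-based constructor subscribes to every topic whose name matches the regex (newly created matching topics can also be picked up when partition discovery is enabled). A library-free sketch of the matching semantics using plain `java.util.regex`; the topic names are invented for illustration:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {
    public static void main(String[] args) {
        // The same kind of pattern you would pass to FlinkKafkaConsumer011
        Pattern subscriptionPattern = Pattern.compile("orders-.*");

        // Hypothetical topic names, for illustration only
        List<String> topics = List.of("orders-eu", "orders-us", "payments", "orders-archive");

        // A full match against the topic name decides the subscription
        List<String> matched = topics.stream()
                .filter(t -> subscriptionPattern.matcher(t).matches())
                .collect(Collectors.toList());

        System.out.println(matched); // [orders-eu, orders-us, orders-archive]
    }
}
```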

### Streaming Producer API

Provides `FlinkKafkaProducer011` with transactional support, multiple delivery semantics, and flexible partitioning options.

```java { .api }
// Core producer constructors
FlinkKafkaProducer011<IN>(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic)
FlinkKafkaProducer011<IN>(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize)

// Delivery semantics
enum Semantic {
  EXACTLY_ONCE,
  AT_LEAST_ONCE,
  NONE
}
```

[Streaming Producer](./streaming-producer.md)

### Table/SQL API Integration

Provides factory classes for creating Kafka table sources and sinks in Flink's Table API and SQL, supporting both the legacy and the modern dynamic table APIs.

```java { .api }
// Dynamic table factory for SQL DDL
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
  String factoryIdentifier() // Returns "kafka-0.11"
}

// Dynamic table source and sink
class Kafka011DynamicSource extends KafkaDynamicSourceBase
class Kafka011DynamicSink extends KafkaDynamicSinkBase
```

[Table API](./table-api.md)

### Serialization and Deserialization

Base interfaces and implementations for converting between Flink data types and Kafka record formats, with access to Kafka metadata.

```java { .api }
// Core serialization interfaces
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
  T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
  void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception;
}

interface KafkaSerializationSchema<T> extends Serializable {
  ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
```

[Serialization](./serialization.md)
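
Under the hood a string schema such as `SimpleStringSchema` reduces to UTF-8 encoding and decoding of the record's value bytes; a `KafkaDeserializationSchema` additionally sees the record's topic, partition, offset, and timestamp. A minimal, dependency-free sketch of that byte-level round trip (the class and method names below are illustrative, not part of the connector):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializationSketch {
    // What a string-based serialization schema boils down to: UTF-8 encode on write...
    static byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    // ...and UTF-8 decode on read.
    static String deserialize(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = serialize("hello kafka");
        System.out.println(deserialize(value));                          // hello kafka
        System.out.println(Arrays.equals(value, serialize(deserialize(value)))); // true
    }
}
```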

### Configuration and Partitioning

Configuration options, startup modes, and custom partitioning strategies for fine-tuning connector behavior.

```java { .api }
// Startup mode options
enum StartupMode {
  GROUP_OFFSETS,
  EARLIEST,
  LATEST,
  TIMESTAMP,
  SPECIFIC_OFFSETS
}

// Custom partitioner base class
abstract class FlinkKafkaPartitioner<T> implements Serializable {
  abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

[Configuration](./configuration.md)
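
`FlinkFixedPartitioner`, the bundled default partitioner, pins each parallel sink subtask to a single Kafka partition by taking the subtask index modulo the partition count. A dependency-free sketch of that arithmetic (the helper class is illustrative, not the connector's implementation):

```java
public class FixedPartitionerSketch {
    // Mirrors the core of FlinkFixedPartitioner.partition():
    // a given parallel sink instance always writes to the same partition.
    static int partition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};   // a topic with 3 partitions
        // 4 parallel sink subtasks wrap around the partition list
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + partition(subtask, partitions));
        }
        // subtask 3 wraps back to partition 0
    }
}
```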

## Types

### Core Exception Types

```java { .api }
class FlinkKafka011Exception extends FlinkException {
  FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message);
  FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause);
  FlinkKafka011ErrorCode getErrorCode();
}

enum FlinkKafka011ErrorCode {
  PRODUCERS_POOL_EMPTY,
  EXTERNAL_ERROR
}
```