or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

consumer.md, index.md, producer.md, table-api.md

docs/index.md

0

# Apache Flink Kafka 0.10 Connector

1

2

The Apache Flink Kafka 0.10 connector provides streaming data integration between Apache Flink and Apache Kafka 0.10.x message brokers. It supports consuming from Kafka topics with exactly-once state consistency (when Flink checkpointing is enabled) and producing to Kafka topics with at-least-once delivery guarantees, together with dynamic partition discovery and comprehensive error handling.

3

4

## Package Information

5

6

- **Package Name**: flink-connector-kafka-0.10_2.12

7

- **Package Type**: maven

8

- **Group ID**: org.apache.flink

9

- **Artifact ID**: flink-connector-kafka-0.10_2.12

10

- **Language**: Java

11

- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.12</artifactId><version>1.11.6</version></dependency>`

12

13

## Core Imports

14

15

```java

16

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

17

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

18

import org.apache.flink.api.common.serialization.DeserializationSchema;

19

import org.apache.flink.api.common.serialization.SerializationSchema;

20

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

21

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

22

```

23

24

## Basic Usage

25

26

### Consumer Example

27

28

```java

29

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

30

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

31

import org.apache.flink.api.common.serialization.SimpleStringSchema;

32

33

import java.util.Properties;

34

35

// Create Kafka consumer properties

36

Properties properties = new Properties();

37

properties.setProperty("bootstrap.servers", "localhost:9092");

38

properties.setProperty("group.id", "test-consumer-group");

39

40

// Create consumer

41

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(

42

"my-topic",

43

new SimpleStringSchema(),

44

properties

45

);

46

47

// Add to Flink streaming environment

48

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

49

env.addSource(consumer)

50

.print();

51

```

52

53

### Producer Example

54

55

```java

56

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

57

import org.apache.flink.api.common.serialization.SimpleStringSchema;

58

59

import java.util.Properties;

60

61

// Create Kafka producer properties

62

Properties properties = new Properties();

63

properties.setProperty("bootstrap.servers", "localhost:9092");

64

65

// Create producer

66

FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(

67

"output-topic",

68

new SimpleStringSchema(),

69

properties

70

);

71

72

// Add to streaming pipeline

73

dataStream.addSink(producer);

74

```

75

76

## Architecture

77

78

The Flink Kafka 0.10 connector is built around several key components:

79

80

- **Consumer Classes**: `FlinkKafkaConsumer010` for reading data from Kafka topics

81

- **Producer Classes**: `FlinkKafkaProducer010` for writing data to Kafka topics

82

- **Table API Integration**: Legacy and dynamic table factory support for SQL/Table API

83

- **Internal Components**: Fetcher, partition discoverer, and consumer thread management

84

- **Serialization Support**: Both simple value serialization and key-value serialization schemas

85

86

## Capabilities

87

88

### Data Stream Consumer

89

90

Core consumer functionality for reading data from Kafka 0.10.x topics with exactly-once processing guarantees (when Flink checkpointing is enabled) and flexible topic subscription patterns.

91

92

```java { .api }

93

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {

94

// Single topic constructors

95

public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

96

public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

97

98

// Multiple topics constructors

99

public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

100

public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

101

102

// Pattern-based subscription constructors

103

public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

104

public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

105

106

// Rate limiting methods

107

public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

108

public FlinkConnectorRateLimiter getRateLimiter();

109

}

110

```

111

112

[Data Stream Consumer](./consumer.md)

113

114

### Data Stream Producer

115

116

Core producer functionality for writing data to Kafka 0.10.x topics with at-least-once delivery guarantees (the Kafka 0.10 producer does not support transactions, so exactly-once delivery is not available) and custom partitioning support.

117

118

```java { .api }

119

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {

120

// Value-only serialization constructors

121

public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);

122

public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);

123

public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

124

125

// Key-value serialization constructors

126

public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);

127

public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);

128

public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

129

130

// Configuration methods

131

public void setWriteTimestampToKafka(boolean writeTimestampToKafka);

132

}

133

```

134

135

[Data Stream Producer](./producer.md)

136

137

### Table API Integration

138

139

SQL and Table API integration for declarative stream processing with Kafka sources and sinks, supporting both legacy and dynamic table factories.

140

141

```java { .api }

142

// Legacy table factory

143

public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {

144

protected String kafkaVersion();

145

protected boolean supportsKafkaTimestamps();

146

}

147

148

// Dynamic table factory

149

public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {

150

public static final String IDENTIFIER = "kafka-0.10";

151

}

152

```

153

154

[Table API Integration](./table-api.md)

155

156

## Configuration Properties

157

158

### Consumer Configuration Constants

159

160

```java { .api }

161

public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

162

public static final long DEFAULT_POLL_TIMEOUT = 100L;

163

```

164

165

### Common Configuration Properties

166

167

- **bootstrap.servers**: Kafka broker addresses (required)

168

- **group.id**: Consumer group identifier

169

- **flink.poll-timeout**: Consumer polling timeout in milliseconds (default: 100)

170

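- **enable.auto.commit**: Kafka's periodic automatic offset commit; honoured only when Flink checkpointing is disabled (with checkpointing enabled, Flink commits offsets when checkpoints complete and ignores this setting)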

171

- **auto.offset.reset**: Initial offset behavior when no committed offset exists

172

173

## Types

174

175

### Core Consumer Type

176

177

```java { .api }

178

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {

179

// Generic type T represents the output data type after deserialization

180

}

181

```

182

183

### Core Producer Type

184

185

```java { .api }

186

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {

187

// Generic type T represents the input data type before serialization

188

}

189

```

190

191

### Serialization Interfaces

192

193

```java { .api }

194

// Simple value deserialization

195

public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

196

T deserialize(byte[] message) throws IOException;

197

boolean isEndOfStream(T nextElement);

198

}

199

200

// Key-value deserialization

201

public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

202

T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

203

boolean isEndOfStream(T nextElement);

204

}

205

206

// Simple value serialization

207

public interface SerializationSchema<T> extends Serializable {

208

byte[] serialize(T element);

209

}

210

211

// Key-value serialization

212

public interface KeyedSerializationSchema<T> extends Serializable {

213

byte[] serializeKey(T element);

214

byte[] serializeValue(T element);

215

String getTargetTopic(T element);

216

}

217

```

218

219

### Partitioning Interface

220

221

```java { .api }

222

public abstract class FlinkKafkaPartitioner<T> implements Serializable {

223

public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);

224

public void open(int parallelInstanceId, int parallelInstances);

225

}

226

```

227

228

### Rate Limiting Interface

229

230

```java { .api }

231

public interface FlinkConnectorRateLimiter extends Serializable {

232

void open(RuntimeContext runtimeContext) throws Exception;

233

void acquire(long bytes);

234

void close() throws Exception;

235

}

236

```