or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- docs/data-consumption.md
- docs/data-production.md
- docs/index.md
- docs/table-api-integration.md

index.md

# Apache Flink Kafka Connector 0.9

Apache Flink Kafka Connector 0.9 provides streaming data integration between Apache Flink and Kafka 0.9.x message brokers. The connector enables real-time data processing pipelines with exactly-once processing guarantees, fault tolerance, and high-throughput capabilities for both consuming from and producing to Kafka topics.

## Package Information

- **Package Name**: flink-connector-kafka-0.9_2.12
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-kafka-0.9_2.12
- **Version**: 1.10.3
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
  <version>1.10.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");

// Create Kafka consumer
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Create Kafka producer
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Build streaming pipeline
env.addSource(consumer)
    .map(value -> value.toUpperCase())
    .addSink(producer);

// Execute the job
env.execute("Kafka Streaming Job");
```

## Architecture

Apache Flink Kafka Connector 0.9 is built around several key components:

- **Consumer API**: `FlinkKafkaConsumer09` for reading data from Kafka topics with configurable parallelism and offset management
- **Producer API**: `FlinkKafkaProducer09` for writing data to Kafka topics with partitioning and serialization control
- **Table Integration**: Table API factories for declarative SQL-based processing
- **Internal Engine**: Kafka 0.9-specific implementation handling partition discovery, consumer threading, and offset coordination
- **Fault Tolerance**: Integration with Flink's checkpointing mechanism for exactly-once processing guarantees

## Capabilities

### Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with configurable deserialization, offset management, and fault tolerance.

```java { .api }
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
```

[Data Consumption](./data-consumption.md)

### Data Production

Streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees.

```java { .api }
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
```

[Data Production](./data-production.md)

### Table API Integration

Table API and SQL integration for declarative stream processing with Kafka sources and sinks through factory-based configuration.

```java { .api }
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
    protected KafkaTableSourceBase createKafkaTableSource(...);
    protected KafkaTableSinkBase createKafkaTableSink(...);
}
```

[Table API Integration](./table-api-integration.md)

## Common Types and Interfaces

```java { .api }
// Kafka consumer configuration key constants
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;

// Deserialization interfaces for data conversion
interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

interface KafkaDeserializationSchema<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

// Serialization interfaces for data conversion
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Partitioning interface for custom distribution logic
interface FlinkKafkaPartitioner<T> extends Serializable {
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Rate limiting interface for consumption throttling
interface FlinkConnectorRateLimiter {
    void open(RuntimeContext runtimeContext) throws Exception;
    void acquire(long permits);
    void close() throws Exception;
}

// Kafka topic partition representation
class KafkaTopicPartition implements Comparable<KafkaTopicPartition>, Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
    public int compareTo(KafkaTopicPartition other);
}

// Startup mode enumeration for consumers
enum StartupMode {
    EARLIEST,
    LATEST,
    GROUP_OFFSETS,
    SPECIFIC_OFFSETS,
    TIMESTAMP
}

// Watermark assignment interfaces for time-based processing
interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}

interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
```