
# Apache Flink Kafka 0.8 Connector

The Apache Flink Kafka 0.8 connector enables high-performance streaming integration between Apache Flink and Apache Kafka 0.8.x clusters. It provides exactly-once processing guarantees through checkpointing, supports parallel consumption and production, and offers comprehensive offset management with ZooKeeper integration.

## Package Information

- **Package Name**: flink-connector-kafka-0.8_2.10
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-kafka-0.8_2.10
- **Installation**: Include as a Maven dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Set up the Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");

// Create the consumer
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
);

// Add the consumer as a source
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create the producer and add it as a sink
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(producer);

// Execute
env.execute("Kafka Streaming Job");
```

## Architecture

The Flink Kafka 0.8 connector is built around several key components:

- **Consumer Architecture**: `FlinkKafkaConsumer08` extends `FlinkKafkaConsumerBase` and uses `Kafka08Fetcher` for message retrieval with ZooKeeper-based offset management
- **Producer Architecture**: `FlinkKafkaProducer08` extends `FlinkKafkaProducerBase` for message publishing with configurable partitioning
- **Checkpointing Integration**: Seamless integration with Flink's distributed snapshots for exactly-once processing guarantees
- **Table API Integration**: Table sources and sinks for SQL API usage with JSON and Avro format support
- **Offset Management**: ZooKeeper-based offset storage with periodic commits and recovery support
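The exactly-once guarantee only takes effect once checkpointing is enabled on the execution environment; a minimal configuration sketch (the interval value is illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot operator state (including Kafka offsets) every 5 seconds;
        // on recovery the consumer rewinds to the offsets of the last
        // completed checkpoint rather than relying on ZooKeeper commits.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    }
}
```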

## Capabilities

### Kafka Consumer

Primary consumer for reading from Kafka 0.8.x topics with exactly-once processing guarantees and checkpointing support.

```java { .api }
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
}
```

[Kafka Consumer](./kafka-consumer.md)

### Kafka Producer

Producer for writing to Kafka 0.8.x topics with configurable partitioning and serialization support.

```java { .api }
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
```

[Kafka Producer](./kafka-producer.md)
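The `customPartitioner` constructor argument lets a job control which Kafka partition each record lands on. The routing core of such a partitioner can be sketched as plain Java (the class and method names here are illustrative, not part of the connector):

```java
import java.util.Arrays;

public class KeyHashRouting {
    // Deterministically map a message key onto one of the topic's partitions,
    // as the partition(record, key, value, targetTopic, partitions) method
    // of a custom partitioner might do.
    public static int selectPartition(byte[] key, int[] partitions) {
        if (key == null) {
            // Keyless records all go to the first available partition
            return partitions[0];
        }
        int idx = Math.abs(Arrays.hashCode(key) % partitions.length);
        return partitions[idx];
    }
}
```

Keeping the routing pure like this makes the partitioner trivially unit-testable without a broker.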

### Table API Integration

Table sources and sinks for integrating Kafka with Flink's SQL API, supporting JSON and Avro formats.

```java { .api }
public class Kafka08TableSource extends KafkaTableSource {
    public Kafka08TableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
}
```

[Table API Integration](./table-api.md)

### Offset Management

ZooKeeper-based offset management utilities for handling consumer offset storage and retrieval.

```java { .api }
public class ZookeeperOffsetHandler {
    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset);
    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition);
}
```

[Offset Management](./offset-management.md)
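Offsets live under the standard Kafka 0.8 high-level consumer layout in ZooKeeper. The znode path these handlers read and write can be sketched as (the helper name is illustrative):

```java
public class ZkOffsetPath {
    // ZooKeeper node where the Kafka 0.8 high-level consumer keeps a
    // partition's committed offset:
    //   /consumers/<group>/offsets/<topic>/<partition>
    public static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }
}
```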

## Error Handling

The connector throws various exceptions that should be handled:

- **IllegalArgumentException**: Invalid configuration parameters
- **RuntimeException**: Connection or serialization errors
- **Exception**: General Kafka or ZooKeeper connectivity issues

Proper error handling should include retry logic for transient failures and graceful degradation for persistent issues.
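For transient failures, a bounded-retry wrapper is one common pattern; a generic sketch (names and retry policy are illustrative, not connector API):

```java
public class Retry {
    public interface Action<T> {
        T run() throws Exception;
    }

    // Re-run the action up to maxAttempts times, rethrowing the last
    // exception once the retry budget is exhausted.
    public static <T> T withRetries(Action<T> action, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (Exception e) {
                last = e; // treat as transient and try again
            }
        }
        throw last;
    }
}
```

In practice, exponential backoff between attempts and a whitelist of retryable exception types would be layered on top of this skeleton.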

## Configuration Properties

Key Kafka properties for consumer configuration:

- `bootstrap.servers`: Kafka broker addresses
- `zookeeper.connect`: ZooKeeper connection string
- `group.id`: Consumer group identifier
- `auto.offset.reset`: Offset reset strategy (`smallest` or `largest` in Kafka 0.8)

Key Kafka properties for producer configuration:

- `bootstrap.servers`: Kafka broker addresses
- `key.serializer`: Key serialization class
- `value.serializer`: Value serialization class
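Putting the consumer-side properties together (host names and the group id are placeholders; note that Kafka 0.8 spells the reset values `smallest`/`largest`, not the newer `earliest`/`latest`):

```java
import java.util.Properties;

public class KafkaConfig {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // broker addresses
        props.setProperty("zookeeper.connect", "localhost:2181"); // ZooKeeper quorum
        props.setProperty("group.id", "flink-consumer");          // consumer group
        // Where to start when no committed offset exists for the group
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }
}
```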

## Types

```java { .api }
/**
 * Interface for deserializing Kafka message values only
 */
public interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for deserializing Kafka messages with key, value, topic, partition, and offset
 */
public interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for serializing objects to Kafka message values only
 */
public interface SerializationSchema<T> {
    byte[] serialize(T element);
}

/**
 * Interface for serializing objects to Kafka messages with keys and values
 */
public interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

/**
 * Interface for custom partitioning logic
 */
public interface FlinkKafkaPartitioner<T> {
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
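A custom `DeserializationSchema` implementation usually reduces to decoding the raw message bytes; the decode step in isolation (the class name is illustrative):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Utf8Decode {
    // What the deserialize(byte[] message) method of a String-producing
    // schema would do: interpret the Kafka message bytes as UTF-8 text.
    public static String deserialize(byte[] message) throws IOException {
        if (message == null) {
            throw new IOException("null Kafka message");
        }
        return new String(message, StandardCharsets.UTF_8);
    }
}
```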