tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-11

Apache Flink connector for integrating with Apache Kafka 0.8.x message broker systems

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-kafka-0.8_2.11@1.10.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-11@1.10.0

# Apache Flink Kafka Connector 0.8

A comprehensive streaming connector for integrating Apache Flink with Apache Kafka 0.8.x, providing both source and sink capabilities, exactly-once processing guarantees for consumers, and Table API integration.

## Package Information

**Maven Coordinates:**

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  <version>1.10.3</version>
</dependency>
```

*Note: The artifact ID includes the Scala binary version suffix (e.g., `_2.11` for Scala 2.11, `_2.12` for Scala 2.12).*

**Java Version:** Java 8+

**Kafka Compatibility:** Apache Kafka 0.8.x

**Main Package:** `org.apache.flink.streaming.connectors.kafka`

## Core Imports

```java { .api }
// Core Consumer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

// Core Producer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

// Serialization
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Table API (Internal)
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

// Required Dependencies
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
```

## Key Features

### Consumer Features

- **Exactly-once processing guarantees** through Flink's checkpointing mechanism
- **Dynamic partition discovery** with pattern-based topic subscription
- **Multiple startup modes**: earliest, latest, group offsets, or specific offsets
- **Watermark support** for event-time processing
- **Fault tolerance** with automatic offset recovery
- **ZooKeeper integration** for Kafka 0.8 metadata management
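Pattern-based subscription pairs with dynamic partition discovery: topics that start matching the pattern after the job launches are picked up at the configured discovery interval. A minimal sketch, assuming the `Pattern`-based constructor overload; the topic pattern and interval are illustrative values:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");
// Re-check topic/partition metadata every 30 seconds
properties.setProperty("flink.partition-discovery.interval-millis", "30000");

// Subscribe to every topic matching the pattern, e.g. "events-2024", "events-2025"
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Pattern.compile("events-[0-9]+"),
    new SimpleStringSchema(),
    properties
);
```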

59

60

### Producer Features

61

- **Custom partitioning** strategies with FlinkKafkaPartitioner

62

- **Key/Value serialization** support

63

- **Checkpointing integration** with flush capabilities

64

- **Broker list or Properties configuration**

65

- **Note**: Kafka 0.8 producer provides no reliability guarantees

### Table API Integration

- **Schema evolution** support with field mapping
- **Processing time and rowtime attributes**
- **Configurable startup modes and offset management**
- **Factory-based configuration** for SQL/Table API
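The factory-based configuration is normally reached through Flink's descriptor API rather than the `@Internal` classes directly. A hedged sketch, assuming Flink 1.10's descriptor classes, an existing `StreamTableEnvironment` named `tableEnv`, and an illustrative topic and schema:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// version("0.8") routes the descriptor to Kafka08TableSourceSinkFactory
tableEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("my-topic")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "my-group")
            .startFromEarliest())
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user_name", DataTypes.STRING())
        .field("amount", DataTypes.DOUBLE()))
    .createTemporaryTable("KafkaSource");
```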

## Quick Start

### Basic Consumer Example

```java { .api }
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka 0.8 consumers need a ZooKeeper quorum and a consumer group id
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

consumer.setStartFromEarliest();
env.addSource(consumer).print();
env.execute("Kafka Consumer Job");
```

### Basic Producer Example

```java { .api }
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka 0.8 producers are configured with a broker list, not ZooKeeper
Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);
env.execute("Kafka Producer Job");
```

## Architecture

### Consumer Architecture

The Flink Kafka Consumer 0.8 is built on a multi-layered architecture:

1. **FlinkKafkaConsumer08** - Main consumer class extending FlinkKafkaConsumerBase
2. **AbstractFetcher** - Handles partition fetching and watermark generation
3. **AbstractPartitionDiscoverer** - Manages partition discovery and metadata
4. **KafkaDeserializationSchema** - Converts Kafka ConsumerRecords to Flink data types

```java { .api }
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Core consumer setup with custom deserialization
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Custom deserialization logic
        return MyEvent.fromBytes(record.value());
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Never end the stream
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};
```

### Producer Architecture

The producer architecture focuses on best-effort message delivery within Kafka 0.8's constraints (the 0.8 producer offers no delivery guarantees):

1. **FlinkKafkaProducer08** - Main producer extending FlinkKafkaProducerBase
2. **FlinkKafkaPartitioner** - Custom partitioning logic (optional)
3. **SerializationSchema/KeyedSerializationSchema** - Message serialization

```java { .api }
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner example
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Custom partitioning logic based on record content
        return Math.abs(record.getCustomerId().hashCode() % partitions.length);
    }
};
```
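A partitioner like the one above is attached through a producer constructor overload that takes it as the fourth argument. A sketch, where `MyEventSerializationSchema` is an assumed `SerializationSchema<MyEvent>` implementation and the topic and broker address are illustrative:

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

// Without the partitioner argument, the producer uses default partitioning
FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new MyEventSerializationSchema(), // hypothetical SerializationSchema<MyEvent>
    properties,
    partitioner
);
```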

## Configuration

### Required Kafka 0.8 Properties

```java { .api }
Properties kafkaProps = new Properties();

// Required for Consumer
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("group.id", "my-group");

// Required for Producer
kafkaProps.setProperty("metadata.broker.list", "localhost:9092");

// Optional Consumer Properties
kafkaProps.setProperty("auto.offset.reset", "earliest"); // or "latest"
kafkaProps.setProperty("fetch.message.max.bytes", "1048576");
kafkaProps.setProperty("socket.timeout.ms", "30000");
kafkaProps.setProperty("auto.commit.enable", "false"); // Recommended for exactly-once

// Flink-specific Properties
kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
kafkaProps.setProperty("flink.disable-metrics", "false");
```

## Startup Mode Configuration

```java { .api }
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from consumer group's committed offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets per partition
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(specificOffsets);
```

## Watermark and Timestamp Assignment

```java { .api }
import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", myDeserializer, properties);

// Assign periodic watermarks
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 5000); // 5-second tolerance for out-of-order events
    }
});
```

## Error Handling and Reliability

```java { .api }
// Consumer reliability: commit offsets back to Kafka on each completed
// checkpoint (exactly-once itself comes from Flink's checkpointing)
consumer.setCommitOffsetsOnCheckpoints(true);

// Producer error handling
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

producer.setLogFailuresOnly(false); // Fail the job on send errors (default)
producer.setFlushOnCheckpoint(true); // Flush pending records on checkpoint
```

## API Documentation

For detailed API documentation of specific components:

- **[Kafka Consumer API](kafka-consumer.md)** - Complete FlinkKafkaConsumer08 API reference
- **[Kafka Producer API](kafka-producer.md)** - Complete FlinkKafkaProducer08 API reference
- **[Table API Integration](table-api.md)** - Kafka08TableSource, Kafka08TableSink, and factory classes (*Note: these are `@Internal` APIs*)

## Version Notes

### Kafka 0.8 Limitations

- **No transactional support**: Producers cannot provide exactly-once guarantees
- **ZooKeeper dependency**: Consumers require ZooKeeper for metadata operations
- **No timestamp support**: Cannot fetch offsets by timestamp
- **Limited reliability**: Producers may lose messages on failures

### Deprecated Classes

- `FlinkKafkaConsumer081` - Use `FlinkKafkaConsumer08` instead
- `FlinkKafkaConsumer082` - Use `FlinkKafkaConsumer08` instead
- `FlinkKafkaProducer` - Use `FlinkKafkaProducer08` instead

279

280

### Internal APIs

281

- **Table API classes** (Kafka08TableSource, Kafka08TableSink, Kafka08TableSourceSinkFactory) are marked `@Internal`

282

- These may change without notice and are not part of the official public API

283

- Use through Flink's Table API framework rather than directly

284

285

### Migration Path

286

When upgrading from Kafka 0.8 to newer versions, consider:

287

1. Replacing with `flink-connector-kafka-0.9+` for better reliability guarantees

288

2. Updating ZooKeeper-based configuration to bootstrap servers

289

3. Migrating to transactional producers for exactly-once semantics
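Steps 1 and 2 amount to swapping the consumer class and the connection properties. A hedged sketch using the universal connector's consumer (class name per the `flink-connector-kafka` artifact in Flink 1.10; topic and addresses are illustrative):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Bootstrap servers replace the ZooKeeper quorum required by Kafka 0.8 consumers
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);
```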