
# Data Production

Streaming data sink functionality for producing to Kafka 0.9.x topics, with configurable serialization, partitioning strategies, and reliability guarantees for high-throughput data pipelines.

## Capabilities

### FlinkKafkaProducer09 Class

Main Kafka producer class for writing data from Flink data streams to Kafka 0.9.x topics.

```java { .api }
/**
 * Kafka producer for writing data to Apache Kafka 0.9.x topics.
 * Compatible with Kafka 0.9 without reliability guarantees.
 *
 * @param <IN> The type of records to write to Kafka
 */
@PublicEvolving
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {

    /**
     * Creates a producer with broker list and key-less serialization using fixed partitioner.
     *
     * @param brokerList Comma separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined key-less serialization schema
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and key-less serialization using fixed partitioner.
     *
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined key-less serialization schema
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with key-less serialization and custom partitioner.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A key-less serializable serialization schema
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for round-robin
     */
    public FlinkKafkaProducer09(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * Creates a producer with broker list and key/value serialization using fixed partitioner.
     *
     * @param brokerList Comma separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined serialization schema supporting key/value messages
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and key/value serialization using fixed partitioner.
     *
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined serialization schema supporting key/value messages
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with key/value serialization and custom partitioner.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable serialization schema for key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for key-based partitioning
     */
    public FlinkKafkaProducer09(
            String topicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * Sets whether the producer should only log failures instead of throwing exceptions.
     *
     * @param logFailuresOnly True to only log failures instead of throwing exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);

    /**
     * Sets whether the producer should flush pending records on checkpoint.
     *
     * @param flush True to flush on checkpoint, false otherwise
     */
    public void setFlushOnCheckpoint(boolean flush);
}
```
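The two setters above are not exercised by the usage examples later in this page, so here is a minimal sketch of combining them for at-least-once-style delivery. It assumes a `properties` object and `SimpleStringSchema` as set up under Usage Examples; the topic name `reliable-topic` is illustrative.

```java
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "reliable-topic",
    new SimpleStringSchema(),
    properties
);

// Throw on send failures instead of only logging them, so failed writes
// surface as job failures rather than silent data loss.
producer.setLogFailuresOnly(false);

// Block checkpoint completion until all pending records are acknowledged,
// which is as close to at-least-once delivery as Kafka 0.9 allows.
producer.setFlushOnCheckpoint(true);
```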

### Deprecated Constructors

```java { .api }
/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 * Use FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 * Use FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
```

## Usage Examples

### Basic String Production

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

env.fromElements("Hello", "World", "Kafka")
    .addSink(producer);

env.execute("Basic Kafka Producer");
```

### Custom Object Serialization

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

// Custom POJO
public class User {
    public String name;
    public int age;
    public String email;

    // constructors, getters, setters...
}

// JSON serialization schema
SerializationSchema<User> jsonSchema = new SerializationSchema<User>() {
    // ObjectMapper is not Serializable, so create it lazily on first use
    // instead of capturing it when the schema is shipped with the job graph.
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "user-events",
    jsonSchema,
    properties
);

env.fromElements(
    new User("Alice", 25, "alice@example.com"),
    new User("Bob", 30, "bob@example.com")
).addSink(producer);
```

### Key/Value Production

```java
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.nio.charset.StandardCharsets;

// Custom keyed serialization for key/value messages
KeyedSerializationSchema<User> keyedSchema = new KeyedSerializationSchema<User>() {
    // ObjectMapper is not Serializable, so create it lazily on first use.
    private transient ObjectMapper mapper;

    @Override
    public byte[] serializeKey(User user) {
        return user.email.getBytes(StandardCharsets.UTF_8); // Use email as key
    }

    @Override
    public byte[] serializeValue(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }

    @Override
    public String getTargetTopic(User user) {
        return null; // Use the topic passed to the producer
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "keyed-users",
    keyedSchema,
    properties
);
```

### Custom Partitioning

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner based on user age
FlinkKafkaPartitioner<User> agePartitioner = new FlinkKafkaPartitioner<User>() {
    @Override
    public int partition(User record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by age groups: 0-29, 30-49, 50+
        int ageGroup = record.age < 30 ? 0 : (record.age < 50 ? 1 : 2);
        return partitions[ageGroup % partitions.length];
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "age-partitioned-users",
    jsonSchema,
    properties,
    agePartitioner
);
```

### Advanced Producer Configuration

```java
Properties properties = new Properties();

// Required settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");

// Performance tuning
properties.setProperty("batch.size", "16384");
properties.setProperty("linger.ms", "5");
properties.setProperty("compression.type", "snappy");

// Reliability settings (limited in Kafka 0.9)
properties.setProperty("acks", "1"); // 0=no ack, 1=leader ack, all=all in-sync replicas
properties.setProperty("retries", "3");
// Note: values > 1 here can reorder records when a send is retried
properties.setProperty("max.in.flight.requests.per.connection", "5");

// Buffer management
properties.setProperty("buffer.memory", "33554432");
properties.setProperty("max.block.ms", "60000");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "configured-topic",
    new SimpleStringSchema(),
    properties
);
```

### Dynamic Topic Selection

```java
import java.nio.charset.StandardCharsets;

// Schema that can route to different topics based on record content
KeyedSerializationSchema<String> dynamicSchema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String record) {
        return null; // No key
    }

    @Override
    public byte[] serializeValue(String record) {
        return record.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String record) {
        // Route based on record content
        if (record.startsWith("ERROR")) {
            return "error-logs";
        } else if (record.startsWith("WARN")) {
            return "warning-logs";
        } else {
            return "info-logs";
        }
    }
};

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "default-topic", // Fallback topic used when getTargetTopic returns null
    dynamicSchema,
    properties
);
```

### Producer with Data Stream Transformations

```java
import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create input stream
DataStream<String> inputStream = env.socketTextStream("localhost", 9999);

// Transform and produce to Kafka
inputStream
    .filter(line -> !line.isEmpty())
    .map(line -> line.toUpperCase())
    .map(line -> "Processed at " + System.currentTimeMillis() + ": " + line)
    .addSink(new FlinkKafkaProducer09<>(
        "processed-data",
        new SimpleStringSchema(),
        properties
    ));

env.execute("Stream Processing to Kafka");
```

## Partitioning Strategies

### Default Fixed Partitioner

- Maps each sink subtask to a single Kafka partition
- All records from one subtask go to the same partition
- Provides good parallelism but may create hotspots
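The fixed partitioner's assignment rule can be sketched in plain Java. This is an illustration of the behavior described above, not the connector's actual source:

```java
public class FixedPartitionSketch {
    // Each sink subtask is pinned to one partition: subtask i always writes
    // to partitions[i % partitions.length].
    public static int fixedPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // Subtasks 0..3 of a 4-way parallel sink map to partitions 0, 1, 2, 0,
        // so two subtasks share a partition when parallelism exceeds the
        // partition count (a potential hotspot).
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + fixedPartition(subtask, partitions));
        }
    }
}
```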

### Round-Robin Partitioning

- Used when no custom partitioner is provided and no keys are set
- Distributes records evenly across partitions
- Good for load balancing without keys
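Round-robin assignment simply cycles through the available partitions on each record. A sketch of the idea (not the Kafka client's internal implementation):

```java
public class RoundRobinSketch {
    private int next = -1;

    // Advance to the next partition on every record, wrapping around,
    // so records are spread evenly regardless of their content.
    public int nextPartition(int[] partitions) {
        next = (next + 1) % partitions.length;
        return partitions[next];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        // Four consecutive records land on partitions 0, 1, 2, 0.
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.nextPartition(partitions));
        }
    }
}
```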

### Key-Based Partitioning

- Used with KeyedSerializationSchema when keys are provided
- Records with the same key go to the same partition
- Maintains ordering for records with the same key
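The per-key invariant can be illustrated with a simple hash-modulo sketch. Kafka's default partitioner actually uses a murmur2 hash of the key bytes, but the property it guarantees is the same: equal keys always map to equal partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {
    // Stand-in for the key hashing step; the bit mask keeps the hash
    // non-negative before taking the modulo.
    public static int partitionForKey(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] a = "alice@example.com".getBytes(StandardCharsets.UTF_8);
        byte[] b = "alice@example.com".getBytes(StandardCharsets.UTF_8);
        // Equal keys land on the same partition, preserving per-key ordering.
        System.out.println(partitionForKey(a, 3) == partitionForKey(b, 3)); // true
    }
}
```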

### Custom Partitioning

- Extend the FlinkKafkaPartitioner base class
- Full control over partition assignment logic
- Can consider record content, metadata, or external factors

## Error Handling and Limitations

**Kafka 0.9 Limitations:**
- No built-in exactly-once delivery guarantees
- No idempotent producer support (introduced in later Kafka versions)
- No transactional API

**Common Issues:**
- `SerializationException`: problems in custom serialization schemas
- `TimeoutException`: network connectivity or broker availability problems
- `RecordTooLargeException`: message exceeds the broker's size limits
- `InvalidTopicException`: topic doesn't exist or has an invalid name

**Best Practices:**
- Configure appropriate timeouts and retries
- Monitor producer metrics and logs
- Use compression for better throughput
- Tune batch size and linger time to balance latency against throughput