# Data Production

A streaming data sink for producing to Kafka 0.9.x topics, with configurable serialization, partitioning strategies, and reliability guarantees for high-throughput data pipelines.

## Capabilities

### FlinkKafkaProducer09 Class

The main producer class for writing Flink data streams to Kafka 0.9.x topics.
```java { .api }
/**
 * Kafka producer for writing data to Apache Kafka 0.9.x topics.
 * Compatible with Kafka 0.9.x; note that Kafka 0.9 provides no
 * exactly-once delivery guarantees.
 *
 * @param <IN> The type of records to write to Kafka
 */
@PublicEvolving
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {

    /**
     * Creates a producer with a broker list and key-less serialization, using the fixed partitioner.
     *
     * @param brokerList Comma-separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User-defined key-less serialization schema
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and key-less serialization, using the fixed partitioner.
     *
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User-defined key-less serialization schema
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with key-less serialization and a custom partitioner.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable, key-less serialization schema
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for round-robin
     */
    public FlinkKafkaProducer09(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * Creates a producer with a broker list and key/value serialization, using the fixed partitioner.
     *
     * @param brokerList Comma-separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User-defined serialization schema supporting key/value messages
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and key/value serialization, using the fixed partitioner.
     *
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User-defined serialization schema supporting key/value messages
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with key/value serialization and a custom partitioner.
     *
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable serialization schema for key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for key-based partitioning
     */
    public FlinkKafkaProducer09(
            String topicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * Sets whether the producer should only log failures instead of throwing exceptions.
     *
     * @param logFailuresOnly True to only log failures instead of throwing exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);

    /**
     * Sets whether the producer should flush pending records on checkpoint.
     *
     * @param flush True to flush on checkpoint, false otherwise
     */
    public void setFlushOnCheckpoint(boolean flush);
}
```

### Deprecated Constructors

```java { .api }
/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 *             Use FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 *             Use FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
```
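
A hedged migration sketch (the `MyEvent` type, `schema`, and `props` are placeholder names introduced for illustration, not part of this API): the deprecated `KafkaPartitioner` overloads are replaced by passing a `FlinkKafkaPartitioner`, whose `partition` method also receives the target topic, so multi-topic partitioning works correctly.

```java
// Previously (deprecated):
// new FlinkKafkaProducer09<>("events", schema, props, oldKafkaPartitioner);

// Replacement: extend FlinkKafkaPartitioner; partition() now also receives targetTopic.
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Same assignment logic as before; abs() is safe here because the
        // modulo result is always smaller in magnitude than partitions.length
        return partitions[Math.abs(record.getId().hashCode() % partitions.length)];
    }
};

FlinkKafkaProducer09<MyEvent> producer =
    new FlinkKafkaProducer09<>("events", schema, props, partitioner);
```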

## Usage Examples

### Basic String Production

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

env.fromElements("Hello", "World", "Kafka")
    .addSink(producer);

env.execute("Basic Kafka Producer");
```
### Custom Object Serialization

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

// Custom POJO
public class User {
    public String name;
    public int age;
    public String email;

    // constructors, getters, setters...
}

// JSON serialization schema
SerializationSchema<User> jsonSchema = new SerializationSchema<User>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(User user) {
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "user-events",
    jsonSchema,
    properties
);

env.fromElements(
    new User("Alice", 25, "alice@example.com"),
    new User("Bob", 30, "bob@example.com")
).addSink(producer);
```
### Key/Value Production

```java
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// Custom keyed serialization for key/value messages
KeyedSerializationSchema<User> keyedSchema = new KeyedSerializationSchema<User>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serializeKey(User user) {
        // Use the email as the key; an explicit charset avoids
        // platform-dependent encoding
        return user.email.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(User user) {
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }

    @Override
    public String getTargetTopic(User user) {
        return null; // Use the default topic
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "keyed-users",
    keyedSchema,
    properties
);
```
### Custom Partitioning

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner based on user age
FlinkKafkaPartitioner<User> agePartitioner = new FlinkKafkaPartitioner<User>() {
    @Override
    public int partition(User record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by age group: 0-29, 30-49, 50+
        int ageGroup = record.age < 30 ? 0 : (record.age < 50 ? 1 : 2);
        return partitions[ageGroup % partitions.length];
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "age-partitioned-users",
    jsonSchema,
    properties,
    agePartitioner
);
```
### Advanced Producer Configuration

```java
Properties properties = new Properties();

// Required settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");

// Performance tuning
properties.setProperty("batch.size", "16384");
properties.setProperty("linger.ms", "5");
properties.setProperty("compression.type", "snappy");

// Reliability settings (limited in Kafka 0.9)
properties.setProperty("acks", "1"); // 0 = no ack, 1 = leader ack, all = all in-sync replicas
properties.setProperty("retries", "3");
// Note: values > 1 can reorder records when a retry occurs
properties.setProperty("max.in.flight.requests.per.connection", "5");

// Buffer management
properties.setProperty("buffer.memory", "33554432"); // 32 MB
properties.setProperty("max.block.ms", "60000");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "configured-topic",
    new SimpleStringSchema(),
    properties
);
```
### Dynamic Topic Selection

```java
import java.nio.charset.StandardCharsets;

// Schema that routes records to different topics based on their content
KeyedSerializationSchema<String> dynamicSchema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String record) {
        return null; // No key
    }

    @Override
    public byte[] serializeValue(String record) {
        return record.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String record) {
        // Route based on record content
        if (record.startsWith("ERROR")) {
            return "error-logs";
        } else if (record.startsWith("WARN")) {
            return "warning-logs";
        } else {
            return "info-logs";
        }
    }
};

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "default-topic", // Fallback topic, used when getTargetTopic returns null
    dynamicSchema,
    properties
);
```
### Producer with Data Stream Transformations

```java
import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create input stream
DataStream<String> inputStream = env.socketTextStream("localhost", 9999);

// Transform and produce to Kafka
inputStream
    .filter(line -> !line.isEmpty())
    .map(String::toUpperCase)
    .map(line -> "Processed at " + System.currentTimeMillis() + ": " + line)
    .addSink(new FlinkKafkaProducer09<>(
        "processed-data",
        new SimpleStringSchema(),
        properties
    ));

env.execute("Stream Processing to Kafka");
```
## Partitioning Strategies

### Default Fixed Partitioner

- Maps each sink subtask to a single Kafka partition
- All records from one subtask go to the same partition
- Simple and connection-efficient, but may leave partitions idle or create hotspots when subtask and partition counts differ

### Round-Robin Partitioning

- Applies when a null partitioner is explicitly passed and records carry no key
- Kafka's default partitioner then distributes records evenly across partitions
- Good for load balancing when keys are not needed

### Key-Based Partitioning

- Applies with a KeyedSerializationSchema that provides keys, when the custom partitioner is null
- Records with the same key go to the same partition
- Maintains ordering for records with the same key

### Custom Partitioning

- Extend the FlinkKafkaPartitioner base class
- Full control over partition assignment logic
- Can consider record content, metadata, or external factors

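One detail worth spelling out: the two- and three-argument constructors install the fixed partitioner by default, so Kafka-side round-robin or key-based partitioning has to be requested by passing an explicit `null` partitioner. A sketch, assuming `properties` is configured as in the earlier examples; the cast disambiguates the overload from the deprecated `KafkaPartitioner` variant:

```java
// An explicit null disables Flink's fixed partitioner; the underlying Kafka
// producer then assigns partitions itself (round-robin for key-less records,
// hash-of-key when a key is present).
FlinkKafkaProducer09<String> roundRobinProducer = new FlinkKafkaProducer09<>(
    "balanced-topic",
    new SimpleStringSchema(),
    properties,
    (FlinkKafkaPartitioner<String>) null
);
```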
## Error Handling and Limitations

**Kafka 0.9 Limitations:**

- No built-in exactly-once delivery guarantees
- No idempotent producer (introduced in Kafka 0.11)
- No transactional API (also introduced in Kafka 0.11)

**Common Issues:**

- `SerializationException`: a custom serialization schema failed or produced invalid output
- `TimeoutException`: network connectivity problems or unavailable brokers
- `RecordTooLargeException`: a message exceeds the broker's configured size limit
- `InvalidTopicException`: the topic does not exist or its name is invalid

**Best Practices:**

- Configure appropriate timeouts and retries
- Monitor producer metrics and logs
- Enable compression for better throughput
- Tune batch size and linger time to balance latency against throughput
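
The `setLogFailuresOnly` and `setFlushOnCheckpoint` setters from the API section tie these practices together. A sketch of an at-least-once configuration, assuming `properties` and an upstream `stream` as in the earlier examples:

```java
FlinkKafkaProducer09<String> reliableProducer = new FlinkKafkaProducer09<>(
    "reliable-topic",
    new SimpleStringSchema(),
    properties
);

// Propagate send failures as exceptions so a lost record fails the job
// instead of being silently logged and dropped
reliableProducer.setLogFailuresOnly(false);

// Block each checkpoint until all in-flight records are acknowledged by
// Kafka; combined with acks=1 (or all), this yields at-least-once delivery
reliableProducer.setFlushOnCheckpoint(true);

stream.addSink(reliableProducer);
```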