# Kafka Producer API

API reference for the Kafka 0.8 producer classes in the Apache Flink Kafka Connector.

## FlinkKafkaProducer08<IN>

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@PublicEvolving`
**Extends:** `FlinkKafkaProducerBase<IN>`
**Description:** Flink sink that produces data into a Kafka topic. Compatible with Kafka 0.8.x. **Important Note**: This producer does not provide reliability guarantees and may lose messages on failures.

### Class Declaration

```java { .api }
@PublicEvolving
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>
```

### Constants

```java { .api }
private static final long serialVersionUID = 1L;
```

### Constructors

#### Key-less Serialization Constructors

```java { .api }
/**
 * Creates a FlinkKafkaProducer08 from a broker list and a SerializationSchema.
 *
 * @param brokerList Comma-separated list of Kafka brokers (e.g., "localhost:9092,broker2:9092")
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema that serializes objects to byte arrays
 */
public FlinkKafkaProducer08(String brokerList,
                            String topicId,
                            SerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 from producer Properties and a SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema that serializes objects to byte arrays
 * @param producerConfig Properties containing the Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with a custom partitioner and a SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema that serializes objects to byte arrays
 * @param producerConfig Properties containing the Kafka producer configuration
 * @param customPartitioner Custom partitioner that selects the target partition; may be null,
 *                          in which case the Kafka client's own partitioning applies
 */
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            @Nullable FlinkKafkaPartitioner<IN> customPartitioner)
```

#### Key/Value Serialization Constructors

```java { .api }
/**
 * Creates a FlinkKafkaProducer08 from a broker list and a KeyedSerializationSchema.
 *
 * @param brokerList Comma-separated list of Kafka brokers
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema that serializes objects to key/value byte arrays
 */
public FlinkKafkaProducer08(String brokerList,
                            String topicId,
                            KeyedSerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 from producer Properties and a KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema that serializes objects to key/value byte arrays
 * @param producerConfig Properties containing the Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with a custom partitioner and a KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name (used when the schema returns no target topic)
 * @param serializationSchema Schema that serializes objects to key/value byte arrays
 * @param producerConfig Properties containing the Kafka producer configuration
 * @param customPartitioner Custom partitioner that selects the target partition; may be null,
 *                          in which case the Kafka client's own partitioning applies
 */
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            @Nullable FlinkKafkaPartitioner<IN> customPartitioner)
```

#### Deprecated Constructors

```java { .api }
/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId,
                            SerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            KafkaPartitioner<IN> customPartitioner)

/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId,
                            KeyedSerializationSchema<IN> serializationSchema,
                            Properties producerConfig,
                            KafkaPartitioner<IN> customPartitioner)
```

### Configuration Methods (Inherited from FlinkKafkaProducerBase)

```java { .api }
/**
 * Defines whether the producer should fail on errors or only log them.
 * Default is false: failed sends are eventually rethrown and fail the job.
 *
 * @param logFailuresOnly If true, only log failures; if false, throw exceptions on failures
 */
public void setLogFailuresOnly(boolean logFailuresOnly)

/**
 * Defines whether the producer waits, on each checkpoint, until all outstanding records
 * have been acknowledged by Kafka. This ties records sent before a checkpoint to that
 * checkpoint, at the cost of longer checkpoints.
 *
 * @param flush If true, flush on checkpoints; if false, don't flush
 */
public void setFlushOnCheckpoint(boolean flush)
```
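
The effect of `setLogFailuresOnly` can be modeled in plain Java. This is a simplified approximation, not the connector's code: it assumes that a send callback stores the first failure and that the next call into the sink rethrows it, while log-only mode merely records the failure. `AsyncFailureTracker`, `onSendCompleted`, and `checkErroneous` are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of asynchronous send-failure handling.
// Not the Flink connector's implementation.
class AsyncFailureTracker {
    private final boolean logFailuresOnly;
    // First failure reported by a send callback; volatile because callbacks run on another thread.
    private volatile Exception asyncException;
    // Stand-in for a logger: collects the messages that would have been logged.
    final List<String> log = new ArrayList<>();

    AsyncFailureTracker(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    // Called from the (simulated) send callback; a null failure means the record was acknowledged.
    synchronized void onSendCompleted(Exception failure) {
        if (failure == null) {
            return;
        }
        if (logFailuresOnly) {
            log.add("Error while sending record to Kafka: " + failure.getMessage());
        } else if (asyncException == null) {
            asyncException = failure; // keep only the first failure
        }
    }

    // In this model, called before handling the next record, on checkpoints, and on close.
    void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }
}
```

With `logFailuresOnly = false`, a failed send surfaces at the next `checkErroneous()` call and would fail the job; with `true`, the failure only ends up in `log` and processing continues.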

### Lifecycle Methods (Inherited from FlinkKafkaProducerBase)

```java { .api }
/**
 * Opens the producer and initializes resources.
 *
 * @param configuration The task configuration
 */
public void open(Configuration configuration)

/**
 * Sends a record to Kafka.
 *
 * @param next The record to send
 * @param context The sink context (exposes the record timestamp, current watermark, and processing time)
 * @throws Exception If sending fails
 */
public void invoke(IN next, Context context) throws Exception

/**
 * Closes the producer and cleans up resources.
 *
 * @throws Exception If cleanup fails
 */
public void close() throws Exception
```

### Checkpointing Interface Methods (Inherited)

```java { .api }
/**
 * Initializes the state of the function from a checkpoint.
 *
 * @param context The initialization context
 * @throws Exception If state initialization fails
 */
public void initializeState(FunctionInitializationContext context) throws Exception

/**
 * Snapshots the function's state during checkpointing. If flush-on-checkpoint is
 * enabled, this waits until all pending records have been acknowledged by Kafka.
 *
 * @param ctx The snapshot context
 * @throws Exception If state snapshotting fails
 */
public void snapshotState(FunctionSnapshotContext ctx) throws Exception
```

### Static Utility Methods (Inherited)

```java { .api }
/**
 * Converts a comma-separated broker list to producer Properties.
 * Each entry is validated as host:port.
 *
 * @param brokerList Comma-separated list of brokers (e.g., "broker1:9092,broker2:9092")
 * @return Properties object with "bootstrap.servers" set to the broker list
 */
public static Properties getPropertiesFromBrokerList(String brokerList)
```
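
A dependency-free sketch of what `getPropertiesFromBrokerList` does as described above: split the list on commas, check that each entry looks like `host:port`, and store the original string under `bootstrap.servers`. The class `BrokerListProperties` and its validation rules are assumptions for illustration; the real method delegates validation to a Flink utility whose exact checks may differ.

```java
import java.util.Properties;

// Hypothetical stand-in for getPropertiesFromBrokerList; the validation is a simplified assumption.
final class BrokerListProperties {
    private BrokerListProperties() {}

    static Properties fromBrokerList(String brokerList) {
        for (String broker : brokerList.split(",")) {
            validateHostPort(broker.trim());
        }
        Properties props = new Properties();
        // The Kafka Java producer reads its initial broker addresses from "bootstrap.servers".
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    private static void validateHostPort(String broker) {
        int colon = broker.lastIndexOf(':');
        if (colon <= 0 || colon == broker.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got '" + broker + "'");
        }
        int port;
        try {
            port = Integer.parseInt(broker.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in '" + broker + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in '" + broker + "'");
        }
    }
}
```

Validating eagerly means a typo such as a missing port fails when the job graph is built rather than when the sink first connects.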

### Protected Methods

```java { .api }
/**
 * Kafka 0.8 specific flush implementation. The Kafka 0.8 client has no native flush,
 * so this method blocks until all pending (sent but not yet acknowledged) records are confirmed.
 */
protected void flush()
```
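
The waiting strategy described above can be modeled with a counter guarded by a monitor: each send increments it, each acknowledgment decrements it and wakes waiters, and `flush()` waits until it reaches zero. `PendingRecords` is a hypothetical class written for this sketch; it is not part of the connector.

```java
// Hypothetical model of "flush by waiting for all pending acknowledgments".
final class PendingRecords {
    private final Object lock = new Object();
    private long pending;

    // Call just before handing a record to the asynchronous client.
    void recordSent() {
        synchronized (lock) {
            pending++;
        }
    }

    // Call from the send callback once the broker acknowledged (or rejected) the record.
    void recordAcknowledged() {
        synchronized (lock) {
            pending--;
            if (pending == 0) {
                lock.notifyAll();
            }
        }
    }

    long pending() {
        synchronized (lock) {
            return pending;
        }
    }

    // Blocks until every sent record has been acknowledged.
    void flush() {
        synchronized (lock) {
            while (pending > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // Failing here keeps an incomplete checkpoint from being confirmed.
                    throw new RuntimeException("Flushing got interrupted", e);
                }
            }
        }
    }
}
```

In a real producer the acknowledgments arrive on the Kafka client's background I/O thread while the checkpoint waits; a second plain Java thread calling `recordAcknowledged()` reproduces that interaction.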

## FlinkKafkaPartitioner<T> Abstract Class

**Package:** `org.apache.flink.streaming.connectors.kafka.partitioner`
**Description:** Abstract base class for custom partitioning strategies in Flink Kafka producers.

```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    /**
     * Returns the ID of the partition the record should be written to.
     *
     * @param record The record to partition
     * @param key The serialized key (can be null)
     * @param value The serialized value
     * @param targetTopic The target topic name
     * @param partitions IDs of the partitions available for the target topic
     * @return The target partition ID (should be one of the values in {@code partitions})
     */
    public abstract int partition(T record, byte[] key, byte[] value,
                                  String targetTopic, int[] partitions);

    /**
     * Called once on each parallel sink instance before records are partitioned.
     * Override for initialization logic.
     *
     * @param parallelInstanceId The 0-indexed ID of this parallel sink instance
     * @param parallelInstances The total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Default implementation is empty
    }
}
```
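
To make the `open`/`partition` contract concrete, the sketch below follows, as we understand it, the idea behind Flink's built-in `FlinkFixedPartitioner`: each parallel sink instance remembers its ID in `open` and always writes to `partitions[id % partitions.length]`. It does not extend the Flink class, to stay dependency-free; `FixedPartitionerSketch` is a hypothetical name.

```java
// Dependency-free model of a "fixed" partitioner: each sink subtask sticks to one partition.
// Hypothetical class; it follows the same open/partition contract as FlinkKafkaPartitioner.
class FixedPartitionerSketch<T> {
    private int parallelInstanceId = -1;

    void open(int parallelInstanceId, int parallelInstances) {
        if (parallelInstanceId < 0 || parallelInstances <= 0) {
            throw new IllegalArgumentException("Invalid parallel instance information");
        }
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        if (parallelInstanceId < 0) {
            throw new IllegalStateException("open() must be called before partition()");
        }
        // Return an element of the partitions array, not merely an index into it.
        return partitions[parallelInstanceId % partitions.length];
    }
}
```

With 3 sink subtasks and a 2-partition topic, subtasks 0 and 2 both write to the first partition and subtask 1 to the second, so this strategy can load partitions unevenly when parallelism and partition count differ.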

### Custom Partitioner Examples

```java { .api }
// Hash-based partitioning by customer ID
public class CustomerHashPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {

    @Override
    public int partition(CustomerEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        int customerId = record.getCustomerId();
        // Map the ID onto an index, then return the partition ID stored at that index
        return partitions[Math.abs(customerId % partitions.length)];
    }
}

// Round-robin partitioning (each parallel instance cycles independently)
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int counter = 0;

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // floorMod keeps the index non-negative even after the counter overflows
        return partitions[Math.floorMod(counter++, partitions.length)];
    }
}

// Time-based partitioning
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {

    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition by hour of day (UTC), assuming the timestamp is in epoch milliseconds
        long timestamp = record.getTimestamp();
        int hour = (int) ((timestamp / 3_600_000L) % 24);
        return partitions[hour % partitions.length];
    }
}
```
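
The examples above partition on fields of the deserialized record. When the key is already serialized, a partitioner can hash the key bytes instead, so equal keys always land in the same partition. The sketch below uses `java.util.Arrays.hashCode` purely for illustration; this is an assumption and does not reproduce the hash used by Kafka's own default partitioner, so its assignments will differ from the Kafka client's. `KeyHashPartitionerSketch` is a hypothetical name, and sending unkeyed records to the first partition is an arbitrary choice made for the sketch.

```java
import java.util.Arrays;

// Hypothetical key-hash partitioner with the same parameters as FlinkKafkaPartitioner#partition.
class KeyHashPartitionerSketch<T> {

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        if (key == null) {
            // Assumption for this sketch: unkeyed records go to the first available partition.
            return partitions[0];
        }
        // floorMod keeps the index non-negative for negative hash codes.
        int index = Math.floorMod(Arrays.hashCode(key), partitions.length);
        return partitions[index];
    }
}
```

Because the hash depends only on the key bytes, the mapping is stable across parallel instances, unlike the round-robin example whose choice depends on each instance's local counter.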

## Usage Examples

### Basic Producer Setup

```java { .api }
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
// The producer is built on Kafka's Java client, which expects "bootstrap.servers"
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

env.execute("Kafka Producer Job");
```

### Producer with Custom Partitioner

```java { .api }
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner implementation
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition based on event type; return an element of the partitions array
        return partitions[Math.abs(record.getEventType().hashCode() % partitions.length)];
    }
};

// `properties` must contain at least "bootstrap.servers"
FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
    "events-topic",
    new MyEventSerializer(), // a SerializationSchema<MyEvent>
    properties,
    partitioner
);

// Configure for reliability
producer.setLogFailuresOnly(false); // Fail on errors
producer.setFlushOnCheckpoint(true); // Flush on checkpoints

DataStream<MyEvent> events = // ... your event stream
events.addSink(producer);
```

### Producer with Key/Value Serialization

```java { .api }
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

KeyedSerializationSchema<CustomerOrder> schema = new KeyedSerializationSchema<CustomerOrder>() {
    @Override
    public byte[] serializeKey(CustomerOrder element) {
        // Use the customer ID as the Kafka message key
        return String.valueOf(element.getCustomerId()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(CustomerOrder element) {
        // Serialize order details as JSON
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(CustomerOrder element) {
        // Dynamic topic selection based on order type (null would mean "use the default topic")
        return "orders-" + element.getOrderType().toLowerCase();
    }
};

FlinkKafkaProducer08<CustomerOrder> producer = new FlinkKafkaProducer08<>(
    "orders-default", // default topic (example name); must not be null
    schema,
    properties
);

DataStream<CustomerOrder> orders = // ... your order stream
orders.addSink(producer);
```
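
The topic routing used above reduces to a small rule: ask the schema for a per-record topic and fall back to the producer's default topic when it returns `null`. The sketch models that rule without Flink; the `TopicSelector` interface and `TopicRouting` class are hypothetical stand-ins for `KeyedSerializationSchema#getTargetTopic` and the producer's default-topic handling.

```java
import java.util.Objects;

// Hypothetical stand-in for KeyedSerializationSchema#getTargetTopic.
interface TopicSelector<T> {
    String getTargetTopic(T element); // may return null
}

final class TopicRouting<T> {
    private final String defaultTopic;
    private final TopicSelector<T> selector;

    TopicRouting(String defaultTopic, TopicSelector<T> selector) {
        // Models the requirement that the producer is created with a non-null default topic.
        this.defaultTopic = Objects.requireNonNull(defaultTopic, "default topic not set");
        this.selector = selector;
    }

    // Per-record topic if the selector provides one, otherwise the default topic.
    String resolve(T element) {
        String topic = selector.getTargetTopic(element);
        return topic != null ? topic : defaultTopic;
    }
}
```

Keeping a real default topic means records the schema cannot route still have a valid destination instead of failing at send time.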

### Producer Configuration Examples

```java { .api }
// Keys below are those of the Kafka Java producer used by FlinkKafkaProducerBase
Properties producerProps = new Properties();

// Required: broker connection
producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

// Performance tuning
producerProps.setProperty("batch.size", "16384");       // bytes per partition batch
producerProps.setProperty("linger.ms", "1000");         // max time to wait while filling a batch
producerProps.setProperty("buffer.memory", "33554432"); // total send-buffer memory in bytes

// Compression (optional)
producerProps.setProperty("compression.type", "snappy"); // or "gzip"

// Network settings
producerProps.setProperty("send.buffer.bytes", "102400");
producerProps.setProperty("client.id", "flink-kafka-producer");

// Reliability settings (Kafka 0.8 limitations apply)
producerProps.setProperty("acks", "1"); // "0", "1", or "all"
producerProps.setProperty("retries", "3");
producerProps.setProperty("retry.backoff.ms", "100");

FlinkKafkaProducer08<MyData> producer = new FlinkKafkaProducer08<>(
    "my-topic", new MyDataSerializer(), producerProps);
```
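
A legacy or misspelled key (for example one of the old Scala producer's settings such as `metadata.broker.list`) is not used by the Java client, which at most logs a warning about it, so it can help to check the properties before building the sink. The helper below is a hypothetical pre-flight check: it requires `bootstrap.servers` and defaults the key/value serializers to Kafka's `ByteArraySerializer`, which is, as far as we know, what `FlinkKafkaProducerBase` also does because records reach Kafka already serialized. The legacy-key list is an illustrative assumption, not an exhaustive one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for producer properties; not part of the Flink connector.
final class ProducerConfigCheck {
    // Illustrative subset of old Scala-producer keys that the Java producer does not use.
    private static final List<String> LEGACY_KEYS = Arrays.asList(
        "metadata.broker.list", "request.required.acks", "message.send.max.retries",
        "batch.num.messages", "queue.buffering.max.ms", "compression.codec");

    private static final String BYTE_ARRAY_SERIALIZER =
        "org.apache.kafka.common.serialization.ByteArraySerializer";

    private ProducerConfigCheck() {}

    // Returns warnings for legacy keys; throws if bootstrap.servers is missing.
    static List<String> check(Properties props) {
        if (props.getProperty("bootstrap.servers") == null) {
            throw new IllegalArgumentException(
                "bootstrap.servers must be supplied in the producer config properties.");
        }
        // Records are serialized by the Flink schema, so Kafka only passes bytes through.
        props.putIfAbsent("key.serializer", BYTE_ARRAY_SERIALIZER);
        props.putIfAbsent("value.serializer", BYTE_ARRAY_SERIALIZER);

        List<String> warnings = new ArrayList<>();
        for (String key : LEGACY_KEYS) {
            if (props.containsKey(key)) {
                warnings.add("'" + key + "' is a legacy producer setting and is not used by the Java client");
            }
        }
        return warnings;
    }
}
```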

### Error Handling and Reliability

```java { .api }
// Configure producer for different reliability levels
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Choose one of the following two options.
// Option 1: Fail fast on errors (default)
producer.setLogFailuresOnly(false);

// Option 2: Log errors but continue processing
producer.setLogFailuresOnly(true);

// Enable flushing on checkpoints for better reliability
producer.setFlushOnCheckpoint(true);

// Note: Kafka 0.8 producer limitations
// - No transactional guarantees
// - No exactly-once semantics
// - Messages may be lost on producer failures
// - Retries are limited to the Kafka client's "retries" setting; after that, a failed send
//   is either rethrown or only logged, depending on setLogFailuresOnly
```

### Custom Serialization Examples

```java { .api }
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Simple SerializationSchema example
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    // Created lazily on the task; transient so it is not shipped with the serialized schema
    private transient ObjectMapper objectMapper;

    @Override
    public byte[] serialize(T element) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        try {
            return objectMapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize to JSON", e);
        }
    }
}

// KeyedSerializationSchema with dynamic routing
// (Keyed and Routable are hypothetical application-defined interfaces)
public class RoutingKeyedSchema<T> implements KeyedSerializationSchema<T> {

    @Override
    public byte[] serializeKey(T element) {
        if (element instanceof Keyed) {
            return ((Keyed) element).getKey().getBytes(StandardCharsets.UTF_8);
        }
        return null; // No key
    }

    @Override
    public byte[] serializeValue(T element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(T element) {
        if (element instanceof Routable) {
            return ((Routable) element).getTargetTopic();
        }
        return null; // Use default topic
    }
}
```

## Deprecated FlinkKafkaProducer<IN>

**Package:** `org.apache.flink.streaming.connectors.kafka`
**Annotations:** `@Deprecated`
**Extends:** `FlinkKafkaProducer08<IN>`

```java { .api }
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>
```

### Deprecated Constructors

```java { .api }
/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
                          SerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
                          Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
                          Properties producerConfig, KafkaPartitioner customPartitioner)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
                          KeyedSerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
                          Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
                          Properties producerConfig, KafkaPartitioner customPartitioner)
```

## FlinkKafkaPartitioner<T> Abstract Class

**Package:** `org.apache.flink.streaming.connectors.kafka.partitioner`
**Annotations:** `@PublicEvolving`
**Description:** Abstract base class for custom partitioning logic that determines which Kafka partition each record is written to.

### Class Definition

```java { .api }
@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    /**
     * Initializer for the partitioner. Called once on each parallel sink instance.
     * The default implementation does nothing.
     *
     * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
     * @param parallelInstances the total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Override if initialization is needed
    }

    /**
     * Determine the id of the partition that the record should be written to.
     *
     * @param record the record value
     * @param key serialized key of the record
     * @param value serialized value of the record
     * @param targetTopic target topic for the record
     * @param partitions found partitions for the target topic
     * @return the id of the target partition
     */
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

### Custom Partitioner Example

```java { .api }
public class CustomerIdPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {

    @Override
    public int partition(CustomerEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partition on customer ID so all events for the same customer go to the same partition
        return partitions[Math.abs(record.getCustomerId().hashCode() % partitions.length)];
    }
}

// Usage with producer
FlinkKafkaProducer08<CustomerEvent> producer = new FlinkKafkaProducer08<>(
    "customer-events",
    new CustomerEventSchema(),
    properties,
    new CustomerIdPartitioner()
);
```

## Kafka 0.8 Producer Limitations

### Important Reliability Considerations

1. **No Exactly-Once Guarantees**: Kafka 0.8 producers cannot provide exactly-once delivery semantics
2. **No Transactional Support**: No atomic writes across multiple partitions/topics
3. **No Native Flushing**: The Kafka 0.8 client has no flush call; the connector's `flush()` instead waits until all pending records have been acknowledged
4. **Limited Error Recovery**: Messages that still fail after the client's configured retries may be lost unless the application retries them
5. **No Idempotent Writes**: Duplicate messages may occur during retries

### Recommended Practices

```java { .api }
// Enable checkpointing for better reliability
env.enableCheckpointing(60000); // Checkpoint every minute

// Use flush on checkpoint
producer.setFlushOnCheckpoint(true);

// Configure appropriate retry settings in the Kafka producer properties
Properties props = new Properties();
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "100");
props.setProperty("acks", "1"); // Wait for leader acknowledgment

// Consider using an exactly-once sink if available in newer Flink versions
// For Kafka 0.8, implement custom retry logic if needed
```
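
If application-level retries are needed, as the last comment suggests, one option is a small retry helper with exponential backoff around whatever send operation the application controls. This is a generic, hypothetical sketch: `RetryWithBackoff` and its parameters are not part of Flink or Kafka. Retries like this can produce duplicates, so they pair naturally with downstream deduplication.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries an operation with exponential backoff.
final class RetryWithBackoff {
    private RetryWithBackoff() {}

    static <T> T call(Callable<T> operation, int maxAttempts, long initialBackoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // double the wait after each failed attempt
                }
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }
}
```

Capping `maxAttempts` keeps a persistently failing broker from stalling the pipeline indefinitely; after the last attempt the error is rethrown so the usual failure handling can take over.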

### Migration Recommendations

For production systems requiring strong delivery guarantees:

1. **Upgrade Kafka**: `flink-connector-kafka-0.9` and `flink-connector-kafka-0.10` provide at-least-once delivery with checkpointing; exactly-once requires Kafka 0.11+ with `flink-connector-kafka-0.11` or the universal Kafka connector
2. **Implement Custom Reliability**: Add application-level retry and deduplication logic
3. **Monitor Producer Metrics**: Track failed sends and implement alerting
4. **Use Synchronous Sending**: If throughput allows, consider synchronous sends for critical data
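
Because the producer can emit duplicates (for example after client retries or job recovery), the deduplication mentioned in item 2 usually happens on the consuming side. A minimal sketch, assuming every message carries a unique ID chosen by the application: remember the most recent IDs in a bounded, insertion-ordered map and drop repeats. `RecentIdDeduplicator` and the capacity bound are hypothetical; a bounded window only catches duplicates that arrive within the last `capacity` distinct IDs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical consumer-side deduplicator keyed by an application-assigned message ID.
final class RecentIdDeduplicator {
    private final Map<String, Boolean> seen;

    RecentIdDeduplicator(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // Insertion-ordered map that evicts the oldest ID once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen within the window, false for a duplicate.
    boolean firstSeen(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

The window size trades memory for coverage: it should exceed the number of distinct messages that can be replayed between the original send and its duplicate.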