# Kafka Producer

The Flink Kafka producer enables reliable message production to Kafka 0.8.x topics with configurable partitioning and serialization strategies.

## Capabilities

### FlinkKafkaProducer08

The primary producer class for Kafka 0.8.x integration with extensive configuration options.

```java { .api }
/**
 * Kafka producer for Apache Kafka 0.8.x (provides best-effort delivery guarantees).
 */
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {

    /**
     * Creates a producer with broker list, topic, and value-only serialization.
     *
     * @param brokerList Comma-separated list of Kafka brokers
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     */
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and value-only serialization.
     *
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with properties, value serialization, and a custom partitioner.
     *
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     * @param producerConfig Kafka producer properties
     * @param customPartitioner Custom partitioner for message distribution
     */
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * Creates a producer with broker list, topic, and key-value serialization.
     *
     * @param brokerList Comma-separated list of Kafka brokers
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     */
    public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

    /**
     * Creates a producer with properties and key-value serialization.
     *
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * Creates a producer with properties, key-value serialization, and a custom partitioner.
     *
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     * @param producerConfig Kafka producer properties
     * @param customPartitioner Custom partitioner for message distribution
     */
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);

    /**
     * @deprecated Use {@link FlinkKafkaPartitioner} instead of {@code KafkaPartitioner}.
     */
    @Deprecated
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

    /**
     * @deprecated Use {@link FlinkKafkaPartitioner} instead of {@code KafkaPartitioner}.
     */
    @Deprecated
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Basic producer with broker list
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

// Producer with properties configuration
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "10");
producerProps.setProperty("compression.type", "snappy");

FlinkKafkaProducer08<String> configuredProducer = new FlinkKafkaProducer08<>(
    "events-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(configuredProducer);

// Producer with custom partitioner
FlinkKafkaProducer08<String> partitionedProducer = new FlinkKafkaProducer08<>(
    "partitioned-topic",
    new SimpleStringSchema(),
    producerProps,
    new CustomPartitioner<>() // extend FlinkKafkaPartitioner
);

// Key-value producer: implement KeyedSerializationSchema to control both
// the message key and value. (KeyedSerializationSchemaWrapper only wraps a
// value schema and always produces null keys, so it is not suitable here.)
KeyedSerializationSchema<Tuple2<String, String>> keyValueSchema =
    new KeyedSerializationSchema<Tuple2<String, String>>() {
        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            return element.f0.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            return element.f1.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<String, String> element) {
            return null; // use the producer's default topic
        }
    };

FlinkKafkaProducer08<Tuple2<String, String>> kvProducer = new FlinkKafkaProducer08<>(
    "key-value-topic",
    keyValueSchema,
    producerProps
);

DataStream<Tuple2<String, String>> kvStream = env.fromElements(
    Tuple2.of("key1", "value1"),
    Tuple2.of("key2", "value2")
);
kvStream.addSink(kvProducer);
```

### FlinkKafkaProducer (Deprecated)

Deprecated alias that redirects to FlinkKafkaProducer08.

```java { .api }
/**
 * @deprecated Use {@link FlinkKafkaProducer08} instead.
 */
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);

    /**
     * @deprecated Use the corresponding {@link FlinkKafkaProducer08} constructor instead.
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
}
```

## Configuration

### Required Properties

- **bootstrap.servers**: Comma-separated list of Kafka broker addresses

### Recommended Properties

- **batch.size**: Number of bytes to batch before sending (default: 16384)
- **linger.ms**: Time to wait for additional messages in a batch (default: 0)
- **compression.type**: Compression algorithm (`none`, `gzip`, `snappy`, `lz4`)
- **acks**: Acknowledgment mode (`0`, `1`, `all`)
- **retries**: Number of retry attempts on failure
- **retry.backoff.ms**: Backoff time between retries

### Advanced Properties

Note: some of these properties were introduced in later Kafka client versions
and may not be recognized by 0.8.x-era clients — verify against the client
version actually on the classpath.

- **buffer.memory**: Total memory for producer buffering
- **max.block.ms**: Maximum time to block on send (0.9+ clients)
- **request.timeout.ms**: Request timeout duration
- **delivery.timeout.ms**: Total time for message delivery (2.1+ clients)
## Serialization Schemas
214
215
The producer supports various serialization schemas:
216
217
```java
218
// Simple string serialization
219
SerializationSchema<String> stringSchema = new SimpleStringSchema();
220
221
// JSON serialization
222
SerializationSchema<MyObject> jsonSchema = new JSONSerializationSchema<>();
223
224
// Avro serialization (with schema registry)
225
SerializationSchema<MyAvroRecord> avroSchema = new AvroSerializationSchema<>(MyAvroRecord.class);
226
227
// Custom serialization
228
SerializationSchema<MyCustomType> customSchema = new SerializationSchema<MyCustomType>() {
229
@Override
230
public byte[] serialize(MyCustomType element) {
231
// Custom serialization logic
232
return element.toBytes();
233
}
234
};
235
```
236
237
## Custom Partitioning
238
239
Implement custom partitioning logic:
240
241
```java
242
public class MyCustomPartitioner implements FlinkKafkaPartitioner<String> {
243
244
@Override
245
public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
246
// Custom partitioning logic
247
if (record.startsWith("urgent")) {
248
return 0; // Route urgent messages to partition 0
249
}
250
return record.hashCode() % partitions.length;
251
}
252
}
253
254
// Use custom partitioner
255
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
256
"my-topic",
257
new SimpleStringSchema(),
258
props,
259
new MyCustomPartitioner()
260
);
261
```
262
263
## Error Handling
264
265
The producer handles errors according to Kafka 0.8.x limitations:
266
267
- **Best-effort delivery**: No exactly-once guarantees with Kafka 0.8.x
268
- **Retry behavior**: Configurable through Kafka producer properties
269
- **Failure modes**: Messages may be lost on producer failures
270
271
```java
272
// Configure retry behavior
273
Properties props = new Properties();
274
props.setProperty("bootstrap.servers", "localhost:9092");
275
props.setProperty("retries", "3");
276
props.setProperty("retry.backoff.ms", "1000");
277
props.setProperty("acks", "1"); // Wait for leader acknowledgment
278
279
// Error handling in application
280
try {
281
stream.addSink(producer);
282
env.execute("Kafka Producer Job");
283
} catch (Exception e) {
284
logger.error("Kafka producer failed", e);
285
// Implement fallback or recovery logic
286
}
287
```

## Performance Considerations

- **Batching**: Use appropriate `batch.size` and `linger.ms` for throughput
- **Compression**: Enable compression for network efficiency
- **Partitioning**: Distribute load evenly across partitions
- **Parallelism**: Match producer parallelism to topic partition count
- **Memory**: Configure `buffer.memory` based on throughput requirements