# Producer Base Classes

Abstract base implementations for Kafka producers that provide exactly-once delivery semantics, serialization handling, and partitioning logic. These classes handle the complexities of reliable message production while delegating version-specific operations to concrete implementations.

## Capabilities

### FlinkKafkaProducerBase

The core abstract base class that all Flink Kafka producers extend. Provides comprehensive functionality for producing to Kafka topics with exactly-once processing guarantees and transaction support.
```java { .api }
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    public FlinkKafkaProducerBase(
            String defaultTopicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner
    );
}
```
**Parameters:**
- `defaultTopicId` - Default target topic for messages (can be overridden by the serialization schema)
- `serializationSchema` - Schema for serializing elements to Kafka key-value messages
- `producerConfig` - Kafka producer configuration properties
- `customPartitioner` - Custom partitioner for determining target partitions (optional, can be null)

**Usage Example:**

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");

// MyKafkaProducer is a concrete subclass of FlinkKafkaProducerBase,
// MyEventSerializationSchema a KeyedSerializationSchema<MyEvent> implementation.
FlinkKafkaProducerBase<MyEvent> producer = new MyKafkaProducer(
        "events-topic",
        new MyEventSerializationSchema(),
        props,
        new FlinkFixedPartitioner<>()
);
```
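The serialization schema in this example is a user-supplied class, not part of this API. A minimal sketch of what `MyEventSerializationSchema` might look like, assuming a hypothetical `MyEvent` type that exposes `getId()` and `toJson()`:

```java
// Hypothetical sketch: MyEvent, getId(), and toJson() are assumptions for illustration.
public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {

    @Override
    public byte[] serializeKey(MyEvent element) {
        // Serialized key for the Kafka record (may be used for partitioning)
        return element.getId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // Returning null sends the record to the producer's defaultTopicId
        return null;
    }
}
```

Returning a non-null topic from `getTargetTopic` overrides the `defaultTopicId` on a per-record basis.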
### Error Handling Configuration

Configure how the producer handles failures during message production.

```java { .api }
public void setLogFailuresOnly(boolean logFailuresOnly);
```

**Parameters:**
- `logFailuresOnly` - If `true`, production failures are only logged and processing continues; if `false`, a failure causes the job to fail.

**Usage Example:**

```java
// Log failures but continue processing (not recommended for production)
producer.setLogFailuresOnly(true);

// Fail the job on any production failure (recommended for exactly-once)
producer.setLogFailuresOnly(false);
```
### Checkpoint Flush Behavior

Control whether messages are flushed synchronously during checkpoints for exactly-once guarantees.

```java { .api }
public void setFlushOnCheckpoint(boolean flush);
```

**Parameters:**
- `flush` - If true, all pending messages are flushed during checkpoints (required for exactly-once)

**Usage Example:**

```java
// Enable checkpoint flushing for exactly-once guarantees
producer.setFlushOnCheckpoint(true);
```
### Utility Methods

Static utility methods for common configuration tasks.

```java { .api }
public static Properties getPropertiesFromBrokerList(String brokerList);
```

**Parameters:**
- `brokerList` - Comma-separated list of Kafka brokers (host:port format)

**Returns:** `Properties` object with `bootstrap.servers` configured

**Usage Example:**

```java
Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// Additional properties can be set on the returned Properties object
props.setProperty("transaction.timeout.ms", "900000");
```
### Message Production

Core method for sending messages to Kafka (called by the Flink runtime).

```java { .api }
public void invoke(IN next, Context context) throws Exception;
```

**Parameters:**
- `next` - The record to be sent to Kafka
- `context` - Sink context providing additional information

This method is called by the Flink runtime for each record and should not be called directly by user code.
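Instead, the producer is attached to the pipeline as a sink and the runtime invokes it per record. A minimal wiring sketch, reusing the hypothetical `MyKafkaProducer` and `MyEventSerializationSchema` from the earlier example and assuming an existing `DataStream<MyEvent>` named `events`:

```java
// The runtime calls invoke() for each element of the stream; user code only wires the sink.
events.addSink(new MyKafkaProducer(
        "events-topic",
        new MyEventSerializationSchema(),
        FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092"),
        new FlinkFixedPartitioner<>()
));
```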
### State Management

Handle checkpointing and transaction coordination (implemented by the framework).

```java { .api }
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
```

These methods handle the exactly-once semantics by coordinating with Kafka transactions and Flink checkpoints.
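They only take effect when checkpointing is enabled on the execution environment. A minimal sketch of the job-level setup usually paired with `setFlushOnCheckpoint(true)` (the interval is illustrative):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// snapshotState()/initializeState() are driven by these periodic checkpoints
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
```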
### Resource Management

```java { .api }
public void open(Configuration configuration);
public void close() throws Exception;
```

These methods handle producer lifecycle management including Kafka client initialization and cleanup.
### Abstract Methods

Concrete implementations must implement this version-specific method:

```java { .api }
protected abstract void flush();
```

This method must flush all pending records to ensure they are sent to Kafka. Called during checkpoints to guarantee exactly-once processing when `setFlushOnCheckpoint(true)` is configured.
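As an illustration only (the shipped version-specific producers implement this differently), a subclass could block until the documented pending-record counter drains:

```java
// Sketch of a concrete subclass; MyEvent is hypothetical, and a real implementation
// would block on the Kafka client's own flush/callback mechanism rather than spin.
public class MyKafkaProducer extends FlinkKafkaProducerBase<MyEvent> {

    public MyKafkaProducer(String defaultTopicId,
                           KeyedSerializationSchema<MyEvent> serializationSchema,
                           Properties producerConfig,
                           FlinkKafkaPartitioner<MyEvent> customPartitioner) {
        super(defaultTopicId, serializationSchema, producerConfig, customPartitioner);
    }

    @Override
    protected void flush() {
        // Wait until every record handed to Kafka has been acknowledged so that the
        // checkpoint only completes once nothing is left in flight.
        while (numPendingRecords() > 0) {
            Thread.yield();
        }
    }
}
```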
### Protected API for Subclasses

Methods and fields available to concrete implementations:

```java { .api }
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props);
protected void checkErroneous() throws Exception;
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer);
protected long numPendingRecords();
```

**Protected Fields:**

```java { .api }
protected final Properties producerConfig;
protected final String defaultTopicId;
protected final KeyedSerializationSchema<IN> schema;
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
protected boolean logFailuresOnly;
protected boolean flushOnCheckpoint;
```

**Methods:**
- `getKafkaProducer()` - Factory method for creating Kafka producer instances
- `checkErroneous()` - Check for and throw any pending async exceptions
- `getPartitionsByTopic()` - Utility to discover partitions for a topic
- `numPendingRecords()` - Get count of unacknowledged records (useful for monitoring)

**Constants:**

```java { .api }
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
```
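The metrics flag is passed through the producer `Properties`. A small sketch, assuming the flag disables forwarding of the Kafka client's metrics when set to `"true"`:

```java
Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");

// Turn off forwarding of the internal Kafka producer metrics to Flink's metric system
props.setProperty(FlinkKafkaProducerBase.KEY_DISABLE_METRICS, "true");
```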
## Configuration Best Practices

### Exactly-Once Configuration

For exactly-once processing guarantees, configure the producer as follows:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("max.in.flight.requests.per.connection", "1");
props.setProperty("retries", "2147483647");
props.setProperty("enable.idempotence", "true");

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);
```
### At-Least-Once Configuration

For at-least-once processing with higher throughput (note: with checkpoint flushing disabled, records still buffered in the producer when a failure occurs may be lost; keep `setFlushOnCheckpoint(true)` if that is not acceptable):

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "2147483647");

producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(false);
```
### High-Throughput Configuration

For maximum throughput where weaker delivery guarantees are acceptable (failures are only logged, and records not yet flushed may be lost):

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "1");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "lz4");

producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(true);
```
## Error Handling

The producer handles various types of failures:

- **Retriable Errors**: Automatically retried based on the `retries` configuration
- **Non-Retriable Errors**: Cause immediate failure or are only logged, depending on the `logFailuresOnly` setting
- **Transaction Errors**: Handled through checkpoint coordination and transaction abort/retry
- **Network Errors**: Handled through connection retry and failover mechanisms

When `logFailuresOnly` is false (recommended), any production failure causes the Flink job to fail and restart, ensuring no data loss as long as checkpointing is configured correctly.
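A typical pairing is enabled checkpointing plus an explicit restart strategy; a minimal sketch with illustrative values:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// On a production failure the job fails, restarts, and restores from the last checkpoint
env.enableCheckpointing(60_000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
```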