# Kinesis Producer

The FlinkKinesisProducer enables high-throughput data ingestion into Amazon Kinesis Data Streams using the Kinesis Producer Library (KPL), with configurable partitioning, error handling, and backpressure management.

## Capabilities

### FlinkKinesisProducer

The main producer class for writing data to Kinesis streams, with configurable partitioning strategies, error handling, and performance tuning options.

```java { .api }
@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {

    // Metric constants
    public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
    public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
    public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
    public static final String KINESIS_PRODUCER_RELEASE_HOOK_NAME = "kinesisProducer";

    /**
     * Create a producer with a standard serialization schema.
     *
     * @param schema Standard Flink serialization schema
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);

    /**
     * Create a producer with a Kinesis-specific serialization schema.
     *
     * @param schema Kinesis serialization schema with target stream specification
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);

    /**
     * Configure error handling behavior.
     *
     * @param failOnError If true, fail the job on any production error; if false, log and continue
     */
    public void setFailOnError(boolean failOnError);

    /**
     * Set the maximum number of outstanding records before backpressure is applied.
     *
     * @param queueLimit Maximum outstanding records (default: Integer.MAX_VALUE)
     */
    public void setQueueLimit(int queueLimit);

    /**
     * Set the default target stream for records.
     *
     * @param defaultStream Default stream name (can be overridden by the serialization schema)
     */
    public void setDefaultStream(String defaultStream);

    /**
     * Set the default partition for records.
     *
     * @param defaultPartition Default partition ID (can be overridden by the serialization schema)
     */
    public void setDefaultPartition(String defaultPartition);

    /**
     * Set a custom partitioner for distributing records across shards.
     *
     * @param partitioner Custom partitioner implementation
     */
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);

    /**
     * Initialize the producer with runtime configuration.
     *
     * @param parameters Runtime configuration parameters
     * @throws Exception On initialization errors
     */
    public void open(Configuration parameters) throws Exception;

    /**
     * Send a record to Kinesis.
     *
     * @param value Record to send
     * @param context Sink context with additional metadata
     * @throws Exception On send errors
     */
    public void invoke(OUT value, Context context) throws Exception;

    /**
     * Close the producer and clean up resources.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;

    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;

    /**
     * Create a snapshot of the current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}
```

### Usage Examples

#### Basic Producer Setup

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Configure AWS properties
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create a producer with simple string serialization
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        props
);

// Configure producer settings
producer.setDefaultStream("my-output-stream");
producer.setFailOnError(true);
producer.setQueueLimit(1000);

// Add the sink to the data stream
dataStream.addSink(producer);
```

#### Producer with Custom Serialization

```java
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Custom serialization schema with target stream selection
KinesisSerializationSchema<MyEvent> customSerializer = new KinesisSerializationSchema<MyEvent>() {
    @Override
    public ByteBuffer serialize(MyEvent element) {
        // Convert to JSON or another wire format
        String json = toJson(element);
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(MyEvent element) {
        // Route to different streams based on event type
        return "events-" + element.getEventType().toLowerCase();
    }
};

FlinkKinesisProducer<MyEvent> producer = new FlinkKinesisProducer<>(
        customSerializer,
        props
);
```

#### Producer with Custom Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

// Custom partitioner keyed on user ID
KinesisPartitioner<MyEvent> customPartitioner = new KinesisPartitioner<MyEvent>() {
    @Override
    public String getPartitionId(MyEvent element) {
        // Partition by user ID so records for the same user stay ordered
        return String.valueOf(element.getUserId() % 100);
    }

    @Override
    public String getExplicitHashKey(MyEvent element) {
        // Optional: provide an explicit hash key for finer shard control
        return String.valueOf(element.getUserId());
    }
};

producer.setCustomPartitioner(customPartitioner);
```

#### Producer with Fixed Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;

// Fixed partitioner: every record from a given Flink subtask uses the same
// partition key, so each parallel subtask maps to a fixed Kinesis partition
FixedKinesisPartitioner<MyEvent> fixedPartitioner = new FixedKinesisPartitioner<>();
producer.setCustomPartitioner(fixedPartitioner);
```

#### Producer with Random Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;

// Random partitioner: spreads records evenly across shards
// (no per-key ordering guarantees)
RandomKinesisPartitioner<MyEvent> randomPartitioner = new RandomKinesisPartitioner<>();
producer.setCustomPartitioner(randomPartitioner);
```

#### High-Throughput Configuration

```java
// Tune KPL settings for high throughput
props.setProperty("RecordMaxBufferedTime", "100"); // batch for up to 100 ms
props.setProperty("MaxConnections", "24");         // more concurrent connections
props.setProperty("RequestTimeout", "6000");       // 6-second request timeout
props.setProperty("RecordTtl", "30000");           // 30-second record TTL

// Configure aggregation
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");

// Raise the queue limit to allow more buffering
producer.setQueueLimit(10000);

// For high throughput, log errors but keep processing
producer.setFailOnError(false);
```

#### Delivery Guarantees with Checkpointing

```java
// Enable checkpointing; the producer flushes all outstanding records
// during each checkpoint
env.enableCheckpointing(60000); // checkpoint every minute
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Retain externalized checkpoints on cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// The producer participates in checkpointing by flushing all outstanding
// records in snapshotState(), so a completed checkpoint guarantees those
// records reached Kinesis. The sink provides at-least-once semantics:
// duplicates are possible if the job restarts after a partial flush.
```

### Configuration Options

Key configuration properties for the FlinkKinesisProducer:

#### KPL Configuration
- `RecordMaxBufferedTime`: Maximum time to buffer records (default: 100ms)
- `RecordTtl`: Time-to-live for records in the buffer (default: 30000ms)
- `RequestTimeout`: Timeout for HTTP requests (default: 6000ms)
- `MaxConnections`: Maximum concurrent connections (default: 24)

#### Aggregation Settings
- `AggregationEnabled`: Enable record aggregation (default: true)
- `AggregationMaxCount`: Maximum records per aggregate (default: 4294967295)
- `AggregationMaxSize`: Maximum aggregate size in bytes (default: 51200)

#### Retry and Metrics Configuration
- `RetryDuration`: Maximum retry duration (default: 10000ms)
- `MetricsLevel`: CloudWatch metrics level (NONE, SUMMARY, DETAILED)
- `MetricsGranularity`: Metrics granularity (GLOBAL, STREAM, SHARD)

### Error Handling and Reliability

The producer provides several mechanisms for handling errors and ensuring reliability:

#### Automatic Retry
- Built-in exponential backoff for transient failures
- Configurable retry duration and maximum attempts
- Automatic handling of throttling and service limits

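The retry policy itself lives inside the KPL, but the shape of a capped exponential backoff can be sketched as follows. The base delay, cap, and attempt count here are illustrative assumptions, not KPL defaults:

```java
// Illustrative capped exponential backoff schedule. The KPL implements its
// own retry logic internally; these constants are assumptions for the sketch.
long baseDelayMs = 100;
long maxDelayMs = 5000;
long[] delays = new long[6];
for (int attempt = 0; attempt < delays.length; attempt++) {
    // double the delay on each attempt, capped at maxDelayMs
    delays[attempt] = Math.min(maxDelayMs, baseDelayMs << attempt);
}
// delays: 100, 200, 400, 800, 1600, 3200 (ms)
```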
#### Error Handling Modes
- **Fail-on-Error**: Fail the entire job on any production error
- **Log-and-Continue**: Log errors but continue processing other records
- **Custom Handling**: Implement custom error handling in the serialization schema

#### Metrics and Monitoring
- **Backpressure Cycles**: Number of times the producer was backpressured
- **Outstanding Records**: Current number of unacknowledged records
- **KPL Metrics**: Detailed metrics from the Kinesis Producer Library
- **CloudWatch Integration**: Automatic metric publishing to CloudWatch

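The first two metrics are registered under the public constants on FlinkKinesisProducer, so the metric identifiers a dashboard would match on end with the group/name pairs below. The full identifier also carries a deployment-specific job/task scope prefix, which is omitted here:

```java
// Group and metric names mirror the public constants on FlinkKinesisProducer:
// KINESIS_PRODUCER_METRIC_GROUP, METRIC_BACKPRESSURE_CYCLES,
// METRIC_OUTSTANDING_RECORDS_COUNT
String group = "kinesisProducer";
String backpressure = group + "." + "backpressureCycles";
String outstanding = group + "." + "outstandingRecordsCount";
// backpressure == "kinesisProducer.backpressureCycles"
// outstanding  == "kinesisProducer.outstandingRecordsCount"
```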
#### Memory Management
- Configurable queue limits to prevent out-of-memory errors
- Automatic backpressure when queue limits are reached
- Resource cleanup on job cancellation or failure

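One way to pick a queue limit is to bound the memory held by outstanding records. A rough sizing sketch, where the memory budget and average record size are assumed example figures rather than connector defaults:

```java
// Bound producer-buffered memory: queueLimit * avgRecordBytes <= memoryBudgetBytes.
// Both figures below are illustrative assumptions, not connector defaults.
long memoryBudgetBytes = 64L * 1024 * 1024; // allow ~64 MiB of outstanding records
long avgRecordBytes = 2 * 1024;             // observed average record size ~2 KiB
int queueLimit = (int) Math.min(Integer.MAX_VALUE, memoryBudgetBytes / avgRecordBytes);
// queueLimit == 32768
// then: producer.setQueueLimit(queueLimit);
```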
### Performance Tuning

#### Throughput Optimization
- Increase `MaxConnections` for higher parallelism
- Increase `RecordMaxBufferedTime` to batch more records per request
- Enable aggregation for better throughput
- Tune queue limits based on memory availability

#### Latency Optimization
- Reduce `RecordMaxBufferedTime` to minimize buffering delay
- Disable aggregation for the lowest latency
- Use direct partition assignment instead of random partitioning
- Configure smaller batch sizes

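The latency-oriented settings above translate into KPL properties along these lines; the values are illustrative, not recommendations:

```java
import java.util.Properties;

// Illustrative low-latency KPL settings (values are assumptions, not defaults)
Properties lowLatencyProps = new Properties();
lowLatencyProps.setProperty("RecordMaxBufferedTime", "20"); // flush after at most 20 ms
lowLatencyProps.setProperty("AggregationEnabled", "false"); // one Kinesis record per element
```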
#### Resource Management
- Monitor the outstanding records count to prevent memory issues
- Use appropriate queue limits based on record size and available memory
- Configure KPL thread pool sizes based on available CPU cores
- Set reasonable timeouts to prevent resource leaks