# Kinesis Consumer

The FlinkKinesisConsumer ingests streaming data from Amazon Kinesis Data Streams with exactly-once guarantees. It discovers shards automatically, stores its read position in Flink checkpoints, and exposes configuration options for tuning high-throughput stream processing.

## Capabilities

### FlinkKinesisConsumer

Main consumer class for reading from one or more Kinesis streams. Exactly-once processing guarantees come from Flink's checkpointing mechanism.

```java { .api }
@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    /**
     * Create consumer for single stream with standard deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Create consumer for single stream with Kinesis-specific deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Create consumer for multiple streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of stream names to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Get current shard assigner for mapping shards to subtasks.
     *
     * @return Current shard assigner instance
     */
    public KinesisShardAssigner getShardAssigner();

    /**
     * Set custom shard assigner for mapping shards to subtasks.
     *
     * @param shardAssigner Custom shard assigner implementation
     */
    public void setShardAssigner(KinesisShardAssigner shardAssigner);

    /**
     * Get current periodic watermark assigner for event-time processing.
     *
     * @return Current watermark assigner instance
     */
    public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner();

    /**
     * Set periodic watermark assigner for event-time processing.
     *
     * @param periodicWatermarkAssigner Watermark assigner implementation
     */
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);

    /**
     * Get current watermark tracker for distributed watermark aggregation.
     *
     * @return Current watermark tracker instance
     */
    public WatermarkTracker getWatermarkTracker();

    /**
     * Set watermark tracker for distributed watermark aggregation.
     *
     * @param watermarkTracker Watermark tracker implementation
     */
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);

    /**
     * Main source function execution method.
     *
     * @param sourceContext Flink source context for emitting records
     * @throws Exception On processing errors
     */
    public void run(SourceContext<T> sourceContext) throws Exception;

    /**
     * Cancel the consumer and stop reading from streams.
     */
    public void cancel();

    /**
     * Close resources and cleanup.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;

    /**
     * Get the type information for produced records.
     *
     * @return Type information for output type T
     */
    public TypeInformation<T> getProducedType();

    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;

    /**
     * Create snapshot of current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}
```

### Usage Examples

#### Basic Single Stream Consumer

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// AWS region and credentials (placeholder values) plus the initial read position
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Read a single stream as UTF-8 strings
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "my-kinesis-stream",
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);
```

#### Multi-Stream Consumer with Custom Deserialization

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import java.io.IOException;
import java.util.Arrays;

// Custom deserialization schema with access to Kinesis metadata.
// MyEvent and parseJson(...) are application-specific placeholders.
KinesisDeserializationSchema<MyEvent> deserializer = new KinesisDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // Parse the JSON payload, then attach the record's Kinesis metadata
        MyEvent event = parseJson(recordValue);
        event.setMetadata(stream, shardId, seqNum, approxArrivalTimestamp);
        return event;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

// props: the Properties object built in the basic example
FlinkKinesisConsumer<MyEvent> consumer = new FlinkKinesisConsumer<>(
    Arrays.asList("stream-1", "stream-2", "stream-3"),
    deserializer,
    props
);
```

#### Consumer with Event-Time Processing and Watermarks

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Custom watermark assigner for event-time processing
AssignerWithPeriodicWatermarks<MyEvent> watermarkAssigner = new AssignerWithPeriodicWatermarks<MyEvent>() {
    private final long maxLatenessMillis = 10_000L; // allow 10 seconds of lateness
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getEventTime();
        maxTimestamp = Math.max(timestamp, maxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Before the first record arrives, subtracting from Long.MIN_VALUE would overflow
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(maxTimestamp - maxLatenessMillis);
    }
};

consumer.setPeriodicWatermarkAssigner(watermarkAssigner);

// Configure watermark emission interval (milliseconds)
env.getConfig().setAutoWatermarkInterval(5000);

// Configure shard idle timeout to prevent watermark stalling.
// This belongs in the Properties passed to the consumer constructor.
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
```

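The watermark logic above reduces to tracking the highest event timestamp seen so far and subtracting the allowed lateness. The following is a minimal sketch of that bookkeeping in plain Java, without Flink types, so it can be tried in isolation. The class name `BoundedLatenessSketch` and its methods are hypothetical and not part of the connector.

```java
/** Sketch only: bounded-lateness watermark bookkeeping without Flink types. */
final class BoundedLatenessSketch {
    private final long maxLatenessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessSketch(long maxLatenessMillis) {
        this.maxLatenessMillis = maxLatenessMillis;
    }

    /** Records an event timestamp and returns it unchanged. */
    long onEvent(long eventTimeMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMillis);
        return eventTimeMillis;
    }

    /** Highest timestamp seen so far minus the allowed lateness. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            // No events yet: Long.MIN_VALUE - lateness would wrap around to a huge value
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxLatenessMillis;
    }
}
```

Out-of-order events do not move the watermark backwards, because only the maximum timestamp is retained. The guard for the no-events case matters: without it, the first watermark emitted before any record arrives could be a very large positive number.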
#### Consumer with Custom Shard Assignment

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

// Custom shard assigner to control load balancing
KinesisShardAssigner customAssigner = new KinesisShardAssigner() {
    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        // Custom logic for shard assignment; here, a simple hash of the shard ID
        String shardId = shard.getShard().getShardId();
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
};

consumer.setShardAssigner(customAssigner);
```

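Hash-based assignment is easy to get subtly wrong when the hash is negative. The sketch below isolates the index computation in plain Java so the edge case can be checked without a Flink job. `ShardAssignmentSketch` and its methods are hypothetical helpers for illustration, not connector API.

```java
/** Sketch only: maps shard IDs to subtask indexes without Flink dependencies. */
final class ShardAssignmentSketch {

    /** Maps any hash, including negative ones, to an index in [0, numSubtasks). */
    static int assignByHash(int hash, int numSubtasks) {
        return Math.floorMod(hash, numSubtasks);
    }

    /** Counts how many of the given shard IDs land on each subtask. */
    static int[] distribution(String[] shardIds, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (String shardId : shardIds) {
            counts[assignByHash(shardId.hashCode(), numSubtasks)]++;
        }
        return counts;
    }
}
```

The reason for `Math.floorMod` over `Math.abs(hash) % n` is that `Math.abs(Integer.MIN_VALUE)` is still negative, so the abs-then-modulo form can yield a negative index for an unlucky hash. Running `distribution` over the expected shard IDs is a quick way to see how evenly a given scheme spreads shards across subtasks.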
#### Enhanced Fan-Out (EFO) Consumer Configuration

```java
// Configure Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-app");

// EFO registration strategy (LAZY, EAGER, NONE)
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");

// efoConsumerArn(...) returns the per-stream property key under which the ARN of an
// already registered EFO consumer is supplied (used with registration type NONE)
String consumerArnKey = ConsumerConfigConstants.efoConsumerArn("my-kinesis-stream");
```

### Configuration Options

Key configuration properties for the FlinkKinesisConsumer (constants in `ConsumerConfigConstants`):

#### Stream Configuration
- `STREAM_INITIAL_POSITION`: Starting position (TRIM_HORIZON, LATEST, AT_TIMESTAMP)
- `STREAM_INITIAL_TIMESTAMP`: Timestamp for AT_TIMESTAMP positioning
- `RECORD_PUBLISHER_TYPE`: EFO or POLLING record publisher
- `EFO_CONSUMER_NAME`: Name for Enhanced Fan-Out consumer

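The first two options are coupled: `AT_TIMESTAMP` only makes sense when `STREAM_INITIAL_TIMESTAMP` is also set. As a rough illustration, a job could run a pre-flight check like the one sketched below before building the consumer. `InitialPositionCheck` is a hypothetical helper and not the connector's own validation logic.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: hypothetical pre-flight check for the initial position settings. */
final class InitialPositionCheck {
    private static final List<String> POSITIONS =
            Arrays.asList("TRIM_HORIZON", "LATEST", "AT_TIMESTAMP");

    /** Returns null if the combination looks valid, otherwise a description of the problem. */
    static String validate(String initialPosition, String initialTimestamp) {
        if (!POSITIONS.contains(initialPosition)) {
            return "unknown initial position: " + initialPosition;
        }
        boolean timestampMissing =
                initialTimestamp == null || initialTimestamp.trim().isEmpty();
        if ("AT_TIMESTAMP".equals(initialPosition) && timestampMissing) {
            return "AT_TIMESTAMP requires STREAM_INITIAL_TIMESTAMP";
        }
        return null;
    }
}
```

The check deliberately does not parse the timestamp value; which formats are accepted is up to the connector version in use.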
#### Shard Configuration
- `SHARD_GETRECORDS_MAX`: Maximum records per GetRecords call (default: 10000)
- `SHARD_GETRECORDS_INTERVAL_MILLIS`: Interval between GetRecords calls in milliseconds (default: 200)
- `SHARD_IDLE_INTERVAL_MILLIS`: Timeout for idle shard detection
- `SHARD_USE_ADAPTIVE_READS`: Enable adaptive read intervals

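The polling interval interacts with the Kinesis per-shard read quota. Assuming the documented limit of five GetRecords calls per second per shard, shared by every polling reader of that shard, the arithmetic can be sketched as follows. `PollingBudgetSketch` is a hypothetical helper, and the limit constant is an assumption to verify against current AWS quotas.

```java
/** Sketch only: rough GetRecords call budget per shard for polling consumers. */
final class PollingBudgetSketch {
    /** Assumed Kinesis quota: GetRecords calls per second per shard, shared by all readers. */
    static final long CALLS_PER_SECOND_LIMIT = 5;

    /** Upper bound on calls per second one reader issues at the given interval. */
    static double callsPerSecond(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return 1000.0 / intervalMillis;
    }

    /** How many readers polling at this interval fit within the assumed quota. */
    static long maxPollingReaders(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return (CALLS_PER_SECOND_LIMIT * intervalMillis) / 1000;
    }
}
```

At the default interval of 200 ms, a single reader can issue up to five calls per second and use the whole budget on its own, while a 1000 ms interval leaves room for about five readers. When several applications poll the same stream, raising the interval or switching to EFO is the usual way out.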
#### Watermark Configuration
- `WATERMARK_SYNC_MILLIS`: Interval for watermark synchronization in milliseconds (default: 30000)
- `WATERMARK_LOOKAHEAD_MILLIS`: Lookahead time for watermark calculation in milliseconds (default: 180000)

### Error Handling

The consumer handles the following failure and recovery scenarios:

- **Checkpointing**: After a failure, reading resumes from the position stored in the last completed checkpoint
- **Shard Resharding**: Automatic detection and handling of shard splits and merges
- **Network Failures**: Automatic retry with exponential backoff
- **Throttling**: Built-in handling of Kinesis throttling with adaptive backoff
- **Idle Shards**: Configurable timeout (`SHARD_IDLE_INTERVAL_MILLIS`) so that shards without new records do not stall watermarks
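
To make the retry behaviour concrete, here is a minimal sketch of exponential backoff with jitter in plain Java. It illustrates the general technique only; the class `BackoffSketch`, its parameter names, and the sample values are assumptions for illustration rather than the connector's actual implementation or defaults.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch only: exponential backoff with jitter, capped at a maximum delay. */
final class BackoffSketch {

    /**
     * Delay before retry number {@code attempt} (0-based).
     * {@code jitter} is a factor in [0, 1] that scales the capped exponential delay.
     */
    static long backoffMillis(long baseMillis, long maxMillis, double power,
            int attempt, double jitter) {
        double ceiling = Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (jitter * ceiling);
    }

    /** Same computation with a random jitter factor drawn for each call. */
    static long randomBackoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double jitter = ThreadLocalRandom.current().nextDouble();
        return backoffMillis(baseMillis, maxMillis, power, attempt, jitter);
    }
}
```

The delay ceiling grows geometrically with each attempt until it reaches the cap, and the random factor spreads retries from parallel subtasks so they do not hit the service at the same instant. Passing the jitter factor explicitly, as in `backoffMillis`, keeps the computation deterministic for tests.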