Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kinesis_2-11@1.14.00
# Apache Flink Kinesis Connector

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services. The connector offers exactly-once processing guarantees, automatic shard discovery, and checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams.

## Package Information

- **Package Name**: flink-connector-kinesis_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
    <version>1.14.6</version>
</dependency>
```
17
18
## Core Imports

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
```

Configuration and serialization:

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
```
34
35
## Basic Usage

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure AWS properties
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Create Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
        "my-stream",
        new SimpleStringSchema(),
        consumerProps
);

// Create data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        producerProps
);
producer.setDefaultStream("output-stream");

// Send data to Kinesis
stream.addSink(producer);

// Execute the job
env.execute("Kinesis Streaming Job");
```
83
84
## Architecture

The Flink Kinesis connector is built around several key components:

- **Consumer (FlinkKinesisConsumer)**: Reads from Kinesis streams with exactly-once semantics using Flink's checkpointing mechanism
- **Producer (FlinkKinesisProducer)**: Writes to Kinesis streams using the Kinesis Producer Library (KPL) for high throughput
- **Shard Management**: Automatic shard discovery and assignment of shards across parallel subtasks
- **Watermark Support**: Event-time processing with configurable watermark strategies
- **AWS Integration**: Support for multiple AWS credential providers and regions

The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.
95
96
## Capabilities

### Kinesis Consumer

Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.

```java { .api }
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    public void setShardAssigner(KinesisShardAssigner shardAssigner);
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}
```

[Consumer](./consumer.md)
117
118
### Kinesis Producer

Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.

```java { .api }
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {

    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);

    public void setFailOnError(boolean failOnError);
    public void setQueueLimit(int queueLimit);
    public void setDefaultStream(String defaultStream);
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}
```

[Producer](./producer.md)
137
138
### DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.

```java { .api }
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}
```

[DynamoDB Streams](./dynamodb-streams.md)
150
151
### Configuration Management

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.

```java { .api }
public class AWSConfigConstants {
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ACCESS_KEY_ID;
    public static final String AWS_SECRET_ACCESS_KEY;

    public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}

public class ConsumerConfigConstants extends AWSConfigConstants {
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

    public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
    public enum RecordPublisherType { EFO, POLLING }
}
```

[Configuration](./configuration.md)
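As a quick illustration, the string constants above can be assembled into a consumer `Properties` object with plain Java, no Flink classes required. This is a sketch: the literal keys are the values documented above, `ConsumerConfigExample` is a hypothetical name, and `"AUTO"` is assumed to be the string form of `CredentialProvider.AUTO`.

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Builds consumer properties using the literal keys documented above.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");              // AWSConfigConstants.AWS_REGION
        props.setProperty("aws.credentials.provider", "AUTO");     // AWSConfigConstants.AWS_CREDENTIALS_PROVIDER (assumed enum name)
        props.setProperty("flink.stream.initpos", "TRIM_HORIZON"); // ConsumerConfigConstants.STREAM_INITIAL_POSITION
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("flink.stream.initpos"));
    }
}
```

In application code you would normally reference the constants (`AWSConfigConstants.AWS_REGION` etc.) rather than the raw strings, so typos fail at compile time instead of at runtime.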
176
177
### Serialization and Deserialization

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.

```java { .api }
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    T deserialize(byte[] recordValue, String partitionKey, String seqNum,
                  long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}

public interface KinesisSerializationSchema<T> extends Serializable {
    default void open(SerializationSchema.InitializationContext context) throws Exception;
    ByteBuffer serialize(T element);
    String getTargetStream(T element);
}
```

[Serialization](./serialization.md)
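To see why the extra parameters of `deserialize(...)` are useful, here is a plain-Java sketch of the decoding logic a custom `KinesisDeserializationSchema` body typically performs: decode the payload bytes and carry per-record metadata along with the value. `MetadataDecoder` and `KinesisRecord` are hypothetical names invented for this example, not part of the connector API.

```java
import java.nio.charset.StandardCharsets;

public class MetadataDecoder {

    // Hypothetical value type pairing the decoded payload with Kinesis metadata.
    public static final class KinesisRecord {
        public final String value;
        public final String stream;
        public final String shardId;

        KinesisRecord(String value, String stream, String shardId) {
            this.value = value;
            this.stream = stream;
            this.shardId = shardId;
        }
    }

    // Mirrors what a custom deserialize(...) body typically does: decode the
    // raw bytes and keep the per-record metadata alongside the value.
    public static KinesisRecord decode(byte[] recordValue, String stream, String shardId) {
        return new KinesisRecord(new String(recordValue, StandardCharsets.UTF_8), stream, shardId);
    }

    public static void main(String[] args) {
        KinesisRecord r = decode("hello".getBytes(StandardCharsets.UTF_8),
                "my-stream", "shardId-000000000000");
        System.out.println(r.value + " from " + r.shardId);
    }
}
```

In a real schema implementation, the same logic would live inside `deserialize(...)`, with `getProducedType()` returning the type information for the record class.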
196
197
### Partitioning Strategies

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.

```java { .api }
public abstract class KinesisPartitioner<T> implements Serializable {
    public abstract String getPartitionId(T element);
    public String getExplicitHashKey(T element);
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}

public interface KinesisShardAssigner extends Serializable {
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}
```

[Partitioning](./partitioning.md)
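The contract of `KinesisShardAssigner.assign(...)` is simply "map a shard to a subtask index in `[0, numParallelSubtasks)`". The following plain-Java sketch shows the common hash-modulo strategy under that contract; `HashShardAssigner` is a hypothetical name, and the real interface receives a `StreamShardHandle` rather than raw strings.

```java
public class HashShardAssigner {

    // Hash the shard's identity and wrap it into [0, numParallelSubtasks).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    public static int assign(String streamName, String shardId, int numParallelSubtasks) {
        int hash = (streamName + ":" + shardId).hashCode();
        return Math.floorMod(hash, numParallelSubtasks);
    }

    public static void main(String[] args) {
        // The same shard always maps to the same subtask; different shards spread out.
        System.out.println(assign("my-stream", "shardId-000000000000", 4));
    }
}
```

Determinism matters here: the assignment must be stable across restarts so that restored shard state lands on the subtask that owns it.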
214
215
### Table API Integration

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.

```java { .api }
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kinesis";

    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Table API](./table-api.md)
231
232
## Types

### Core Model Classes

```java { .api }
public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
    public String get();
    public int compareTo(SequenceNumber other);
    public boolean equals(Object obj);
    public String toString();
}

public enum SentinelSequenceNumber {
    SENTINEL_EARLIEST_SEQUENCE_NUM,
    SENTINEL_LATEST_SEQUENCE_NUM,
    SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}

public class StartingPosition implements Serializable {
    public static StartingPosition fromStart();
    public static StartingPosition fromEnd();
    public static StartingPosition fromTimestamp(Date timestamp);
    public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}

public abstract class StreamShardHandle implements Serializable {
    public abstract String getStreamName();
    public abstract Shard getShard();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}

public class KinesisStreamShardState implements Serializable {
    public StreamShardMetadata getStreamShardMetadata();
    public SequenceNumber getLastProcessedSequenceNum();
    public boolean equals(Object obj);
    public String toString();
}
```
271
272
### Exception Classes

```java { .api }
public abstract class FlinkKinesisException extends RuntimeException {
    public FlinkKinesisException(String message);
    public FlinkKinesisException(String message, Throwable cause);

    public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
        // Semantic exception for timeout errors
    }
}
```
284
285
### Watermark Management

```java { .api }
public abstract class WatermarkTracker implements Closeable, Serializable {
    public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;

    public abstract long getUpdateTimeoutCount();
    public void setUpdateTimeoutMillis(long updateTimeoutMillis);
    public abstract long updateWatermark(long localWatermark);
    public void open(RuntimeContext context);
    public void close();
}
```
```