
tessl/maven-org-apache-flink--flink-connector-kinesis_2-11

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services

- Workspace: tessl
- Visibility: Public
- Describes: mavenpkg:maven/org.apache.flink/flink-connector-kinesis_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kinesis_2-11@1.14.0

# Apache Flink Kinesis Connector

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services. This connector enables exactly-once processing guarantees, automatic shard discovery, checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams.

## Package Information

- **Package Name**: flink-connector-kinesis_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
```

Configuration and serialization:

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
```

## Basic Usage

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure AWS properties
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Create Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
        "my-stream",
        new SimpleStringSchema(),
        consumerProps
);

// Create data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        producerProps
);
producer.setDefaultStream("output-stream");

// Send data to Kinesis
stream.addSink(producer);

// Execute the job
env.execute("Kinesis Streaming Job");
```

## Architecture

The Flink Kinesis connector is built around several key components:

- **Consumer (FlinkKinesisConsumer)**: Reads from Kinesis streams with exactly-once semantics using Flink's checkpointing mechanism
- **Producer (FlinkKinesisProducer)**: Writes to Kinesis streams using the Kinesis Producer Library (KPL) for high throughput
- **Shard Management**: Automatic shard discovery and assignment across parallel Flink subtasks
- **Watermark Support**: Event-time processing with configurable watermark strategies
- **AWS Integration**: Support for multiple AWS credential providers and regions

The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.

## Capabilities

### Kinesis Consumer

Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.

```java { .api }
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    public void setShardAssigner(KinesisShardAssigner shardAssigner);
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}
```

[Consumer](./consumer.md)
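Beyond the single-stream setup in Basic Usage, one consumer instance can subscribe to several streams. A minimal sketch, assuming placeholder stream names and region; `KinesisDeserializationSchemaWrapper` is assumed available in this connector version to adapt a plain `DeserializationSchema` to the Kinesis-aware interface:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;

import java.util.Arrays;
import java.util.Properties;

public class MultiStreamConsumerExample {

    // Builds a consumer that subscribes to two streams at once (names are
    // placeholders); the wrapper adapts a plain DeserializationSchema.
    public static FlinkKinesisConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        // Start from the oldest retained record instead of the tip of the stream
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        return new FlinkKinesisConsumer<>(
                Arrays.asList("orders-stream", "payments-stream"),
                new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
                props);
    }
}
```

The returned source is attached as usual with `env.addSource(MultiStreamConsumerExample.buildConsumer())`.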

### Kinesis Producer

Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.

```java { .api }
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {

    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);

    public void setFailOnError(boolean failOnError);
    public void setQueueLimit(int queueLimit);
    public void setDefaultStream(String defaultStream);
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}
```

[Producer](./producer.md)
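A sketch of a tuned producer using the setters above; stream name and region are placeholders, and `setDefaultPartition` (a fallback partition key) is assumed to exist in this version:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import java.util.Properties;

public class TunedProducerExample {

    // Builds a producer that fails the job on unrecoverable send errors and
    // bounds the number of in-flight records to limit backpressure memory.
    public static FlinkKinesisProducer<String> buildProducer() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), props);
        producer.setDefaultStream("output-stream");
        producer.setDefaultPartition("0");  // fallback partition key (assumed setter)
        producer.setFailOnError(true);      // surface write errors instead of only logging
        producer.setQueueLimit(1000);       // cap the KPL outstanding-records queue
        return producer;
    }
}
```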

### DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.

```java { .api }
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema deserializer, Properties config);
}
```

[DynamoDB Streams](./dynamodb-streams.md)
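A minimal sketch of wiring the DynamoDB Streams consumer; the stream ARN and region below are placeholders, passed where a Kinesis stream name would normally go:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import java.util.Properties;

public class DynamoDbStreamsExample {

    // Builds a consumer for a DynamoDB table's change stream (placeholder ARN).
    public static FlinkDynamoDBStreamsConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        return new FlinkDynamoDBStreamsConsumer<>(
                "arn:aws:dynamodb:us-east-1:123456789012:table/my-table/stream/2024-01-01T00:00:00.000",
                new SimpleStringSchema(),
                props);
    }
}
```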

### Configuration Management

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.

```java { .api }
public class AWSConfigConstants {
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ACCESS_KEY_ID;
    public static final String AWS_SECRET_ACCESS_KEY;

    public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}

public class ConsumerConfigConstants extends AWSConfigConstants {
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

    public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
    public enum RecordPublisherType { EFO, POLLING }
}
```

[Configuration](./configuration.md)
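As an illustration, these constants can be combined to resume reading from a point in time using profile-based credentials. A sketch with placeholder values; `STREAM_INITIAL_TIMESTAMP` is the companion key for `AT_TIMESTAMP` and is assumed present in this version:

```java
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class ConsumerConfigExample {

    // Builds consumer properties that start at a timestamp and pick up
    // credentials from a local AWS profile; all values are placeholders.
    public static Properties timestampConfig() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
        props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());
        // Companion key for AT_TIMESTAMP (assumed key name in this version)
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
                "2024-01-01T00:00:00.000Z");
        return props;
    }
}
```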

### Serialization and Deserialization

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.

```java { .api }
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    T deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}

public interface KinesisSerializationSchema<T> extends Serializable {
    default void open(InitializationContext context) throws Exception;
    ByteBuffer serialize(T element);
    String getTargetStream(T element);
}
```

[Serialization](./serialization.md)
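A custom deserializer can surface the per-record metadata that the plain `DeserializationSchema` hides. An illustrative implementation (the class name and tagging format are our own) that prefixes each payload with its shard id:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative deserializer that exposes Kinesis metadata by prefixing each
// record payload with the shard id it was read from.
public class ShardTaggingDeserializer implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
                              long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        return shardId + "|" + new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```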

### Partitioning Strategies

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.

```java { .api }
public abstract class KinesisPartitioner<T> implements Serializable {
    public abstract String getPartitionId(T element);
    public String getExplicitHashKey(T element);
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}

public interface KinesisShardAssigner extends Serializable {
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}
```

[Partitioning](./partitioning.md)
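A custom partitioner is installed on the producer via `setCustomPartitioner`. An illustrative sketch (the class name and CSV convention are our own) that derives the partition key from the first field of a CSV-style record, so related records land on the same shard:

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

// Illustrative partitioner: the Kinesis partition key is the text before the
// first comma, or the whole record if there is no comma.
public class FirstFieldPartitioner extends KinesisPartitioner<String> {

    @Override
    public String getPartitionId(String element) {
        int comma = element.indexOf(',');
        return comma >= 0 ? element.substring(0, comma) : element;
    }
}
```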

### Table API Integration

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.

```java { .api }
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kinesis";

    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Table API](./table-api.md)
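The factory is typically exercised through SQL DDL rather than used directly. A hedged sketch; the table schema, stream name, region, and format are placeholders:

```java
import org.apache.flink.table.api.TableEnvironment;

public class KinesisTableExample {

    // Registers a Kinesis-backed table through SQL DDL using the "kinesis"
    // connector identifier; all option values below are placeholders.
    public static void registerTable(TableEnvironment tEnv) {
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id STRING," +
                "  amount DOUBLE" +
                ") WITH (" +
                "  'connector' = 'kinesis'," +
                "  'stream' = 'orders-stream'," +
                "  'aws.region' = 'us-east-1'," +
                "  'format' = 'json'" +
                ")");
    }
}
```

The registered table can then be queried with `tEnv.sqlQuery("SELECT * FROM orders")`.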

## Types

### Core Model Classes

```java { .api }
public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
    public String get();
    public int compareTo(SequenceNumber other);
    public boolean equals(Object obj);
    public String toString();
}

public enum SentinelSequenceNumber {
    SENTINEL_EARLIEST_SEQUENCE_NUM,
    SENTINEL_LATEST_SEQUENCE_NUM,
    SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}

public class StartingPosition implements Serializable {
    public static StartingPosition fromStart();
    public static StartingPosition fromEnd();
    public static StartingPosition fromTimestamp(Date timestamp);
    public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}

public abstract class StreamShardHandle implements Serializable {
    public abstract String getStreamName();
    public abstract Shard getShard();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}

public class KinesisStreamShardState implements Serializable {
    public StreamShardMetadata getStreamShardMetadata();
    public SequenceNumber getLastProcessedSequenceNum();
    public boolean equals(Object obj);
    public String toString();
}
```

### Exception Classes

```java { .api }
public abstract class FlinkKinesisException extends RuntimeException {
    public FlinkKinesisException(String message);
    public FlinkKinesisException(String message, Throwable cause);

    public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
        // Semantic exception for timeout errors
    }
}
```

### Watermark Management

```java { .api }
public abstract class WatermarkTracker implements Closeable, Serializable {
    public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;

    public abstract long getUpdateTimeoutCount();
    public void setUpdateTimeoutMillis(long updateTimeoutMillis);
    public abstract long updateWatermark(long localWatermark);
    public void open(RuntimeContext context);
    public void close();
}
```
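A tracker is attached to the consumer via `setWatermarkTracker`. A sketch using `JobManagerWatermarkTracker`, which is assumed to ship with this connector version; the stream name, region, and aggregate name are placeholders:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

import java.util.Properties;

public class WatermarkTrackerExample {

    // Attaches a JobManager-coordinated watermark tracker so parallel subtasks
    // agree on a shared global watermark across shards.
    public static FlinkKinesisConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
                "my-stream", new SimpleStringSchema(), props);
        consumer.setWatermarkTracker(new JobManagerWatermarkTracker("kinesis-watermarks"));
        return consumer;
    }
}
```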