
# Apache Flink Kinesis Connector

An Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services. The connector offers exactly-once processing guarantees, automatic shard discovery, and checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams.

## Package Information

- **Package Name**: flink-connector-kinesis_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
```

Configuration and serialization:

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
```

## Basic Usage

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure AWS credentials and the consumer's starting position
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Create the Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
        "my-stream",
        new SimpleStringSchema(),
        consumerProps);

// Create a data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create the Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        producerProps);
producer.setDefaultStream("output-stream");

// Send data to Kinesis
stream.addSink(producer);

// Execute the job
env.execute("Kinesis Streaming Job");
```

## Architecture

The Flink Kinesis connector is built around several key components:

- **Consumer (FlinkKinesisConsumer)**: Reads from Kinesis streams with exactly-once semantics using Flink's checkpointing mechanism
- **Producer (FlinkKinesisProducer)**: Writes to Kinesis streams using the Kinesis Producer Library (KPL) for high throughput
- **Shard Management**: Automatic shard discovery and assignment across parallel Flink subtasks
- **Watermark Support**: Event-time processing with configurable watermark strategies
- **AWS Integration**: Support for multiple AWS credential providers and regions

The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.

## Capabilities

### Kinesis Consumer

Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.

```java { .api }
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    public void setShardAssigner(KinesisShardAssigner shardAssigner);
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}
```

[Consumer](./consumer.md)
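Consumer settings are ordinary `Properties` entries, so they can be built and inspected without any Flink dependency. A minimal sketch of configuring an `AT_TIMESTAMP` start: the `flink.stream.initpos` key matches `STREAM_INITIAL_POSITION` listed under Configuration Management below, while the companion timestamp key is an assumption, not confirmed by this page.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds consumer properties that start reading from a fixed timestamp.
    // "flink.stream.initpos" is STREAM_INITIAL_POSITION from this connector;
    // "flink.stream.initpos.timestamp" is an assumed companion key.
    public static Properties atTimestamp(String region, String timestamp) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        props.setProperty("flink.stream.initpos", "AT_TIMESTAMP");
        props.setProperty("flink.stream.initpos.timestamp", timestamp);
        return props;
    }
}
```

The resulting `Properties` object would be passed to the `FlinkKinesisConsumer` constructor shown above.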

### Kinesis Producer

Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.

```java { .api }
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {

    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);

    public void setFailOnError(boolean failOnError);
    public void setQueueLimit(int queueLimit);
    public void setDefaultStream(String defaultStream);
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}
```

[Producer](./producer.md)

### DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.

```java { .api }
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema deserializer, Properties config);
}
```

[DynamoDB Streams](./dynamodb-streams.md)

### Configuration Management

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.

```java { .api }
public class AWSConfigConstants {
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ACCESS_KEY_ID;
    public static final String AWS_SECRET_ACCESS_KEY;

    public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}

public class ConsumerConfigConstants extends AWSConfigConstants {
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

    public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
    public enum RecordPublisherType { EFO, POLLING }
}
```

[Configuration](./configuration.md)
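Switching from basic keys to one of the other `CredentialProvider` values is also just a matter of setting properties. A sketch for the `PROFILE` provider: `aws.region` and `aws.credentials.provider` are the constants listed above, while the profile-name key is an assumption modeled on the same naming scheme.

```java
import java.util.Properties;

public class CredentialConfigSketch {
    // Configures the connector to read credentials from a named AWS profile
    // instead of inline access keys. The profile-name key below is assumed.
    public static Properties profileCredentials(String region, String profile) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        props.setProperty("aws.credentials.provider", "PROFILE");
        props.setProperty("aws.credentials.provider.profile.name", profile);
        return props;
    }
}
```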

### Serialization and Deserialization

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.

```java { .api }
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    T deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}

public interface KinesisSerializationSchema<T> extends Serializable {
    default void open(InitializationContext context) throws Exception;
    ByteBuffer serialize(T element);
    String getTargetStream(T element);
}
```

[Serialization](./serialization.md)
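The contract of `KinesisSerializationSchema` can be sketched standalone (without the Flink dependency) to make the two methods concrete: encode each element to a `ByteBuffer`, and optionally route it to a per-element target stream. The stream names and the `error:` prefix convention here are purely illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RoutingSerializationSketch {
    // Mirrors KinesisSerializationSchema.serialize: encode the element as UTF-8 bytes.
    public static ByteBuffer serialize(String element) {
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    // Mirrors getTargetStream: route error records to a dedicated stream and
    // return null otherwise, so the producer's default stream is used.
    public static String getTargetStream(String element) {
        return element.startsWith("error:") ? "error-stream" : null;
    }
}
```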

### Partitioning Strategies

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.

```java { .api }
public abstract class KinesisPartitioner<T> implements Serializable {
    public abstract String getPartitionId(T element);
    public String getExplicitHashKey(T element);
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}

public interface KinesisShardAssigner extends Serializable {
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}
```

[Partitioning](./partitioning.md)
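The core logic behind both interfaces is a stable hash. A standalone sketch (without the Flink types, so the shard is represented by its hash code rather than a `StreamShardHandle`): `getPartitionId` must return the same key for equal elements, and `assign` must map every shard to a subtask index in `[0, numParallelSubtasks)`.

```java
public class PartitioningSketch {
    // Mirrors KinesisPartitioner.getPartitionId: derive a stable partition key
    // from the element so equal elements always land on the same shard.
    public static String getPartitionId(String element) {
        return String.valueOf(Math.abs(element.hashCode() % 1024));
    }

    // Mirrors KinesisShardAssigner.assign: map a shard (represented here by a
    // hash of its identity) onto one of the parallel subtasks.
    public static int assign(int shardHash, int numParallelSubtasks) {
        return Math.abs(shardHash % numParallelSubtasks);
    }
}
```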

### Table API Integration

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.

```java { .api }
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kinesis";

    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Table API](./table-api.md)
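With the `kinesis` identifier above, a table can be declared in SQL DDL. A sketch of such a declaration; aside from `'connector' = 'kinesis'` (which matches `IDENTIFIER`), the option names (`stream`, `aws.region`, `scan.stream.initpos`, `format`) are assumptions based on the connector's usual conventions rather than this page.

```sql
CREATE TABLE kinesis_orders (
  order_id STRING,
  amount   DOUBLE,
  ts       TIMESTAMP(3)
) WITH (
  'connector' = 'kinesis',          -- matches KinesisDynamicTableFactory.IDENTIFIER
  'stream' = 'orders-stream',       -- assumed option name for the source stream
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST', -- assumed option, mirrors STREAM_INITIAL_POSITION
  'format' = 'json'
);
```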

## Types

### Core Model Classes

```java { .api }
public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
    public String get();
    public int compareTo(SequenceNumber other);
    public boolean equals(Object obj);
    public String toString();
}

public enum SentinelSequenceNumber {
    SENTINEL_EARLIEST_SEQUENCE_NUM,
    SENTINEL_LATEST_SEQUENCE_NUM,
    SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}

public class StartingPosition implements Serializable {
    public static StartingPosition fromStart();
    public static StartingPosition fromEnd();
    public static StartingPosition fromTimestamp(Date timestamp);
    public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}

public abstract class StreamShardHandle implements Serializable {
    public abstract String getStreamName();
    public abstract Shard getShard();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}

public class KinesisStreamShardState implements Serializable {
    public StreamShardMetadata getStreamShardMetadata();
    public SequenceNumber getLastProcessedSequenceNum();
    public boolean equals(Object obj);
    public String toString();
}
```

### Exception Classes

```java { .api }
public abstract class FlinkKinesisException extends RuntimeException {
    public FlinkKinesisException(String message);
    public FlinkKinesisException(String message, Throwable cause);

    public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
        // Semantic exception for timeout errors
    }
}
```

### Watermark Management

```java { .api }
public abstract class WatermarkTracker implements Closeable, Serializable {
    public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;

    public abstract long getUpdateTimeoutCount();
    public void setUpdateTimeoutMillis(long updateTimeoutMillis);
    public abstract long updateWatermark(long localWatermark);
    public void open(RuntimeContext context);
    public void close();
}
```
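The essence of `updateWatermark` is aggregating per-subtask local watermarks into a global minimum, so no subtask's events are declared late prematurely. A standalone sketch of that aggregation, with the subtask id made an explicit parameter for illustration (the real tracker obtains it from the runtime context):

```java
import java.util.HashMap;
import java.util.Map;

public class WatermarkTrackerSketch {
    // Last watermark reported by each subtask (subtask index -> local watermark).
    private final Map<Integer, Long> localWatermarks = new HashMap<>();

    // Records this subtask's local watermark and returns the global minimum
    // across all subtasks seen so far, mirroring updateWatermark's contract.
    public long updateWatermark(int subtaskIndex, long localWatermark) {
        localWatermarks.put(subtaskIndex, localWatermark);
        return localWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```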