# Kinesis Consumer

The FlinkKinesisConsumer provides exactly-once streaming data ingestion from Amazon Kinesis Data Streams with automatic shard discovery, checkpointing, and comprehensive configuration options for high-throughput stream processing.

## Capabilities

### FlinkKinesisConsumer

Main consumer class for reading from one or more Kinesis streams with exactly-once processing guarantees through Flink's checkpointing mechanism.

```java { .api }
@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    /**
     * Create consumer for single stream with standard deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Create consumer for single stream with Kinesis-specific deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Create consumer for multiple streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of stream names to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);

    /**
     * Get current shard assigner for mapping shards to subtasks.
     *
     * @return Current shard assigner instance
     */
    public KinesisShardAssigner getShardAssigner();

    /**
     * Set custom shard assigner for mapping shards to subtasks.
     *
     * @param shardAssigner Custom shard assigner implementation
     */
    public void setShardAssigner(KinesisShardAssigner shardAssigner);

    /**
     * Get current periodic watermark assigner for event-time processing.
     *
     * @return Current watermark assigner instance
     */
    public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner();

    /**
     * Set periodic watermark assigner for event-time processing.
     *
     * @param periodicWatermarkAssigner Watermark assigner implementation
     */
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);

    /**
     * Get current watermark tracker for distributed watermark aggregation.
     *
     * @return Current watermark tracker instance
     */
    public WatermarkTracker getWatermarkTracker();

    /**
     * Set watermark tracker for distributed watermark aggregation.
     *
     * @param watermarkTracker Watermark tracker implementation
     */
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);

    /**
     * Main source function execution method.
     *
     * @param sourceContext Flink source context for emitting records
     * @throws Exception On processing errors
     */
    public void run(SourceContext<T> sourceContext) throws Exception;

    /**
     * Cancel the consumer and stop reading from streams.
     */
    public void cancel();

    /**
     * Close resources and cleanup.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;

    /**
     * Get the type information for produced records.
     *
     * @return Type information for output type T
     */
    public TypeInformation<T> getProducedType();

    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;

    /**
     * Create snapshot of current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}
```

### Usage Examples

#### Basic Single Stream Consumer

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// AWS region, credentials (placeholders here), and starting position
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "my-kinesis-stream",
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);
```

#### Multi-Stream Consumer with Custom Deserialization

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import java.io.IOException;
import java.util.Arrays;

// Custom deserialization schema with access to Kinesis metadata.
// MyEvent and parseJson(...) are application-defined; props is built as in the basic example.
KinesisDeserializationSchema<MyEvent> deserializer = new KinesisDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // Parse JSON with metadata
        MyEvent event = parseJson(recordValue);
        event.setMetadata(stream, shardId, seqNum, approxArrivalTimestamp);
        return event;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

FlinkKinesisConsumer<MyEvent> consumer = new FlinkKinesisConsumer<>(
    Arrays.asList("stream-1", "stream-2", "stream-3"),
    deserializer,
    props
);
```

#### Consumer with Event-Time Processing and Watermarks

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Custom watermark assigner for event-time processing
AssignerWithPeriodicWatermarks<MyEvent> watermarkAssigner = new AssignerWithPeriodicWatermarks<MyEvent>() {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getEventTime();
        maxTimestamp = Math.max(timestamp, maxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // No record seen yet: subtracting from Long.MIN_VALUE would overflow
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        // Allow 10 seconds of lateness
        return new Watermark(maxTimestamp - 10000);
    }
};

consumer.setPeriodicWatermarkAssigner(watermarkAssigner);

// Configure watermark emission interval
env.getConfig().setAutoWatermarkInterval(5000);

// Configure shard idle timeout to prevent watermark stalling.
// Safest to set this on props before the consumer is constructed.
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
```
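
The watermark logic above comes down to tracking the largest event time seen so far and subtracting a lateness bound. One detail that is easy to miss: before any record arrives the maximum is still `Long.MIN_VALUE`, and subtracting from it wraps around to a very large positive value. The following is a minimal, Flink-free sketch of that bookkeeping with the overflow guarded; `BoundedLatenessTracker` is a hypothetical name used only for illustration and is not part of the connector.

```java
/** Hypothetical helper (not a Flink class): tracks the max event time and derives a watermark. */
final class BoundedLatenessTracker {
    private final long maxLatenessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxLatenessMillis) {
        if (maxLatenessMillis < 0) {
            throw new IllegalArgumentException("maxLatenessMillis must be non-negative");
        }
        this.maxLatenessMillis = maxLatenessMillis;
    }

    /** Record the event time of one element. */
    void observe(long eventTimeMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMillis);
    }

    /** Largest timestamp seen minus the lateness bound, clamped so it cannot underflow. */
    long currentWatermark() {
        // Long.MIN_VALUE + maxLatenessMillis cannot overflow because the bound is non-negative.
        if (maxTimestamp < Long.MIN_VALUE + maxLatenessMillis) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxLatenessMillis;
    }
}
```

An assigner could delegate to such a tracker from `extractTimestamp` and `getCurrentWatermark`, which keeps the arithmetic testable without a running job.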

#### Consumer with Custom Shard Assignment

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

// Custom shard assigner to control load balancing
KinesisShardAssigner customAssigner = new KinesisShardAssigner() {
    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        // Custom logic for shard assignment
        String shardId = shard.getShard().getShardId();
        // floorMod keeps the result in [0, numParallelSubtasks) even for negative hash codes
        // (Math.abs(Integer.MIN_VALUE) is still negative)
        return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
};

consumer.setShardAssigner(customAssigner);
```
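
Hashing the shard ID spreads shards pseudo-randomly, which can leave some subtasks with more shards than others when the shard count is small. As an alternative sketch, the index logic below assumes shard IDs have the form `shardId-` followed by a decimal number (the usual shape for Kinesis Data Streams, but an assumption here) and distributes them round-robin by that number, falling back to the hash otherwise. `ShardIndexAssigner` is a hypothetical, Flink-free helper; its result could be returned from a `KinesisShardAssigner.assign` implementation.

```java
/** Hypothetical helper (not part of the connector): maps a shard ID to a subtask index. */
final class ShardIndexAssigner {
    private static final String PREFIX = "shardId-";

    static int subtaskFor(String shardId, int numParallelSubtasks) {
        if (numParallelSubtasks <= 0) {
            throw new IllegalArgumentException("numParallelSubtasks must be positive");
        }
        if (shardId.startsWith(PREFIX)) {
            try {
                // Assumed format: "shardId-000000000005" -> 5
                long index = Long.parseLong(shardId.substring(PREFIX.length()));
                return (int) Math.floorMod(index, (long) numParallelSubtasks);
            } catch (NumberFormatException ignored) {
                // Not numeric after the prefix: fall through to the hash-based path.
            }
        }
        // floorMod never returns a negative value for a positive modulus.
        return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
}
```

With four subtasks, consecutive shard numbers 0 through 7 land on subtasks 0, 1, 2, 3, 0, 1, 2, 3, so each subtask receives the same number of shards.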

#### Enhanced Fan-Out (EFO) Consumer Configuration

```java
// Configure Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-app");

// EFO registration strategy (LAZY, EAGER, NONE)
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");

// efoConsumerArn(...) returns the per-stream property key under which the ARN of an
// existing EFO consumer is supplied (relevant when the registration type is NONE)
String consumerArnKey = ConsumerConfigConstants.efoConsumerArn("my-kinesis-stream");
```

### Configuration Options

Key configuration properties for the FlinkKinesisConsumer:

#### Stream Configuration

- `STREAM_INITIAL_POSITION`: Starting position (TRIM_HORIZON, LATEST, AT_TIMESTAMP)
- `STREAM_INITIAL_TIMESTAMP`: Timestamp for AT_TIMESTAMP positioning
- `RECORD_PUBLISHER_TYPE`: EFO or POLLING record publisher
- `EFO_CONSUMER_NAME`: Name for Enhanced Fan-Out consumer
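
Because `STREAM_INITIAL_TIMESTAMP` only makes sense together with `AT_TIMESTAMP`, it can help to check the pair before the job is submitted. The sketch below is one possible pre-flight check, not the connector's own validation; `InitialPositionCheck` is a hypothetical name, and the property keys are passed in so the real constants from `ConsumerConfigConstants` can be used at the call site.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical pre-flight check (not part of the connector). */
final class InitialPositionCheck {
    private static final List<String> POSITIONS =
            Arrays.asList("TRIM_HORIZON", "LATEST", "AT_TIMESTAMP");

    /** Returns a description of the first problem found, or empty if the pair looks consistent. */
    static Optional<String> problem(Properties props, String positionKey, String timestampKey) {
        String position = props.getProperty(positionKey);
        if (position == null) {
            // Nothing set: the connector's default position applies.
            return Optional.empty();
        }
        if (!POSITIONS.contains(position)) {
            return Optional.of("unknown initial position: " + position);
        }
        if (position.equals("AT_TIMESTAMP") && props.getProperty(timestampKey) == null) {
            return Optional.of("AT_TIMESTAMP requires " + timestampKey);
        }
        return Optional.empty();
    }
}
```

A call such as `InitialPositionCheck.problem(props, ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)` would then surface a misconfiguration before the consumer is constructed.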

#### Shard Configuration

- `SHARD_GETRECORDS_MAX`: Maximum records per GetRecords call (default: 10000)
- `SHARD_GETRECORDS_INTERVAL_MILLIS`: Interval between GetRecords calls (default: 200ms)
- `SHARD_IDLE_INTERVAL_MILLIS`: Timeout for idle shard detection
- `SHARD_USE_ADAPTIVE_READS`: Enable adaptive read intervals
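
When choosing `SHARD_GETRECORDS_INTERVAL_MILLIS` for the polling publisher, it is worth relating the interval to the per-shard read quota. The arithmetic below assumes the Kinesis Data Streams limit of 5 read transactions per second per shard, shared by every application polling that shard; verify the figure against the current AWS quotas. `PollingBudget` is a hypothetical helper, and the computed rate is an upper bound because each call also takes time to complete.

```java
/** Hypothetical helper: relates a GetRecords interval to the assumed per-shard read quota. */
final class PollingBudget {
    /** Assumed Kinesis quota: read transactions per second per shard, shared across pollers. */
    static final double SHARD_READ_CALLS_PER_SECOND = 5.0;

    /** Upper bound on GetRecords calls per second issued by one consumer against one shard. */
    static double callsPerSecond(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return 1000.0 / intervalMillis;
    }

    /** True if the given number of polling applications stays within the assumed quota. */
    static boolean withinShardLimit(long intervalMillis, int pollingConsumers) {
        return callsPerSecond(intervalMillis) * pollingConsumers <= SHARD_READ_CALLS_PER_SECOND;
    }
}
```

Under that assumption, the 200 ms default uses the whole budget for a single application, so two applications polling the same stream would each need an interval of roughly 400 ms or more to avoid throttling each other.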

#### Watermark Configuration

- `WATERMARK_SYNC_MILLIS`: Interval for watermark synchronization (default: 30000)
- `WATERMARK_LOOKAHEAD_MILLIS`: Lookahead time for watermark calculation (default: 180000)

### Error Handling

The consumer provides comprehensive error handling and recovery mechanisms:

- **Checkpointing**: Automatic recovery from exactly the last checkpointed position
- **Shard Resharding**: Automatic detection and handling of shard splits and merges
- **Network Failures**: Automatic retry with exponential backoff
- **Throttling**: Built-in handling of Kinesis throttling with adaptive backoff
- **Idle Shards**: Configurable timeout to prevent stalled watermarks from closed shards
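
To make "exponential backoff" concrete, the sketch below computes a capped exponential delay and a randomized ("full jitter") variant. It illustrates the general technique only and is not taken from the connector's source; the class name `Backoff` and the parameter names are hypothetical.

```java
import java.util.Random;

/** Hypothetical illustration of capped exponential backoff with optional full jitter. */
final class Backoff {
    /** min(maxMillis, baseMillis * power^attempt), with attempt starting at 0. */
    static long cappedDelayMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double raw = baseMillis * Math.pow(power, attempt);
        // Math.min also handles raw overflowing to infinity for large attempts.
        return (long) Math.min((double) maxMillis, raw);
    }

    /** A uniformly random delay in [0, cappedDelayMillis), which spreads out concurrent retries. */
    static long fullJitterDelayMillis(long baseMillis, long maxMillis, double power,
                                      int attempt, Random random) {
        return (long) (random.nextDouble() * cappedDelayMillis(baseMillis, maxMillis, power, attempt));
    }
}
```

With a base of 300 ms, a power of 1.5, and a cap of 1000 ms, the capped delays for attempts 0, 1, and 2 are 300 ms, 450 ms, and 675 ms, after which the cap takes over.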