or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · consumer.md · dynamodb-streams.md · index.md · partitioning.md · producer.md · serialization.md · table-api.md

docs/dynamodb-streams.md

0

# DynamoDB Streams Integration

1

2

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality for change data capture.

3

4

## Capabilities

5

6

### FlinkDynamoDBStreamsConsumer

7

8

Specialized consumer that extends FlinkKinesisConsumer to work with DynamoDB Streams, providing the same exactly-once guarantees and configuration options while handling DynamoDB-specific stream characteristics.

9

10

```java { .api }

11

public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {

12

13

/**

14

* Create consumer for single DynamoDB stream with standard deserialization schema.

15

*

16

* @param stream DynamoDB stream ARN or name to consume from

17

* @param deserializer Standard Flink deserialization schema

18

* @param config AWS and consumer configuration properties

19

*/

20

public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);

21

22

/**

23

* Create consumer for multiple DynamoDB streams with Kinesis-specific deserialization schema.

24

*

25

* @param streams List of DynamoDB stream ARNs or names to consume from

26

* @param deserializer Kinesis deserialization schema with metadata access

27

* @param config AWS and consumer configuration properties

28

*/

29

public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);

30

}

31

```

32

33

## Usage Examples

34

35

### Basic DynamoDB Streams Consumer

36

37

```java

38

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

39

import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;

40

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

41

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

42

import org.apache.flink.api.common.serialization.SimpleStringSchema;

43

import java.util.Properties;

44

45

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

46

47

Properties props = new Properties();

48

props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");

49

props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");

50

props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

51

props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

52

53

// DynamoDB Stream ARN

54

String dynamoStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/stream/2023-01-01T00:00:00.000";

55

56

FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(

57

dynamoStreamArn,

58

new SimpleStringSchema(),

59

props

60

);

61

62

DataStream<String> stream = env.addSource(consumer);

63

```

64

65

### DynamoDB Change Data Capture

66

67

```java

68

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

69

import com.fasterxml.jackson.databind.JsonNode;

70

import com.fasterxml.jackson.databind.ObjectMapper;

71

72

public class DynamoDBChangeRecord {

73

private String eventName; // INSERT, MODIFY, REMOVE

74

private String tableName;

75

private JsonNode dynamodb; // DynamoDB record data

76

private long approximateCreationDateTime;

77

78

// getters and setters...

79

}

80

81

public class DynamoDBStreamDeserializer implements KinesisDeserializationSchema<DynamoDBChangeRecord> {

82

private transient ObjectMapper objectMapper;

83

84

@Override

85

public void open(DeserializationSchema.InitializationContext context) throws Exception {

86

objectMapper = new ObjectMapper();

87

}

88

89

@Override

90

public DynamoDBChangeRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,

91

long approxArrivalTimestamp, String stream, String shardId)

92

throws IOException {

93

String json = new String(recordValue, StandardCharsets.UTF_8);

94

JsonNode root = objectMapper.readTree(json);

95

96

DynamoDBChangeRecord changeRecord = new DynamoDBChangeRecord();

97

changeRecord.setEventName(root.get("eventName").asText());

98

changeRecord.setTableName(extractTableName(stream));

99

changeRecord.setDynamodb(root.get("dynamodb"));

100

changeRecord.setApproximateCreationDateTime(

101

root.get("dynamodb").get("ApproximateCreationDateTime").asLong()

102

);

103

104

return changeRecord;

105

}

106

107

private String extractTableName(String streamArn) {

108

// Extract table name from DynamoDB stream ARN

109

// arn:aws:dynamodb:region:account:table/TableName/stream/timestamp

110

String[] parts = streamArn.split("/");

111

return parts.length >= 2 ? parts[1] : "unknown";

112

}

113

114

@Override

115

public TypeInformation<DynamoDBChangeRecord> getProducedType() {

116

return TypeInformation.of(DynamoDBChangeRecord.class);

117

}

118

}

119

120

// Use the deserializer

121

FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(

122

dynamoStreamArn,

123

new DynamoDBStreamDeserializer(),

124

props

125

);

126

```

127

128

### Multi-Table Change Streaming

129

130

```java

131

import java.util.Arrays;

132

import java.util.List;

133

134

// Monitor multiple DynamoDB tables

135

List<String> tableStreams = Arrays.asList(

136

"arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000",

137

"arn:aws:dynamodb:us-west-2:123456789012:table/Orders/stream/2023-01-01T00:00:00.000",

138

"arn:aws:dynamodb:us-west-2:123456789012:table/Products/stream/2023-01-01T00:00:00.000"

139

);

140

141

FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(

142

tableStreams,

143

new DynamoDBStreamDeserializer(),

144

props

145

);

146

147

DataStream<DynamoDBChangeRecord> changes = env.addSource(consumer);

148

149

// Process changes by table

150

changes

151

.keyBy(record -> record.getTableName())

152

.process(new TableSpecificChangeProcessor());

153

```

154

155

### Real-Time Analytics Pipeline

156

157

```java

158

public class DynamoDBAnalyticsRecord {

159

private String tableName;

160

private String eventType;

161

private long timestamp;

162

private Map<String, Object> oldImage;

163

private Map<String, Object> newImage;

164

private String partitionKey;

165

private String sortKey;

166

167

// getters and setters...

168

}

169

170

public class AnalyticsDeserializer implements KinesisDeserializationSchema<DynamoDBAnalyticsRecord> {

171

private transient ObjectMapper objectMapper;

172

173

@Override

174

public void open(DeserializationSchema.InitializationContext context) throws Exception {

175

objectMapper = new ObjectMapper();

176

}

177

178

@Override

179

public DynamoDBAnalyticsRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,

180

long approxArrivalTimestamp, String stream, String shardId)

181

throws IOException {

182

String json = new String(recordValue, StandardCharsets.UTF_8);

183

JsonNode root = objectMapper.readTree(json);

184

JsonNode dynamodb = root.get("dynamodb");

185

186

DynamoDBAnalyticsRecord record = new DynamoDBAnalyticsRecord();

187

record.setTableName(extractTableName(stream));

188

record.setEventType(root.get("eventName").asText());

189

record.setTimestamp(dynamodb.get("ApproximateCreationDateTime").asLong());

190

191

// Extract partition key and sort key

192

JsonNode keys = dynamodb.get("Keys");

193

if (keys != null) {

194

record.setPartitionKey(extractAttributeValue(keys.get("pk")));

195

if (keys.has("sk")) {

196

record.setSortKey(extractAttributeValue(keys.get("sk")));

197

}

198

}

199

200

// Extract old and new images for MODIFY events

201

if (dynamodb.has("OldImage")) {

202

record.setOldImage(convertDynamoDBImage(dynamodb.get("OldImage")));

203

}

204

if (dynamodb.has("NewImage")) {

205

record.setNewImage(convertDynamoDBImage(dynamodb.get("NewImage")));

206

}

207

208

return record;

209

}

210

211

private String extractAttributeValue(JsonNode attribute) {

212

// Handle DynamoDB attribute value format

213

if (attribute.has("S")) return attribute.get("S").asText();

214

if (attribute.has("N")) return attribute.get("N").asText();

215

if (attribute.has("B")) return attribute.get("B").asText();

216

return null;

217

}

218

219

private Map<String, Object> convertDynamoDBImage(JsonNode image) {

220

Map<String, Object> result = new HashMap<>();

221

image.fields().forEachRemaining(entry -> {

222

String key = entry.getKey();

223

JsonNode value = entry.getValue();

224

result.put(key, extractAttributeValue(value));

225

});

226

return result;

227

}

228

229

private String extractTableName(String streamArn) {

230

String[] parts = streamArn.split("/");

231

return parts.length >= 2 ? parts[1] : "unknown";

232

}

233

234

@Override

235

public TypeInformation<DynamoDBAnalyticsRecord> getProducedType() {

236

return TypeInformation.of(DynamoDBAnalyticsRecord.class);

237

}

238

}

239

240

// Create analytics pipeline

241

FlinkDynamoDBStreamsConsumer<DynamoDBAnalyticsRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(

242

dynamoStreamArn,

243

new AnalyticsDeserializer(),

244

props

245

);

246

247

DataStream<DynamoDBAnalyticsRecord> changes = env.addSource(consumer);

248

249

// Real-time aggregations

250

changes

251

.filter(record -> "MODIFY".equals(record.getEventType()))

252

.keyBy(DynamoDBAnalyticsRecord::getTableName)

253

.timeWindow(Time.minutes(5))

254

.aggregate(new ChangeCountAggregator())

255

.print();

256

```

257

258

### Change Data Replication

259

260

```java

261

public class ReplicationProcessor extends ProcessFunction<DynamoDBChangeRecord, Void> {

262

private transient DynamoDbClient targetDynamoDB;

263

264

@Override

265

public void open(Configuration parameters) throws Exception {

266

targetDynamoDB = DynamoDbClient.builder()

267

.region(Region.US_EAST_1) // Different region for replication

268

.build();

269

}

270

271

@Override

272

public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<Void> out) throws Exception {

273

switch (record.getEventName()) {

274

case "INSERT":

275

replicateInsert(record);

276

break;

277

case "MODIFY":

278

replicateModify(record);

279

break;

280

case "REMOVE":

281

replicateRemove(record);

282

break;

283

}

284

}

285

286

private void replicateInsert(DynamoDBChangeRecord record) {

287

// Convert DynamoDB JSON to attribute values and insert

288

Map<String, AttributeValue> item = convertToAttributeValues(record.getDynamodb().get("NewImage"));

289

290

PutItemRequest request = PutItemRequest.builder()

291

.tableName(record.getTableName() + "-replica")

292

.item(item)

293

.build();

294

295

targetDynamoDB.putItem(request);

296

}

297

298

private void replicateModify(DynamoDBChangeRecord record) {

299

// Handle update operation

300

Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));

301

Map<String, AttributeValue> newImage = convertToAttributeValues(record.getDynamodb().get("NewImage"));

302

303

// Build update expression

304

UpdateItemRequest request = UpdateItemRequest.builder()

305

.tableName(record.getTableName() + "-replica")

306

.key(keys)

307

.attributeUpdates(buildUpdateActions(newImage))

308

.build();

309

310

targetDynamoDB.updateItem(request);

311

}

312

313

private void replicateRemove(DynamoDBChangeRecord record) {

314

Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));

315

316

DeleteItemRequest request = DeleteItemRequest.builder()

317

.tableName(record.getTableName() + "-replica")

318

.key(keys)

319

.build();

320

321

targetDynamoDB.deleteItem(request);

322

}

323

324

// Helper methods for converting between formats...

325

}

326

327

// Set up replication pipeline

328

changes.process(new ReplicationProcessor());

329

```

330

331

## Configuration Considerations

332

333

### DynamoDB-Specific Settings

334

335

```java

336

// DynamoDB Streams have different characteristics than Kinesis Data Streams

337

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000"); // Lower batch size

338

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500"); // More frequent polling

339

props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000"); // Longer idle timeout

340

341

// DynamoDB Streams typically don't benefit from Enhanced Fan-Out

342

props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "POLLING");

343

```

344

345

### Error Handling for DynamoDB Streams

346

347

```java

348

public class RobustDynamoDBDeserializer implements KinesisDeserializationSchema<Either<DynamoDBChangeRecord, ErrorRecord>> {

349

350

@Override

351

public Either<DynamoDBChangeRecord, ErrorRecord> deserialize(byte[] recordValue, String partitionKey,

352

String seqNum, long approxArrivalTimestamp,

353

String stream, String shardId) {

354

try {

355

// Attempt normal deserialization

356

DynamoDBChangeRecord record = deserializeRecord(recordValue, stream);

357

return Either.left(record);

358

} catch (Exception e) {

359

// Create error record for poison messages

360

ErrorRecord error = new ErrorRecord();

361

error.setRawData(recordValue);

362

error.setStreamName(stream);

363

error.setShardId(shardId);

364

error.setSequenceNumber(seqNum);

365

error.setErrorMessage(e.getMessage());

366

error.setTimestamp(approxArrivalTimestamp);

367

368

return Either.right(error);

369

}

370

}

371

372

// Rest of implementation...

373

}

374

```

375

376

## Key Differences from Kinesis Data Streams

377

378

1. **Record Format**: DynamoDB Streams records contain structured change events with old/new images

379

2. **Shard Behavior**: DynamoDB manages shards automatically based on table partitioning

380

3. **Retention**: DynamoDB Streams have a fixed 24-hour retention period

381

4. **Throughput**: Lower throughput limits compared to Kinesis Data Streams

382

5. **Enhanced Fan-Out**: Not supported for DynamoDB Streams

383

6. **Stream ARN Format**: Uses DynamoDB table ARN format instead of Kinesis stream names