# DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality for change data capture.

## Capabilities

### FlinkDynamoDBStreamsConsumer

Specialized consumer that extends FlinkKinesisConsumer to work with DynamoDB Streams, providing the same exactly-once guarantees and configuration options while handling DynamoDB-specific stream characteristics.

```java { .api }
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {

    /**
     * Create consumer for single DynamoDB stream with standard deserialization schema.
     *
     * @param stream DynamoDB stream ARN or name to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);

    /**
     * Create consumer for multiple DynamoDB streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of DynamoDB stream ARNs or names to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}
```
## Usage Examples

### Basic DynamoDB Streams Consumer

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

// DynamoDB Stream ARN
String dynamoStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/stream/2023-01-01T00:00:00.000";

FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(
    dynamoStreamArn,
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);
```
### DynamoDB Change Data Capture

```java
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DynamoDBChangeRecord {
    private String eventName; // INSERT, MODIFY, REMOVE
    private String tableName;
    private JsonNode dynamodb; // DynamoDB record data
    private long approximateCreationDateTime;

    // getters and setters...
}

public class DynamoDBStreamDeserializer implements KinesisDeserializationSchema<DynamoDBChangeRecord> {
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }

    @Override
    public DynamoDBChangeRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                            long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);

        DynamoDBChangeRecord changeRecord = new DynamoDBChangeRecord();
        changeRecord.setEventName(root.get("eventName").asText());
        changeRecord.setTableName(extractTableName(stream));
        changeRecord.setDynamodb(root.get("dynamodb"));
        changeRecord.setApproximateCreationDateTime(
            root.get("dynamodb").get("ApproximateCreationDateTime").asLong()
        );

        return changeRecord;
    }

    private String extractTableName(String streamArn) {
        // Extract table name from DynamoDB stream ARN:
        // arn:aws:dynamodb:region:account:table/TableName/stream/timestamp
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }

    @Override
    public TypeInformation<DynamoDBChangeRecord> getProducedType() {
        return TypeInformation.of(DynamoDBChangeRecord.class);
    }
}

// Use the deserializer
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    dynamoStreamArn,
    new DynamoDBStreamDeserializer(),
    props
);
```
### Multi-Table Change Streaming

```java
import java.util.Arrays;
import java.util.List;

// Monitor multiple DynamoDB tables
List<String> tableStreams = Arrays.asList(
    "arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000",
    "arn:aws:dynamodb:us-west-2:123456789012:table/Orders/stream/2023-01-01T00:00:00.000",
    "arn:aws:dynamodb:us-west-2:123456789012:table/Products/stream/2023-01-01T00:00:00.000"
);

FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    tableStreams,
    new DynamoDBStreamDeserializer(),
    props
);

DataStream<DynamoDBChangeRecord> changes = env.addSource(consumer);

// Process changes by table
changes
    .keyBy(record -> record.getTableName())
    .process(new TableSpecificChangeProcessor());
```
### Real-Time Analytics Pipeline

```java
public class DynamoDBAnalyticsRecord {
    private String tableName;
    private String eventType;
    private long timestamp;
    private Map<String, Object> oldImage;
    private Map<String, Object> newImage;
    private String partitionKey;
    private String sortKey;

    // getters and setters...
}

public class AnalyticsDeserializer implements KinesisDeserializationSchema<DynamoDBAnalyticsRecord> {
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }

    @Override
    public DynamoDBAnalyticsRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                               long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);
        JsonNode dynamodb = root.get("dynamodb");

        DynamoDBAnalyticsRecord record = new DynamoDBAnalyticsRecord();
        record.setTableName(extractTableName(stream));
        record.setEventType(root.get("eventName").asText());
        record.setTimestamp(dynamodb.get("ApproximateCreationDateTime").asLong());

        // Extract partition key and sort key
        JsonNode keys = dynamodb.get("Keys");
        if (keys != null) {
            record.setPartitionKey(extractAttributeValue(keys.get("pk")));
            if (keys.has("sk")) {
                record.setSortKey(extractAttributeValue(keys.get("sk")));
            }
        }

        // Extract old and new images for MODIFY events
        if (dynamodb.has("OldImage")) {
            record.setOldImage(convertDynamoDBImage(dynamodb.get("OldImage")));
        }
        if (dynamodb.has("NewImage")) {
            record.setNewImage(convertDynamoDBImage(dynamodb.get("NewImage")));
        }

        return record;
    }

    private String extractAttributeValue(JsonNode attribute) {
        // Handle DynamoDB attribute value format
        if (attribute.has("S")) return attribute.get("S").asText();
        if (attribute.has("N")) return attribute.get("N").asText();
        if (attribute.has("B")) return attribute.get("B").asText();
        return null;
    }

    private Map<String, Object> convertDynamoDBImage(JsonNode image) {
        Map<String, Object> result = new HashMap<>();
        image.fields().forEachRemaining(entry -> {
            String key = entry.getKey();
            JsonNode value = entry.getValue();
            result.put(key, extractAttributeValue(value));
        });
        return result;
    }

    private String extractTableName(String streamArn) {
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }

    @Override
    public TypeInformation<DynamoDBAnalyticsRecord> getProducedType() {
        return TypeInformation.of(DynamoDBAnalyticsRecord.class);
    }
}

// Create analytics pipeline
FlinkDynamoDBStreamsConsumer<DynamoDBAnalyticsRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    dynamoStreamArn,
    new AnalyticsDeserializer(),
    props
);

DataStream<DynamoDBAnalyticsRecord> changes = env.addSource(consumer);

// Real-time aggregations
changes
    .filter(record -> "MODIFY".equals(record.getEventType()))
    .keyBy(DynamoDBAnalyticsRecord::getTableName)
    // NOTE: timeWindow() is deprecated in newer Flink versions;
    // prefer .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .timeWindow(Time.minutes(5))
    .aggregate(new ChangeCountAggregator())
    .print();
```
### Change Data Replication

```java
public class ReplicationProcessor extends ProcessFunction<DynamoDBChangeRecord, Void> {
    private transient DynamoDbClient targetDynamoDB;

    @Override
    public void open(Configuration parameters) throws Exception {
        targetDynamoDB = DynamoDbClient.builder()
            .region(Region.US_EAST_1) // Different region for replication
            .build();
    }

    @Override
    public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<Void> out) throws Exception {
        switch (record.getEventName()) {
            case "INSERT":
                replicateInsert(record);
                break;
            case "MODIFY":
                replicateModify(record);
                break;
            case "REMOVE":
                replicateRemove(record);
                break;
        }
    }

    private void replicateInsert(DynamoDBChangeRecord record) {
        // Convert DynamoDB JSON to attribute values and insert
        Map<String, AttributeValue> item = convertToAttributeValues(record.getDynamodb().get("NewImage"));

        PutItemRequest request = PutItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .item(item)
            .build();

        targetDynamoDB.putItem(request);
    }

    private void replicateModify(DynamoDBChangeRecord record) {
        // Handle update operation
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
        Map<String, AttributeValue> newImage = convertToAttributeValues(record.getDynamodb().get("NewImage"));

        // Build update expression
        UpdateItemRequest request = UpdateItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .key(keys)
            .attributeUpdates(buildUpdateActions(newImage))
            .build();

        targetDynamoDB.updateItem(request);
    }

    private void replicateRemove(DynamoDBChangeRecord record) {
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));

        DeleteItemRequest request = DeleteItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .key(keys)
            .build();

        targetDynamoDB.deleteItem(request);
    }

    // Helper methods for converting between formats...
}

// Set up replication pipeline
changes.process(new ReplicationProcessor());
```
## Configuration Considerations

### DynamoDB-Specific Settings

```java
// DynamoDB Streams have different characteristics than Kinesis Data Streams
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000"); // Lower batch size
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500"); // More frequent polling
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000"); // Longer idle timeout

// DynamoDB Streams typically don't benefit from Enhanced Fan-Out
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "POLLING");
```
### Error Handling for DynamoDB Streams

```java
public class RobustDynamoDBDeserializer implements KinesisDeserializationSchema<Either<DynamoDBChangeRecord, ErrorRecord>> {

    @Override
    public Either<DynamoDBChangeRecord, ErrorRecord> deserialize(byte[] recordValue, String partitionKey,
                                                                 String seqNum, long approxArrivalTimestamp,
                                                                 String stream, String shardId) {
        try {
            // Attempt normal deserialization
            DynamoDBChangeRecord record = deserializeRecord(recordValue, stream);
            return Either.left(record);
        } catch (Exception e) {
            // Create error record for poison messages
            ErrorRecord error = new ErrorRecord();
            error.setRawData(recordValue);
            error.setStreamName(stream);
            error.setShardId(shardId);
            error.setSequenceNumber(seqNum);
            error.setErrorMessage(e.getMessage());
            error.setTimestamp(approxArrivalTimestamp);

            return Either.right(error);
        }
    }

    // Rest of implementation...
}
```
## Key Differences from Kinesis Data Streams

1. **Record Format**: DynamoDB Streams records contain structured change events with old/new images
2. **Shard Behavior**: DynamoDB manages shards automatically based on table partitioning
3. **Retention**: DynamoDB Streams have a fixed 24-hour retention period
4. **Throughput**: Lower throughput limits compared to Kinesis Data Streams
5. **Enhanced Fan-Out**: Not supported for DynamoDB Streams
6. **Stream ARN Format**: Uses DynamoDB table ARN format instead of Kinesis stream names