FlinkDynamoDBStreamsConsumer is a specialized consumer for reading from DynamoDB Streams, typically used for change data capture. It extends FlinkKinesisConsumer, inheriting its exactly-once guarantees and configuration options, while handling DynamoDB-specific stream characteristics.
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
/**
* Create consumer for single DynamoDB stream with standard deserialization schema.
*
* @param stream DynamoDB stream ARN or name to consume from
* @param deserializer Standard Flink deserialization schema
* @param config AWS and consumer configuration properties
*/
public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
/**
* Create consumer for multiple DynamoDB streams with Kinesis-specific deserialization schema.
*
* @param streams List of DynamoDB stream ARNs or names to consume from
* @param deserializer Kinesis deserialization schema with metadata access
* @param config AWS and consumer configuration properties
*/
public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}

Basic usage, consuming a single table's stream as plain strings:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
// Hardcoded credentials are for illustration only; prefer a credentials provider in production
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
// DynamoDB Streams supports TRIM_HORIZON and LATEST as initial positions
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// DynamoDB Stream ARN
String dynamoStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/stream/2023-01-01T00:00:00.000";
FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(
dynamoStreamArn,
new SimpleStringSchema(),
props
);
DataStream<String> stream = env.addSource(consumer);

Each DynamoDB Streams record arrives as a JSON document describing the change. A custom KinesisDeserializationSchema can parse it into a typed change record:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
public class DynamoDBChangeRecord {
private String eventName; // INSERT, MODIFY, REMOVE
private String tableName;
private JsonNode dynamodb; // DynamoDB record data
private long approximateCreationDateTime;
// getters and setters...
}
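For reference, a stream record payload generally has the following shape; which image fields are present depends on the table's StreamViewType, and the attribute names below are illustrative:

{
  "eventName": "MODIFY",
  "dynamodb": {
    "ApproximateCreationDateTime": 1672531200,
    "Keys":     { "pk": { "S": "user#123" } },
    "OldImage": { "pk": { "S": "user#123" }, "status": { "S": "active" } },
    "NewImage": { "pk": { "S": "user#123" }, "status": { "S": "inactive" } },
    "SequenceNumber": "111000000000000000000000",
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  }
}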
public class DynamoDBStreamDeserializer implements KinesisDeserializationSchema<DynamoDBChangeRecord> {
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public DynamoDBChangeRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId)
throws IOException {
String json = new String(recordValue, StandardCharsets.UTF_8);
JsonNode root = objectMapper.readTree(json);
DynamoDBChangeRecord changeRecord = new DynamoDBChangeRecord();
changeRecord.setEventName(root.get("eventName").asText());
changeRecord.setTableName(extractTableName(stream));
changeRecord.setDynamodb(root.get("dynamodb"));
changeRecord.setApproximateCreationDateTime(
root.get("dynamodb").get("ApproximateCreationDateTime").asLong()
);
return changeRecord;
}
private String extractTableName(String streamArn) {
// Extract table name from DynamoDB stream ARN
// arn:aws:dynamodb:region:account:table/TableName/stream/timestamp
String[] parts = streamArn.split("/");
return parts.length >= 2 ? parts[1] : "unknown";
}
@Override
public TypeInformation<DynamoDBChangeRecord> getProducedType() {
return TypeInformation.of(DynamoDBChangeRecord.class);
}
}
// Use the deserializer
// The constructor that accepts a KinesisDeserializationSchema takes a list of streams
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
Collections.singletonList(dynamoStreamArn),
new DynamoDBStreamDeserializer(),
props
);

A single source can also consume several table streams at once:

import java.util.Arrays;
import java.util.List;
// Monitor multiple DynamoDB tables
List<String> tableStreams = Arrays.asList(
"arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000",
"arn:aws:dynamodb:us-west-2:123456789012:table/Orders/stream/2023-01-01T00:00:00.000",
"arn:aws:dynamodb:us-west-2:123456789012:table/Products/stream/2023-01-01T00:00:00.000"
);
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
tableStreams,
new DynamoDBStreamDeserializer(),
props
);
DataStream<DynamoDBChangeRecord> changes = env.addSource(consumer);
// Process changes by table
changes
.keyBy(record -> record.getTableName())
.process(new TableSpecificChangeProcessor());
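TableSpecificChangeProcessor is referenced above but not defined in this section; a minimal sketch (the class name comes from the pipeline above, the body and output type are illustrative) could look like this:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: the stream is keyed by table name, so getCurrentKey() is the table
public class TableSpecificChangeProcessor
        extends KeyedProcessFunction<String, DynamoDBChangeRecord, String> {
    @Override
    public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<String> out) {
        out.collect(ctx.getCurrentKey() + ": " + record.getEventName());
    }
}

For analytics use cases, it is often convenient to flatten each change into a dedicated record type:

import java.util.HashMap;
import java.util.Map;

public class DynamoDBAnalyticsRecord {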
private String tableName;
private String eventType;
private long timestamp;
private Map<String, Object> oldImage;
private Map<String, Object> newImage;
private String partitionKey;
private String sortKey;
// getters and setters...
}
public class AnalyticsDeserializer implements KinesisDeserializationSchema<DynamoDBAnalyticsRecord> {
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public DynamoDBAnalyticsRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId)
throws IOException {
String json = new String(recordValue, StandardCharsets.UTF_8);
JsonNode root = objectMapper.readTree(json);
JsonNode dynamodb = root.get("dynamodb");
DynamoDBAnalyticsRecord record = new DynamoDBAnalyticsRecord();
record.setTableName(extractTableName(stream));
record.setEventType(root.get("eventName").asText());
record.setTimestamp(dynamodb.get("ApproximateCreationDateTime").asLong());
// Extract the partition key and sort key (this example assumes key attributes named "pk" and "sk")
JsonNode keys = dynamodb.get("Keys");
if (keys != null) {
record.setPartitionKey(extractAttributeValue(keys.get("pk")));
if (keys.has("sk")) {
record.setSortKey(extractAttributeValue(keys.get("sk")));
}
}
// Extract old and new images for MODIFY events
if (dynamodb.has("OldImage")) {
record.setOldImage(convertDynamoDBImage(dynamodb.get("OldImage")));
}
if (dynamodb.has("NewImage")) {
record.setNewImage(convertDynamoDBImage(dynamodb.get("NewImage")));
}
return record;
}
private String extractAttributeValue(JsonNode attribute) {
    // Handle the scalar DynamoDB attribute value formats (string, number, binary);
    // complex types (M, L, BOOL, sets) would need additional handling
    if (attribute == null) return null;
    if (attribute.has("S")) return attribute.get("S").asText();
    if (attribute.has("N")) return attribute.get("N").asText();
    if (attribute.has("B")) return attribute.get("B").asText();
    return null;
}
private Map<String, Object> convertDynamoDBImage(JsonNode image) {
Map<String, Object> result = new HashMap<>();
image.fields().forEachRemaining(entry -> {
String key = entry.getKey();
JsonNode value = entry.getValue();
result.put(key, extractAttributeValue(value));
});
return result;
}
private String extractTableName(String streamArn) {
String[] parts = streamArn.split("/");
return parts.length >= 2 ? parts[1] : "unknown";
}
@Override
public TypeInformation<DynamoDBAnalyticsRecord> getProducedType() {
return TypeInformation.of(DynamoDBAnalyticsRecord.class);
}
}
// Create analytics pipeline
FlinkDynamoDBStreamsConsumer<DynamoDBAnalyticsRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
Collections.singletonList(dynamoStreamArn),
new AnalyticsDeserializer(),
props
);
DataStream<DynamoDBAnalyticsRecord> changes = env.addSource(consumer);
// Real-time aggregations
changes
.filter(record -> "MODIFY".equals(record.getEventType()))
.keyBy(DynamoDBAnalyticsRecord::getTableName)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // timeWindow() was removed in newer Flink versions; use an explicit assigner
.aggregate(new ChangeCountAggregator())
.print();
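ChangeCountAggregator is referenced above but not defined in this section; a minimal sketch (the name comes from the pipeline above, the implementation is illustrative) simply counts changes per key and window:

import org.apache.flink.api.common.functions.AggregateFunction;

// Illustrative sketch: counts records per table per window
public class ChangeCountAggregator implements AggregateFunction<DynamoDBAnalyticsRecord, Long, Long> {
    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(DynamoDBAnalyticsRecord record, Long accumulator) { return accumulator + 1; }

    @Override
    public Long getResult(Long accumulator) { return accumulator; }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}

Change records can also drive cross-region replication of a table:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

public class ReplicationProcessor extends ProcessFunction<DynamoDBChangeRecord, Void> {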
private transient DynamoDbClient targetDynamoDB;
@Override
public void open(Configuration parameters) throws Exception {
targetDynamoDB = DynamoDbClient.builder()
.region(Region.US_EAST_1) // Different region for replication
.build();
}
@Override
public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<Void> out) throws Exception {
switch (record.getEventName()) {
case "INSERT":
replicateInsert(record);
break;
case "MODIFY":
replicateModify(record);
break;
case "REMOVE":
replicateRemove(record);
break;
}
}
private void replicateInsert(DynamoDBChangeRecord record) {
// Convert DynamoDB JSON to attribute values and insert
Map<String, AttributeValue> item = convertToAttributeValues(record.getDynamodb().get("NewImage"));
PutItemRequest request = PutItemRequest.builder()
.tableName(record.getTableName() + "-replica")
.item(item)
.build();
targetDynamoDB.putItem(request);
}
private void replicateModify(DynamoDBChangeRecord record) {
// Handle update operation
Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
Map<String, AttributeValue> newImage = convertToAttributeValues(record.getDynamodb().get("NewImage"));
// attributeUpdates is the legacy update API; building an UpdateExpression
// (or simply re-putting the new image) is the modern alternative
UpdateItemRequest request = UpdateItemRequest.builder()
.tableName(record.getTableName() + "-replica")
.key(keys)
.attributeUpdates(buildUpdateActions(newImage))
.build();
targetDynamoDB.updateItem(request);
}
private void replicateRemove(DynamoDBChangeRecord record) {
Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
DeleteItemRequest request = DeleteItemRequest.builder()
.tableName(record.getTableName() + "-replica")
.key(keys)
.build();
targetDynamoDB.deleteItem(request);
}
// Helper methods for converting between formats...
}
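The conversion helpers above are elided. A hypothetical sketch of convertToAttributeValues, covering only scalar attribute types (a production version must handle all DynamoDB types):

// Hypothetical helper: converts a DynamoDB-JSON image into SDK v2 AttributeValues.
// Only S, N, and BOOL attributes are handled in this sketch.
private Map<String, AttributeValue> convertToAttributeValues(JsonNode image) {
    Map<String, AttributeValue> result = new HashMap<>();
    image.fields().forEachRemaining(entry -> {
        JsonNode value = entry.getValue();
        if (value.has("S")) {
            result.put(entry.getKey(), AttributeValue.builder().s(value.get("S").asText()).build());
        } else if (value.has("N")) {
            result.put(entry.getKey(), AttributeValue.builder().n(value.get("N").asText()).build());
        } else if (value.has("BOOL")) {
            result.put(entry.getKey(), AttributeValue.builder().bool(value.get("BOOL").asBoolean()).build());
        }
    });
    return result;
}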
// Set up replication pipeline
changes.process(new ReplicationProcessor());

DynamoDB Streams has different throughput and shard characteristics than Kinesis Data Streams, so the polling settings usually need tuning:
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000"); // Lower batch size
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500"); // More frequent polling
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000"); // Longer idle timeout
// Enhanced fan-out is a Kinesis Data Streams feature and is not available for DynamoDB Streams
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "POLLING");

Malformed records should not fail the job. One pattern is to wrap deserialization so each record becomes either a parsed change record or an error record (ErrorRecord is a simple POJO carrying the raw bytes and failure details):

import org.apache.flink.types.Either;

public class RobustDynamoDBDeserializer implements KinesisDeserializationSchema<Either<DynamoDBChangeRecord, ErrorRecord>> {
@Override
public Either<DynamoDBChangeRecord, ErrorRecord> deserialize(byte[] recordValue, String partitionKey,
String seqNum, long approxArrivalTimestamp,
String stream, String shardId) {
try {
// Attempt normal deserialization
DynamoDBChangeRecord record = deserializeRecord(recordValue, stream);
return Either.left(record);
} catch (Exception e) {
// Create error record for poison messages
ErrorRecord error = new ErrorRecord();
error.setRawData(recordValue);
error.setStreamName(stream);
error.setShardId(shardId);
error.setSequenceNumber(seqNum);
error.setErrorMessage(e.getMessage());
error.setTimestamp(approxArrivalTimestamp);
return Either.right(error);
}
}
// Rest of implementation...
}
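Downstream, the Either values can be split into a main stream and a dead-letter stream. A sketch (the explicit returns() hints are needed because lambdas erase Either's type parameters):

DataStream<Either<DynamoDBChangeRecord, ErrorRecord>> results = env.addSource(
        new FlinkDynamoDBStreamsConsumer<>(
                Collections.singletonList(dynamoStreamArn),
                new RobustDynamoDBDeserializer(),
                props));

// Successfully parsed change records continue down the main pipeline
DataStream<DynamoDBChangeRecord> parsed = results
        .filter(Either::isLeft)
        .map(either -> either.left())
        .returns(DynamoDBChangeRecord.class);

// Poison messages go to a dead-letter path for inspection or replay
DataStream<ErrorRecord> errors = results
        .filter(Either::isRight)
        .map(either -> either.right())
        .returns(ErrorRecord.class);

errors.print(); // in production, write to a dead-letter queue instead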