Apache Flink connector for Amazon Kinesis Data Streams, providing both consumer and producer functionality for integrating streaming data with AWS Kinesis services.
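The examples on this page focus on the DynamoDB Streams consumer. For the producer side, a minimal sketch using FlinkKinesisProducer looks roughly like the following; the stream name, credentials, and the results stream are placeholders, not values from this package.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
producerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("output-stream"); // placeholder stream name
producer.setDefaultPartition("0");
producer.setFailOnError(true);

// Placeholder source; in practice this would be the output of upstream processing
DataStream<String> results = env.fromElements("record-1", "record-2");
results.addSink(producer);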
Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality for change data capture.
Specialized consumer that extends FlinkKinesisConsumer to work with DynamoDB Streams, providing the same exactly-once guarantees and configuration options while handling DynamoDB-specific stream characteristics.
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {

    /**
     * Create consumer for single DynamoDB stream with standard deserialization schema.
     *
     * @param stream DynamoDB stream ARN or name to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);

    /**
     * Create consumer for multiple DynamoDB streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of DynamoDB stream ARNs or names to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema deserializer, Properties config);
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// DynamoDB Stream ARN
String dynamoStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/stream/2023-01-01T00:00:00.000";
FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(
        dynamoStreamArn,
        new SimpleStringSchema(),
        props);
DataStream<String> stream = env.addSource(consumer);

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class DynamoDBChangeRecord {
    private String eventName; // INSERT, MODIFY, REMOVE
    private String tableName;
    private JsonNode dynamodb; // DynamoDB record data
    private long approximateCreationDateTime;
    // getters and setters...
}
public class DynamoDBStreamDeserializer implements KinesisDeserializationSchema<DynamoDBChangeRecord> {

    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }

    @Override
    public DynamoDBChangeRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                            long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);
        DynamoDBChangeRecord changeRecord = new DynamoDBChangeRecord();
        changeRecord.setEventName(root.get("eventName").asText());
        changeRecord.setTableName(extractTableName(stream));
        changeRecord.setDynamodb(root.get("dynamodb"));
        changeRecord.setApproximateCreationDateTime(
                root.get("dynamodb").get("ApproximateCreationDateTime").asLong());
        return changeRecord;
    }

    private String extractTableName(String streamArn) {
        // Extract table name from DynamoDB stream ARN:
        // arn:aws:dynamodb:region:account:table/TableName/stream/timestamp
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }

    @Override
    public TypeInformation<DynamoDBChangeRecord> getProducedType() {
        return TypeInformation.of(DynamoDBChangeRecord.class);
    }
}
// Use the deserializer
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
        dynamoStreamArn,
        new DynamoDBStreamDeserializer(),
        props);

import java.util.Arrays;
import java.util.List;
// Monitor multiple DynamoDB tables
List<String> tableStreams = Arrays.asList(
        "arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000",
        "arn:aws:dynamodb:us-west-2:123456789012:table/Orders/stream/2023-01-01T00:00:00.000",
        "arn:aws:dynamodb:us-west-2:123456789012:table/Products/stream/2023-01-01T00:00:00.000");
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
        tableStreams,
        new DynamoDBStreamDeserializer(),
        props);
DataStream<DynamoDBChangeRecord> changes = env.addSource(consumer);
// Process changes by table
changes
        .keyBy(record -> record.getTableName())
        .process(new TableSpecificChangeProcessor());

import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.HashMap;
import java.util.Map;

public class DynamoDBAnalyticsRecord {
    private String tableName;
    private String eventType;
    private long timestamp;
    private Map<String, Object> oldImage;
    private Map<String, Object> newImage;
    private String partitionKey;
    private String sortKey;
    // getters and setters...
}
public class AnalyticsDeserializer implements KinesisDeserializationSchema<DynamoDBAnalyticsRecord> {

    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }

    @Override
    public DynamoDBAnalyticsRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                               long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);
        JsonNode dynamodb = root.get("dynamodb");
        DynamoDBAnalyticsRecord record = new DynamoDBAnalyticsRecord();
        record.setTableName(extractTableName(stream));
        record.setEventType(root.get("eventName").asText());
        record.setTimestamp(dynamodb.get("ApproximateCreationDateTime").asLong());
        // Extract partition key and sort key
        JsonNode keys = dynamodb.get("Keys");
        if (keys != null) {
            record.setPartitionKey(extractAttributeValue(keys.get("pk")));
            if (keys.has("sk")) {
                record.setSortKey(extractAttributeValue(keys.get("sk")));
            }
        }
        // Extract old and new images for MODIFY events
        if (dynamodb.has("OldImage")) {
            record.setOldImage(convertDynamoDBImage(dynamodb.get("OldImage")));
        }
        if (dynamodb.has("NewImage")) {
            record.setNewImage(convertDynamoDBImage(dynamodb.get("NewImage")));
        }
        return record;
    }

    private String extractAttributeValue(JsonNode attribute) {
        // Handle DynamoDB attribute value format
        if (attribute.has("S")) return attribute.get("S").asText();
        if (attribute.has("N")) return attribute.get("N").asText();
        if (attribute.has("B")) return attribute.get("B").asText();
        return null;
    }

    private Map<String, Object> convertDynamoDBImage(JsonNode image) {
        Map<String, Object> result = new HashMap<>();
        image.fields().forEachRemaining(entry -> {
            String key = entry.getKey();
            JsonNode value = entry.getValue();
            result.put(key, extractAttributeValue(value));
        });
        return result;
    }

    private String extractTableName(String streamArn) {
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }

    @Override
    public TypeInformation<DynamoDBAnalyticsRecord> getProducedType() {
        return TypeInformation.of(DynamoDBAnalyticsRecord.class);
    }
}
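The aggregation pipeline below uses a ChangeCountAggregator that is not defined on this page. A minimal sketch, assuming it simply counts change records per table and window:

import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical aggregator: counts change records in each window
public class ChangeCountAggregator implements AggregateFunction<DynamoDBAnalyticsRecord, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }
    @Override
    public Long add(DynamoDBAnalyticsRecord value, Long accumulator) {
        return accumulator + 1;
    }
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }
    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}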
// Create analytics pipeline
FlinkDynamoDBStreamsConsumer<DynamoDBAnalyticsRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
        dynamoStreamArn,
        new AnalyticsDeserializer(),
        props);
DataStream<DynamoDBAnalyticsRecord> changes = env.addSource(consumer);
// Real-time aggregations
changes
        .filter(record -> "MODIFY".equals(record.getEventType()))
        .keyBy(DynamoDBAnalyticsRecord::getTableName)
        .timeWindow(Time.minutes(5))
        .aggregate(new ChangeCountAggregator())
        .print();

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import java.util.Map;

public class ReplicationProcessor extends ProcessFunction<DynamoDBChangeRecord, Void> {
    private transient DynamoDbClient targetDynamoDB;

    @Override
    public void open(Configuration parameters) throws Exception {
        targetDynamoDB = DynamoDbClient.builder()
                .region(Region.US_EAST_1) // Different region for replication
                .build();
    }

    @Override
    public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<Void> out) throws Exception {
        switch (record.getEventName()) {
            case "INSERT":
                replicateInsert(record);
                break;
            case "MODIFY":
                replicateModify(record);
                break;
            case "REMOVE":
                replicateRemove(record);
                break;
        }
    }

    private void replicateInsert(DynamoDBChangeRecord record) {
        // Convert DynamoDB JSON to attribute values and insert
        Map<String, AttributeValue> item = convertToAttributeValues(record.getDynamodb().get("NewImage"));
        PutItemRequest request = PutItemRequest.builder()
                .tableName(record.getTableName() + "-replica")
                .item(item)
                .build();
        targetDynamoDB.putItem(request);
    }

    private void replicateModify(DynamoDBChangeRecord record) {
        // Handle update operation
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
        Map<String, AttributeValue> newImage = convertToAttributeValues(record.getDynamodb().get("NewImage"));
        // Build attribute updates from the new image
        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(record.getTableName() + "-replica")
                .key(keys)
                .attributeUpdates(buildUpdateActions(newImage))
                .build();
        targetDynamoDB.updateItem(request);
    }

    private void replicateRemove(DynamoDBChangeRecord record) {
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
        DeleteItemRequest request = DeleteItemRequest.builder()
                .tableName(record.getTableName() + "-replica")
                .key(keys)
                .build();
        targetDynamoDB.deleteItem(request);
    }

    // Helper methods for converting between formats...
}
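ReplicationProcessor elides its conversion helpers. A minimal sketch of what convertToAttributeValues and buildUpdateActions could look like, assuming only string (S), number (N), and boolean (BOOL) attributes need to be handled; these methods would live inside ReplicationProcessor and additionally require AttributeValueUpdate, AttributeAction, and HashMap imports.

// Hypothetical helpers for ReplicationProcessor (S, N, and BOOL attributes only)
private Map<String, AttributeValue> convertToAttributeValues(JsonNode image) {
    Map<String, AttributeValue> result = new HashMap<>();
    image.fields().forEachRemaining(entry -> {
        JsonNode value = entry.getValue();
        AttributeValue.Builder builder = AttributeValue.builder();
        if (value.has("S")) {
            builder.s(value.get("S").asText());
        } else if (value.has("N")) {
            builder.n(value.get("N").asText());
        } else if (value.has("BOOL")) {
            builder.bool(value.get("BOOL").asBoolean());
        } else {
            return; // Skip attribute types not covered by this sketch
        }
        result.put(entry.getKey(), builder.build());
    });
    return result;
}

private Map<String, AttributeValueUpdate> buildUpdateActions(Map<String, AttributeValue> newImage) {
    Map<String, AttributeValueUpdate> updates = new HashMap<>();
    newImage.forEach((name, value) -> updates.put(name,
            AttributeValueUpdate.builder().value(value).action(AttributeAction.PUT).build()));
    return updates;
}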
// Set up replication pipeline
changes.process(new ReplicationProcessor());

// DynamoDB Streams have different characteristics than Kinesis Data Streams
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000"); // Lower batch size
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500"); // More frequent polling
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000"); // Longer idle timeout
// DynamoDB Streams typically don't benefit from Enhanced Fan-Out
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "POLLING");

import org.apache.flink.types.Either; // Flink's Either; factory methods are Either.Left(...) and Either.Right(...)

public class RobustDynamoDBDeserializer implements KinesisDeserializationSchema<Either<DynamoDBChangeRecord, ErrorRecord>> {

    @Override
    public Either<DynamoDBChangeRecord, ErrorRecord> deserialize(byte[] recordValue, String partitionKey,
                                                                 String seqNum, long approxArrivalTimestamp,
                                                                 String stream, String shardId) {
        try {
            // Attempt normal deserialization
            DynamoDBChangeRecord record = deserializeRecord(recordValue, stream);
            return Either.Left(record);
        } catch (Exception e) {
            // Create error record for poison messages
            ErrorRecord error = new ErrorRecord();
            error.setRawData(recordValue);
            error.setStreamName(stream);
            error.setShardId(shardId);
            error.setSequenceNumber(seqNum);
            error.setErrorMessage(e.getMessage());
            error.setTimestamp(approxArrivalTimestamp);
            return Either.Right(error);
        }
    }

    // Rest of implementation...
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kinesis-2-11