DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality for change data capture.

Capabilities

FlinkDynamoDBStreamsConsumer

Specialized consumer that extends FlinkKinesisConsumer to work with DynamoDB Streams, providing the same exactly-once guarantees and configuration options while handling DynamoDB-specific stream characteristics.

public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    
    /**
     * Create consumer for single DynamoDB stream with standard deserialization schema.
     *
     * @param stream ARN of the DynamoDB stream to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    
    /**
     * Create consumer for multiple DynamoDB streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of DynamoDB stream ARNs to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param config AWS and consumer configuration properties
     */
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}

Usage Examples

Basic DynamoDB Streams Consumer

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

// DynamoDB Stream ARN
String dynamoStreamArn = "arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/stream/2023-01-01T00:00:00.000";

FlinkDynamoDBStreamsConsumer<String> consumer = new FlinkDynamoDBStreamsConsumer<>(
    dynamoStreamArn,
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);

DynamoDB Change Data Capture

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DynamoDBChangeRecord {
    private String eventName;    // INSERT, MODIFY, REMOVE
    private String tableName;
    private JsonNode dynamodb;   // DynamoDB record data
    private long approximateCreationDateTime;
    
    // getters and setters...
}

public class DynamoDBStreamDeserializer implements KinesisDeserializationSchema<DynamoDBChangeRecord> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public DynamoDBChangeRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                          long approxArrivalTimestamp, String stream, String shardId) 
                                          throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);
        
        DynamoDBChangeRecord changeRecord = new DynamoDBChangeRecord();
        changeRecord.setEventName(root.get("eventName").asText());
        changeRecord.setTableName(extractTableName(stream));
        changeRecord.setDynamodb(root.get("dynamodb"));
        changeRecord.setApproximateCreationDateTime(
            root.get("dynamodb").get("ApproximateCreationDateTime").asLong()
        );
        
        return changeRecord;
    }
    
    private String extractTableName(String streamArn) {
        // Extract table name from DynamoDB stream ARN
        // arn:aws:dynamodb:region:account:table/TableName/stream/timestamp
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }
    
    @Override
    public TypeInformation<DynamoDBChangeRecord> getProducedType() {
        return TypeInformation.of(DynamoDBChangeRecord.class);
    }
}

import java.util.Collections;

// Use the deserializer (the KinesisDeserializationSchema constructor takes a list of streams)
FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    Collections.singletonList(dynamoStreamArn),
    new DynamoDBStreamDeserializer(),
    props
);

Multi-Table Change Streaming

import java.util.Arrays;
import java.util.List;

// Monitor multiple DynamoDB tables
List<String> tableStreams = Arrays.asList(
    "arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000",
    "arn:aws:dynamodb:us-west-2:123456789012:table/Orders/stream/2023-01-01T00:00:00.000",
    "arn:aws:dynamodb:us-west-2:123456789012:table/Products/stream/2023-01-01T00:00:00.000"
);

FlinkDynamoDBStreamsConsumer<DynamoDBChangeRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    tableStreams,
    new DynamoDBStreamDeserializer(),
    props
);

DataStream<DynamoDBChangeRecord> changes = env.addSource(consumer);

// Process changes by table
changes
    .keyBy(record -> record.getTableName())
    .process(new TableSpecificChangeProcessor());
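
TableSpecificChangeProcessor is referenced above but not defined in this document. A minimal sketch of what such a function might look like, assuming a KeyedProcessFunction keyed by table name (the class name and per-table handling are illustrative, not part of the connector):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical processor: routes each change event to table-specific handling
public class TableSpecificChangeProcessor
        extends KeyedProcessFunction<String, DynamoDBChangeRecord, String> {

    @Override
    public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<String> out) {
        switch (record.getTableName()) {
            case "Users":
                out.collect("User change: " + record.getEventName());
                break;
            case "Orders":
                out.collect("Order change: " + record.getEventName());
                break;
            default:
                out.collect(record.getTableName() + " change: " + record.getEventName());
        }
    }
}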

Real-Time Analytics Pipeline

import java.util.HashMap;
import java.util.Map;

public class DynamoDBAnalyticsRecord {
    private String tableName;
    private String eventType;
    private long timestamp;
    private Map<String, Object> oldImage;
    private Map<String, Object> newImage;
    private String partitionKey;
    private String sortKey;
    
    // getters and setters...
}

public class AnalyticsDeserializer implements KinesisDeserializationSchema<DynamoDBAnalyticsRecord> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public DynamoDBAnalyticsRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                             long approxArrivalTimestamp, String stream, String shardId) 
                                             throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode root = objectMapper.readTree(json);
        JsonNode dynamodb = root.get("dynamodb");
        
        DynamoDBAnalyticsRecord record = new DynamoDBAnalyticsRecord();
        record.setTableName(extractTableName(stream));
        record.setEventType(root.get("eventName").asText());
        record.setTimestamp(dynamodb.get("ApproximateCreationDateTime").asLong());
        
        // Extract partition key and sort key
        JsonNode keys = dynamodb.get("Keys");
        if (keys != null) {
            record.setPartitionKey(extractAttributeValue(keys.get("pk")));
            if (keys.has("sk")) {
                record.setSortKey(extractAttributeValue(keys.get("sk")));
            }
        }
        
        // Extract old and new images for MODIFY events
        if (dynamodb.has("OldImage")) {
            record.setOldImage(convertDynamoDBImage(dynamodb.get("OldImage")));
        }
        if (dynamodb.has("NewImage")) {
            record.setNewImage(convertDynamoDBImage(dynamodb.get("NewImage")));
        }
        
        return record;
    }
    
    private String extractAttributeValue(JsonNode attribute) {
        // Handle DynamoDB attribute value format
        if (attribute.has("S")) return attribute.get("S").asText();
        if (attribute.has("N")) return attribute.get("N").asText();
        if (attribute.has("B")) return attribute.get("B").asText();
        return null;
    }
    
    private Map<String, Object> convertDynamoDBImage(JsonNode image) {
        Map<String, Object> result = new HashMap<>();
        image.fields().forEachRemaining(entry -> {
            String key = entry.getKey();
            JsonNode value = entry.getValue();
            result.put(key, extractAttributeValue(value));
        });
        return result;
    }
    
    private String extractTableName(String streamArn) {
        String[] parts = streamArn.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }
    
    @Override
    public TypeInformation<DynamoDBAnalyticsRecord> getProducedType() {
        return TypeInformation.of(DynamoDBAnalyticsRecord.class);
    }
}

// Create analytics pipeline
FlinkDynamoDBStreamsConsumer<DynamoDBAnalyticsRecord> consumer = new FlinkDynamoDBStreamsConsumer<>(
    Collections.singletonList(dynamoStreamArn),
    new AnalyticsDeserializer(),
    props
);

DataStream<DynamoDBAnalyticsRecord> changes = env.addSource(consumer);

// Real-time aggregations
changes
    .filter(record -> "MODIFY".equals(record.getEventType()))
    .keyBy(DynamoDBAnalyticsRecord::getTableName)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new ChangeCountAggregator())
    .print();
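
ChangeCountAggregator is also left undefined above. A minimal sketch, assuming a simple per-window count is all that is needed (the implementation here is illustrative, not prescribed by the connector):

import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical aggregator: counts MODIFY events per table within each window
public class ChangeCountAggregator implements AggregateFunction<DynamoDBAnalyticsRecord, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(DynamoDBAnalyticsRecord record, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}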

Change Data Replication

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.util.HashMap;
import java.util.Map;

public class ReplicationProcessor extends ProcessFunction<DynamoDBChangeRecord, Void> {
    private transient DynamoDbClient targetDynamoDB;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        targetDynamoDB = DynamoDbClient.builder()
            .region(Region.US_EAST_1) // Different region for replication
            .build();
    }
    
    @Override
    public void processElement(DynamoDBChangeRecord record, Context ctx, Collector<Void> out) throws Exception {
        switch (record.getEventName()) {
            case "INSERT":
                replicateInsert(record);
                break;
            case "MODIFY":
                replicateModify(record);
                break;
            case "REMOVE":
                replicateRemove(record);
                break;
        }
    }
    
    private void replicateInsert(DynamoDBChangeRecord record) {
        // Convert DynamoDB JSON to attribute values and insert
        Map<String, AttributeValue> item = convertToAttributeValues(record.getDynamodb().get("NewImage"));
        
        PutItemRequest request = PutItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .item(item)
            .build();
            
        targetDynamoDB.putItem(request);
    }
    
    private void replicateModify(DynamoDBChangeRecord record) {
        // Handle update operation
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
        Map<String, AttributeValue> newImage = convertToAttributeValues(record.getDynamodb().get("NewImage"));
        
        // Build update expression
        UpdateItemRequest request = UpdateItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .key(keys)
            .attributeUpdates(buildUpdateActions(newImage))
            .build();
            
        targetDynamoDB.updateItem(request);
    }
    
    private void replicateRemove(DynamoDBChangeRecord record) {
        Map<String, AttributeValue> keys = convertToAttributeValues(record.getDynamodb().get("Keys"));
        
        DeleteItemRequest request = DeleteItemRequest.builder()
            .tableName(record.getTableName() + "-replica")
            .key(keys)
            .build();
            
        targetDynamoDB.deleteItem(request);
    }
    
    // Helper methods for converting between formats...
}

// Set up replication pipeline
changes.process(new ReplicationProcessor());
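
The convertToAttributeValues and buildUpdateActions helpers are elided above. A rough sketch of convertToAttributeValues, assuming the images arrive as Jackson JsonNode trees in DynamoDB attribute-value JSON and covering only string, number, and boolean attributes:

// Hypothetical helper for ReplicationProcessor: converts a DynamoDB JSON image
// (e.g. {"name": {"S": "Alice"}}) into AWS SDK v2 AttributeValue objects
private Map<String, AttributeValue> convertToAttributeValues(JsonNode image) {
    Map<String, AttributeValue> result = new HashMap<>();
    image.fields().forEachRemaining(entry -> {
        JsonNode value = entry.getValue();
        if (value.has("S")) {
            result.put(entry.getKey(), AttributeValue.builder().s(value.get("S").asText()).build());
        } else if (value.has("N")) {
            result.put(entry.getKey(), AttributeValue.builder().n(value.get("N").asText()).build());
        } else if (value.has("BOOL")) {
            result.put(entry.getKey(), AttributeValue.builder().bool(value.get("BOOL").asBoolean()).build());
        }
        // B, SS, NS, L, M and other DynamoDB types are omitted from this sketch
    });
    return result;
}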

Configuration Considerations

DynamoDB-Specific Settings

// DynamoDB Streams have different characteristics than Kinesis Data Streams
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000"); // DynamoDB Streams GetRecords returns at most 1,000 records per call
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500"); // More frequent polling
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000"); // Longer idle timeout

// Enhanced fan-out is not supported for DynamoDB Streams, so use the polling record publisher
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "POLLING");

Error Handling for DynamoDB Streams

import org.apache.flink.types.Either;

public class RobustDynamoDBDeserializer implements KinesisDeserializationSchema<Either<DynamoDBChangeRecord, ErrorRecord>> {
    
    @Override
    public Either<DynamoDBChangeRecord, ErrorRecord> deserialize(byte[] recordValue, String partitionKey, 
                                                               String seqNum, long approxArrivalTimestamp, 
                                                               String stream, String shardId) {
        try {
            // Attempt normal deserialization
            DynamoDBChangeRecord record = deserializeRecord(recordValue, stream);
            return Either.Left(record);
        } catch (Exception e) {
            // Create error record for poison messages
            ErrorRecord error = new ErrorRecord();
            error.setRawData(recordValue);
            error.setStreamName(stream);
            error.setShardId(shardId);
            error.setSequenceNumber(seqNum);
            error.setErrorMessage(e.getMessage());
            error.setTimestamp(approxArrivalTimestamp);
            
            return Either.Right(error);
        }
    }
    
    // Rest of implementation...
}
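
One way to consume the Either-typed output is to split valid records from poison messages downstream. A brief usage sketch, assuming Flink's org.apache.flink.types.Either and that RobustDynamoDBDeserializer implements getProducedType accordingly:

// Route parsed change records and poison messages to separate streams
DataStream<Either<DynamoDBChangeRecord, ErrorRecord>> records = env.addSource(
    new FlinkDynamoDBStreamsConsumer<>(
        Collections.singletonList(dynamoStreamArn), new RobustDynamoDBDeserializer(), props));

DataStream<DynamoDBChangeRecord> valid = records
    .filter(Either::isLeft)
    .map(e -> e.left())
    .returns(DynamoDBChangeRecord.class);

DataStream<ErrorRecord> errors = records
    .filter(Either::isRight)
    .map(e -> e.right())
    .returns(ErrorRecord.class);

// Valid records continue through the pipeline; errors go to a dead-letter sink for inspection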

Key Differences from Kinesis Data Streams

  1. Record Format: DynamoDB Streams records contain structured change events with old/new images
  2. Shard Behavior: DynamoDB manages shards automatically based on table partitioning
  3. Retention: DynamoDB Streams have a fixed 24-hour retention period
  4. Throughput: Lower throughput limits compared to Kinesis Data Streams
  5. Enhanced Fan-Out: Not supported for DynamoDB Streams
  6. Stream ARN Format: Streams are identified by DynamoDB stream ARNs (derived from the table ARN) rather than Kinesis stream names; see the lookup sketch below
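
Because streams are addressed by ARN, jobs typically resolve the current stream ARN for a table at submission time rather than hard-coding it. A sketch using the AWS SDK v2 DescribeTable call (client configuration and error handling omitted):

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;

// Look up the latest stream ARN for a table; the ARN changes if the stream
// is disabled and re-enabled, so resolving it at job submission is common
DynamoDbClient dynamoDb = DynamoDbClient.create();
String streamArn = dynamoDb.describeTable(
        DescribeTableRequest.builder().tableName("MyTable").build())
    .table()
    .latestStreamArn();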