tessl/maven-com-pulumi--aws

tessl install tessl/maven-com-pulumi--aws@7.16.0

Pulumi Java SDK for AWS providing strongly-typed Infrastructure-as-Code for 227 AWS service packages including compute, storage, databases, networking, security, analytics, machine learning, and more.

docs/guides/data-pipeline.md

Building a Serverless Data Processing Pipeline

This guide demonstrates how to build a production-ready, event-driven data processing pipeline on AWS using serverless technologies. The architecture handles data ingestion, transformation, storage, and monitoring.

Architecture Overview

┌─────────────┐
│   Source    │
│   Systems   │
└──────┬──────┘
       │
       │ PUT Object
       ▼
┌─────────────────────────────────────────────────────┐
│                    S3 Input Bucket                  │
│              (Raw Data / Landing Zone)              │
└──────┬──────────────────────────────────────────────┘
       │
       │ S3 Event Notification
       ▼
┌─────────────────────────────────────────────────────┐
│                     SNS Topic                       │
│                (Event Fan-Out)                      │
└──────┬──────────────────┬──────────────────────────┘
       │                  │
       │                  │ Subscribe
       │                  ▼
       │         ┌──────────────────┐
       │         │   SQS Queue DLQ  │
       │         └──────────────────┘
       │                  ▲
       │ Subscribe        │ Failed Messages
       ▼                  │
┌─────────────────────────┴───────────────────────────┐
│                    SQS Queue                        │
│           (Buffering & Throttling)                  │
└──────┬──────────────────────────────────────────────┘
       │
       │ Event Source Mapping
       ▼
┌─────────────────────────────────────────────────────┐
│               Lambda Processor                      │
│          (Transform & Validate Data)                │
└──────┬────────────────────┬─────────────────────────┘
       │                    │
       │ Success            │ Failed Records
       ▼                    ▼
┌──────────────┐    ┌──────────────────┐
│      S3      │    │   S3 Failed      │
│  Processed   │    │   Bucket         │
└──────┬───────┘    └──────────────────┘
       │
       │ Store Results
       ▼
┌─────────────────────────────────────────────────────┐
│                  DynamoDB Table                     │
│              (Processed Records)                    │
└─────────────────────────────────────────────────────┘
       │
       │ DynamoDB Stream
       ▼
┌─────────────────────────────────────────────────────┐
│           Lambda Aggregator (Optional)              │
│              (Real-time Analytics)                  │
└─────────────────────────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────────────────┐
│                 CloudWatch Logs                     │
│            Metrics & Monitoring                     │
└─────────────────────────────────────────────────────┘

Complete Implementation

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

// Configuration
const config = new pulumi.Config();
const pipelineName = "data-pipeline";
const environment = pulumi.getStack();

// Tags for all resources
const tags = {
    Environment: environment,
    Pipeline: pipelineName,
    ManagedBy: "pulumi",
};

// ============================================================================
// 1. S3 BUCKETS FOR DATA STORAGE
// ============================================================================

// Input bucket for raw data
const inputBucket = new aws.s3.BucketV2("input-bucket", {
    // Use pulumi.interpolate so the account ID Output resolves correctly in the bucket name
    bucket: pulumi.interpolate`${pipelineName}-input-${aws.getCallerIdentityOutput().accountId}`,
    tags: { ...tags, Stage: "input" },
});

// Enable versioning for input bucket
new aws.s3.BucketVersioningV2("input-versioning", {
    bucket: inputBucket.id,
    versioningConfiguration: {
        status: "Enabled",
    },
});

// Enable encryption for input bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("input-encryption", {
    bucket: inputBucket.id,
    rules: [{
        applyServerSideEncryptionByDefault: {
            sseAlgorithm: "AES256",
        },
        bucketKeyEnabled: true,
    }],
});

// Processed data bucket
const processedBucket = new aws.s3.BucketV2("processed-bucket", {
    bucket: pulumi.interpolate`${pipelineName}-processed-${aws.getCallerIdentityOutput().accountId}`,
    tags: { ...tags, Stage: "processed" },
});

// Enable versioning for processed bucket
new aws.s3.BucketVersioningV2("processed-versioning", {
    bucket: processedBucket.id,
    versioningConfiguration: {
        status: "Enabled",
    },
});

// Enable encryption for processed bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("processed-encryption", {
    bucket: processedBucket.id,
    rules: [{
        applyServerSideEncryptionByDefault: {
            sseAlgorithm: "AES256",
        },
        bucketKeyEnabled: true,
    }],
});

// Lifecycle policy for processed bucket
new aws.s3.BucketLifecycleConfigurationV2("processed-lifecycle", {
    bucket: processedBucket.id,
    rules: [
        {
            id: "transition-to-ia",
            status: "Enabled",
            transitions: [{
                days: 30,
                storageClass: "STANDARD_IA",
            }],
        },
        {
            id: "transition-to-glacier",
            status: "Enabled",
            transitions: [{
                days: 90,
                storageClass: "GLACIER",
            }],
        },
    ],
});

// Failed data bucket for error handling
const failedBucket = new aws.s3.BucketV2("failed-bucket", {
    bucket: pulumi.interpolate`${pipelineName}-failed-${aws.getCallerIdentityOutput().accountId}`,
    tags: { ...tags, Stage: "failed" },
});

// Enable encryption for failed bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("failed-encryption", {
    bucket: failedBucket.id,
    rules: [{
        applyServerSideEncryptionByDefault: {
            sseAlgorithm: "AES256",
        },
        bucketKeyEnabled: true,
    }],
});

// ============================================================================
// 2. SNS TOPIC FOR EVENT NOTIFICATIONS
// ============================================================================

// Create SNS topic for S3 events
const snsTopic = new aws.sns.Topic("pipeline-events", {
    name: `${pipelineName}-events`,
    displayName: "Data Pipeline Events",
    tags,
});

// SNS topic policy to allow S3 to publish
const snsTopicPolicy = new aws.sns.TopicPolicy("topic-policy", {
    arn: snsTopic.arn,
    policy: pulumi.all([snsTopic.arn, inputBucket.arn]).apply(([topicArn, bucketArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [{
                Effect: "Allow",
                Principal: {
                    Service: "s3.amazonaws.com",
                },
                Action: "SNS:Publish",
                Resource: topicArn,
                Condition: {
                    ArnLike: {
                        "aws:SourceArn": bucketArn,
                    },
                },
            }],
        })
    ),
});

// ============================================================================
// 3. SQS QUEUES FOR MESSAGE PROCESSING
// ============================================================================

// Dead Letter Queue for failed messages
const dlq = new aws.sqs.Queue("dlq", {
    name: `${pipelineName}-dlq`,
    messageRetentionSeconds: 1209600, // 14 days
    tags: { ...tags, Purpose: "deadletter" },
});

// Main processing queue
const processingQueue = new aws.sqs.Queue("processing-queue", {
    name: `${pipelineName}-processing`,
    visibilityTimeoutSeconds: 360, // 6 minutes (6x the 60-second Lambda timeout, per AWS guidance)
    messageRetentionSeconds: 345600, // 4 days
    receiveWaitTimeSeconds: 20, // Long polling
    redrivePolicy: pulumi.interpolate`{
        "deadLetterTargetArn": "${dlq.arn}",
        "maxReceiveCount": 3
    }`,
    tags,
});

// SQS queue policy to allow SNS to send messages
const queuePolicy = new aws.sqs.QueuePolicy("queue-policy", {
    queueUrl: processingQueue.url,
    policy: pulumi.all([processingQueue.arn, snsTopic.arn]).apply(([queueArn, topicArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [{
                Effect: "Allow",
                Principal: {
                    Service: "sns.amazonaws.com",
                },
                Action: "sqs:SendMessage",
                Resource: queueArn,
                Condition: {
                    ArnEquals: {
                        "aws:SourceArn": topicArn,
                    },
                },
            }],
        })
    ),
});

// Subscribe SQS to SNS topic
const snsSubscription = new aws.sns.TopicSubscription("queue-subscription", {
    topic: snsTopic.arn,
    protocol: "sqs",
    endpoint: processingQueue.arn,
    rawMessageDelivery: false,
});

// ============================================================================
// 4. S3 EVENT NOTIFICATION TO SNS
// ============================================================================

// Configure S3 bucket notification to SNS
const bucketNotification = new aws.s3.BucketNotification("input-notification", {
    bucket: inputBucket.id,
    topics: [{
        topicArn: snsTopic.arn,
        events: ["s3:ObjectCreated:*"],
        filterPrefix: "incoming/",
        filterSuffix: ".json",
    }],
}, { dependsOn: [snsTopicPolicy] });

// ============================================================================
// 5. DYNAMODB TABLE FOR PROCESSED DATA
// ============================================================================

// Create DynamoDB table to store processed records
const dynamoTable = new aws.dynamodb.Table("processed-data", {
    name: `${pipelineName}-data`,
    billingMode: "PAY_PER_REQUEST", // On-demand pricing
    hashKey: "id",
    rangeKey: "timestamp",
    attributes: [
        { name: "id", type: "S" },
        { name: "timestamp", type: "N" },
        { name: "status", type: "S" },
        { name: "processedDate", type: "S" },
    ],
    streamEnabled: true,
    streamViewType: "NEW_AND_OLD_IMAGES",
    globalSecondaryIndexes: [
        {
            name: "StatusIndex",
            hashKey: "status",
            rangeKey: "processedDate",
            projectionType: "ALL",
        },
    ],
    ttl: {
        attributeName: "expirationTime",
        enabled: true,
    },
    pointInTimeRecovery: {
        enabled: true,
    },
    tags,
});

// ============================================================================
// 6. KMS KEY FOR ENCRYPTION
// ============================================================================

// Create KMS key for encrypting sensitive data
const kmsKey = new aws.kms.Key("pipeline-key", {
    description: `Encryption key for ${pipelineName}`,
    deletionWindowInDays: 10,
    enableKeyRotation: true,
    tags,
});

// Create alias for the key
new aws.kms.Alias("pipeline-key-alias", {
    name: `alias/${pipelineName}`,
    targetKeyId: kmsKey.id,
});

// ============================================================================
// 7. IAM ROLES AND POLICIES FOR LAMBDA
// ============================================================================

// IAM role for Lambda processor
const processorRole = new aws.iam.Role("processor-role", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Action: "sts:AssumeRole",
            Effect: "Allow",
            Principal: {
                Service: "lambda.amazonaws.com",
            },
        }],
    }),
    tags,
});

// Attach basic Lambda execution policy
new aws.iam.RolePolicyAttachment("processor-basic-execution", {
    role: processorRole.name,
    policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
});

// Custom policy for processor Lambda
const processorPolicy = new aws.iam.Policy("processor-policy", {
    description: "Policy for data processor Lambda",
    policy: pulumi.all([
        inputBucket.arn,
        processedBucket.arn,
        failedBucket.arn,
        dynamoTable.arn,
        processingQueue.arn,
        kmsKey.arn,
    ]).apply(([inputArn, processedArn, failedArn, tableArn, queueArn, keyArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [
                {
                    Effect: "Allow",
                    Action: [
                        "s3:GetObject",
                        "s3:GetObjectVersion",
                    ],
                    Resource: `${inputArn}/*`,
                },
                {
                    Effect: "Allow",
                    Action: [
                        "s3:PutObject",
                    ],
                    Resource: [
                        `${processedArn}/*`,
                        `${failedArn}/*`,
                    ],
                },
                {
                    Effect: "Allow",
                    Action: [
                        "dynamodb:PutItem",
                        "dynamodb:UpdateItem",
                        "dynamodb:GetItem",
                        "dynamodb:Query",
                    ],
                    Resource: [
                        tableArn,
                        `${tableArn}/index/*`,
                    ],
                },
                {
                    Effect: "Allow",
                    Action: [
                        "sqs:ReceiveMessage",
                        "sqs:DeleteMessage",
                        "sqs:GetQueueAttributes",
                        "sqs:ChangeMessageVisibility",
                    ],
                    Resource: queueArn,
                },
                {
                    Effect: "Allow",
                    Action: [
                        "kms:Decrypt",
                        "kms:Encrypt",
                        "kms:GenerateDataKey",
                    ],
                    Resource: keyArn,
                },
            ],
        })
    ),
});

// Attach custom policy to role
new aws.iam.RolePolicyAttachment("processor-policy-attachment", {
    role: processorRole.name,
    policyArn: processorPolicy.arn,
});

// ============================================================================
// 8. LAMBDA FUNCTION FOR DATA PROCESSING
// ============================================================================

// Create CloudWatch Log Group for Lambda
const processorLogGroup = new aws.cloudwatch.LogGroup("processor-logs", {
    name: `/aws/lambda/${pipelineName}-processor`,
    retentionInDays: 30,
    tags,
});

// Lambda function for processing data
const processorFunction = new aws.lambda.Function("processor", {
    name: `${pipelineName}-processor`,
    runtime: aws.lambda.Runtime.Python3d11,
    handler: "index.handler",
    role: processorRole.arn,
    timeout: 60,
    memorySize: 512,
    environment: {
        variables: {
            PROCESSED_BUCKET: processedBucket.bucket,
            FAILED_BUCKET: failedBucket.bucket,
            DYNAMODB_TABLE: dynamoTable.name,
            KMS_KEY_ID: kmsKey.id,
            ENVIRONMENT: environment,
        },
    },
    code: new pulumi.asset.AssetArchive({
        "index.py": new pulumi.asset.StringAsset(`
import json
import boto3
import os
from datetime import datetime
from typing import Dict, Any

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
kms = boto3.client('kms')

PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']
FAILED_BUCKET = os.environ['FAILED_BUCKET']
TABLE_NAME = os.environ['DYNAMODB_TABLE']
table = dynamodb.Table(TABLE_NAME)

def handler(event, context):
    """
    Process incoming messages from SQS.
    Each message contains an S3 event notification.
    """
    print(f"Processing {len(event['Records'])} records")

    results = {
        'successful': 0,
        'failed': 0,
        'batchItemFailures': []
    }

    for record in event['Records']:
        try:
            # Parse SQS message body (SNS message)
            sns_message = json.loads(record['body'])
            s3_event = json.loads(sns_message['Message'])

            # Process each S3 record in the event
            for s3_record in s3_event['Records']:
                bucket = s3_record['s3']['bucket']['name']
                key = s3_record['s3']['object']['key']

                print(f"Processing s3://{bucket}/{key}")

                # Download and process the file
                response = s3.get_object(Bucket=bucket, Key=key)
                data = json.loads(response['Body'].read().decode('utf-8'))

                # Validate and transform data
                processed_data = transform_data(data)

                # Save to DynamoDB
                save_to_dynamodb(processed_data)

                # Save processed file to S3
                processed_key = f"processed/{datetime.now().strftime('%Y/%m/%d')}/{key.split('/')[-1]}"
                s3.put_object(
                    Bucket=PROCESSED_BUCKET,
                    Key=processed_key,
                    Body=json.dumps(processed_data),
                    ContentType='application/json'
                )

                results['successful'] += 1
                print(f"Successfully processed {key}")

        except Exception as e:
            print(f"Error processing record: {str(e)}")
            results['failed'] += 1

            # Add to batch item failures for retry
            results['batchItemFailures'].append({
                'itemIdentifier': record['messageId']
            })

            # Save failed record to S3
            try:
                failed_key = f"failed/{datetime.now().strftime('%Y/%m/%d')}/{record['messageId']}.json"
                s3.put_object(
                    Bucket=FAILED_BUCKET,
                    Key=failed_key,
                    Body=json.dumps({
                        'error': str(e),
                        'record': record,
                        'timestamp': datetime.now().isoformat()
                    }),
                    ContentType='application/json'
                )
            except Exception as save_error:
                print(f"Failed to save error record: {str(save_error)}")

    print(f"Results: {results['successful']} successful, {results['failed']} failed")

    # Return batch item failures for partial batch failure handling
    return {
        'batchItemFailures': results['batchItemFailures']
    }

def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Transform and validate incoming data.
    Add your business logic here.
    """
    # Example transformation; expirationTime feeds the TTL attribute configured
    # on the DynamoDB table so processed records are cleaned up automatically.
    now = datetime.now()
    return {
        'id': data.get('id', 'unknown'),
        'timestamp': int(now.timestamp()),
        'processedDate': now.strftime('%Y-%m-%d'),
        'status': 'processed',
        'originalData': data,
        'processedAt': now.isoformat(),
        'expirationTime': int(now.timestamp()) + 90 * 24 * 3600,  # expire after 90 days
    }

def save_to_dynamodb(data: Dict[str, Any]) -> None:
    """
    Save processed data to DynamoDB.
    """
    table.put_item(Item=data)
`),
    }),
    tags,
}, { dependsOn: [processorLogGroup] });

// Configure SQS as Lambda event source
const eventSourceMapping = new aws.lambda.EventSourceMapping("processor-trigger", {
    eventSourceArn: processingQueue.arn,
    functionName: processorFunction.name,
    batchSize: 10,
    maximumBatchingWindowInSeconds: 10,
    functionResponseTypes: ["ReportBatchItemFailures"], // Enable partial batch failure
    scalingConfig: {
        maximumConcurrency: 10,
    },
});

// ============================================================================
// 9. LAMBDA FUNCTION FOR AGGREGATION (Optional)
// ============================================================================

// IAM role for aggregator Lambda
const aggregatorRole = new aws.iam.Role("aggregator-role", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Action: "sts:AssumeRole",
            Effect: "Allow",
            Principal: {
                Service: "lambda.amazonaws.com",
            },
        }],
    }),
    tags,
});

// Attach basic Lambda execution policy
new aws.iam.RolePolicyAttachment("aggregator-basic-execution", {
    role: aggregatorRole.name,
    policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
});

// Policy for DynamoDB stream access
const aggregatorPolicy = new aws.iam.Policy("aggregator-policy", {
    description: "Policy for aggregator Lambda",
    policy: pulumi.all([dynamoTable.streamArn]).apply(([streamArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [
                {
                    Effect: "Allow",
                    Action: [
                        "dynamodb:GetRecords",
                        "dynamodb:GetShardIterator",
                        "dynamodb:DescribeStream",
                        "dynamodb:ListStreams",
                    ],
                    Resource: streamArn,
                },
            ],
        })
    ),
});

// Attach policy to role
new aws.iam.RolePolicyAttachment("aggregator-policy-attachment", {
    role: aggregatorRole.name,
    policyArn: aggregatorPolicy.arn,
});

// Create CloudWatch Log Group for aggregator Lambda
const aggregatorLogGroup = new aws.cloudwatch.LogGroup("aggregator-logs", {
    name: `/aws/lambda/${pipelineName}-aggregator`,
    retentionInDays: 30,
    tags,
});

// Aggregator Lambda function
const aggregatorFunction = new aws.lambda.Function("aggregator", {
    name: `${pipelineName}-aggregator`,
    runtime: aws.lambda.Runtime.Python3d11,
    handler: "index.handler",
    role: aggregatorRole.arn,
    timeout: 60,
    memorySize: 256,
    environment: {
        variables: {
            ENVIRONMENT: environment,
        },
    },
    code: new pulumi.asset.AssetArchive({
        "index.py": new pulumi.asset.StringAsset(`
import json
from typing import Dict, Any

def handler(event, context):
    """
    Process DynamoDB stream events for real-time analytics.
    """
    print(f"Processing {len(event['Records'])} stream records")

    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # Get new image
            new_image = record['dynamodb'].get('NewImage', {})

            # Add your aggregation logic here
            process_record(new_image)

    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed stream records')
    }

def process_record(image: Dict[str, Any]) -> None:
    """
    Process individual DynamoDB stream record.
    Add your analytics/aggregation logic here.
    """
    print(f"Processing record: {image}")
    # Example: Update counters, calculate metrics, etc.
`),
    }),
    tags,
}, { dependsOn: [aggregatorLogGroup] });

// Configure DynamoDB stream as Lambda event source
const streamEventSourceMapping = new aws.lambda.EventSourceMapping("aggregator-trigger", {
    eventSourceArn: dynamoTable.streamArn,
    functionName: aggregatorFunction.name,
    startingPosition: "LATEST",
    batchSize: 100,
    maximumBatchingWindowInSeconds: 10,
    parallelizationFactor: 2,
    maximumRecordAgeInSeconds: 3600,
    bisectBatchOnFunctionError: true,
    maximumRetryAttempts: 2,
});

// ============================================================================
// 10. CLOUDWATCH MONITORING AND ALARMS
// ============================================================================

// Alarm for DLQ messages
new aws.cloudwatch.MetricAlarm("dlq-messages-alarm", {
    name: `${pipelineName}-dlq-messages`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 1,
    metricName: "ApproximateNumberOfMessagesVisible",
    namespace: "AWS/SQS",
    period: 300,
    statistic: "Average",
    threshold: 0,
    dimensions: {
        QueueName: dlq.name,
    },
    alarmDescription: "Alert when messages appear in DLQ",
    tags,
});

// Alarm for Lambda errors
new aws.cloudwatch.MetricAlarm("lambda-errors-alarm", {
    name: `${pipelineName}-lambda-errors`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 2,
    metricName: "Errors",
    namespace: "AWS/Lambda",
    period: 300,
    statistic: "Sum",
    threshold: 5,
    dimensions: {
        FunctionName: processorFunction.name,
    },
    alarmDescription: "Alert when Lambda has too many errors",
    tags,
});

// Alarm for Lambda throttles
new aws.cloudwatch.MetricAlarm("lambda-throttles-alarm", {
    name: `${pipelineName}-lambda-throttles`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 1,
    metricName: "Throttles",
    namespace: "AWS/Lambda",
    period: 300,
    statistic: "Sum",
    threshold: 0,
    dimensions: {
        FunctionName: processorFunction.name,
    },
    alarmDescription: "Alert when Lambda is throttled",
    tags,
});

// Alarm for queue age
new aws.cloudwatch.MetricAlarm("queue-age-alarm", {
    name: `${pipelineName}-queue-age`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 2,
    metricName: "ApproximateAgeOfOldestMessage",
    namespace: "AWS/SQS",
    period: 300,
    statistic: "Maximum",
    threshold: 300, // 5 minutes
    dimensions: {
        QueueName: processingQueue.name,
    },
    alarmDescription: "Alert when messages are not being processed quickly",
    tags,
});

// ============================================================================
// 11. CLOUDWATCH DASHBOARD
// ============================================================================

// Create CloudWatch dashboard
const dashboard = new aws.cloudwatch.Dashboard("pipeline-dashboard", {
    dashboardName: `${pipelineName}-dashboard`,
    dashboardBody: pulumi.all([
        processingQueue.name,
        dlq.name,
        processorFunction.name,
        dynamoTable.name,
        aws.getRegionOutput().name, // resolve the region before serializing the dashboard body
    ]).apply(([queueName, dlqName, functionName, tableName, region]) =>
        JSON.stringify({
            widgets: [
                {
                    type: "metric",
                    properties: {
                        metrics: [
                            ["AWS/SQS", "NumberOfMessagesSent", "QueueName", queueName, { stat: "Sum", label: "Messages Sent" }],
                            [".", "NumberOfMessagesReceived", ".", ".", { stat: "Sum", label: "Messages Received" }],
                            [".", "ApproximateNumberOfMessagesVisible", ".", ".", { stat: "Average", label: "Messages in Queue" }],
                            [".", "ApproximateNumberOfMessagesVisible", "QueueName", dlqName, { stat: "Average", label: "Messages in DLQ" }],
                        ],
                        period: 300,
                        stat: "Average",
                        region,
                        title: "SQS Queue Metrics",
                        yAxis: { left: { min: 0 } },
                    },
                },
                {
                    type: "metric",
                    properties: {
                        metrics: [
                            ["AWS/Lambda", "Invocations", "FunctionName", functionName, { stat: "Sum", label: "Invocations" }],
                            [".", "Errors", ".", ".", { stat: "Sum", label: "Errors" }],
                            [".", "Throttles", ".", ".", { stat: "Sum", label: "Throttles" }],
                            [".", "Duration", ".", ".", { stat: "Average", label: "Avg Duration" }],
                        ],
                        period: 300,
                        stat: "Average",
                        region,
                        title: "Lambda Function Metrics",
                        yAxis: { left: { min: 0 } },
                    },
                },
                {
                    type: "metric",
                    properties: {
                        metrics: [
                            ["AWS/DynamoDB", "ConsumedReadCapacityUnits", "TableName", tableName, { stat: "Sum" }],
                            [".", "ConsumedWriteCapacityUnits", ".", ".", { stat: "Sum" }],
                        ],
                        period: 300,
                        stat: "Sum",
                        region,
                        title: "DynamoDB Capacity",
                    },
                },
            ],
        })
    ),
});

// ============================================================================
// OUTPUTS
// ============================================================================

export const inputBucketName = inputBucket.bucket;
export const processedBucketName = processedBucket.bucket;
export const failedBucketName = failedBucket.bucket;
export const queueUrl = processingQueue.url;
export const dlqUrl = dlq.url;
export const tableName = dynamoTable.name;
export const processorFunctionName = processorFunction.name;
export const kmsKeyId = kmsKey.id;
export const dashboardUrl = pulumi.interpolate`https://console.aws.amazon.com/cloudwatch/home?region=${aws.getRegionOutput().name}#dashboards:name=${dashboard.dashboardName}`;

Key Features

Event-Driven Architecture

  • S3 triggers processing pipeline automatically
  • SNS for fan-out to multiple subscribers
  • SQS for buffering and throttling
  • Lambda for serverless compute

Reliability

  • Dead Letter Queue for failed messages
  • Partial batch failure handling
  • Message visibility timeout
  • Automatic retries with backoff

Scalability

  • Serverless components auto-scale
  • SQS handles bursts in traffic
  • Lambda concurrency controls via the event source's maximumConcurrency
  • DynamoDB on-demand capacity

Data Management

  • S3 lifecycle policies for cost optimization
  • Versioning enabled on the input and processed buckets
  • DynamoDB TTL for automatic cleanup
  • Point-in-time recovery for DynamoDB

Security

  • Encryption at rest for all data stores
  • KMS key for sensitive data
  • IAM roles with least privilege
  • Lambda functions run outside a VPC (no VPC needed for this use case)

Monitoring

  • CloudWatch alarms for key metrics
  • Comprehensive dashboard
  • Centralized Lambda logging in CloudWatch (a log-derived custom metric is sketched after this list)
  • DLQ monitoring
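
Beyond the alarms and dashboard above, log output can also be turned into custom metrics. A minimal sketch, assuming the processorLogGroup and pipelineName from the implementation above; the filter pattern, metric name, and namespace are illustrative:

// Sketch: derive a custom metric from the processor's log output by counting
// lines containing "Error processing record" (the message the inline handler prints on failure).
new aws.cloudwatch.LogMetricFilter("processor-error-metric", {
    name: `${pipelineName}-processor-errors`,
    logGroupName: processorLogGroup.name,
    pattern: "\"Error processing record\"",
    metricTransformation: {
        name: "ProcessorRecordErrors",
        namespace: `${pipelineName}/custom`, // illustrative custom namespace
        value: "1",
        defaultValue: "0",
    },
});

An aws.cloudwatch.MetricAlarm can then watch ProcessorRecordErrors in the same way the alarms in section 10 watch the built-in Lambda metrics.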

Usage Example

Upload a file to trigger the pipeline (replace 123456789012 with your AWS account ID in the bucket names below):

# Create a test file
echo '{"id": "test-123", "data": "sample"}' > test.json

# Upload to S3 input bucket
aws s3 cp test.json s3://data-pipeline-input-123456789012/incoming/test.json

# Monitor CloudWatch logs
aws logs tail /aws/lambda/data-pipeline-processor --follow

# Check processed data in DynamoDB
aws dynamodb scan --table-name data-pipeline-data

# Check processed files in S3
aws s3 ls s3://data-pipeline-processed-123456789012/processed/ --recursive

Testing the Pipeline

# test_data_generator.py
import json
import boto3
import random
from datetime import datetime

s3 = boto3.client('s3')
bucket = 'data-pipeline-input-123456789012'  # replace with your input bucket (pulumi stack output inputBucketName)

# Generate test data
for i in range(10):
    data = {
        'id': f'test-{i}',
        'timestamp': datetime.now().isoformat(),
        'value': random.randint(1, 100),
        'type': random.choice(['A', 'B', 'C']),
    }

    key = f'incoming/test-{i}-{datetime.now().timestamp()}.json'
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(data),
        ContentType='application/json'
    )
    print(f"Uploaded {key}")

Cost Optimization

  • Use S3 Intelligent-Tiering for processed data
  • Set appropriate DynamoDB TTL values
  • Configure Lambda reserved concurrency
  • Use SQS long polling to reduce API calls
  • Enable S3 lifecycle policies (see the sketch after this list)
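
Long polling and DynamoDB TTL are already covered by the main listing (receiveWaitTimeSeconds and the table's ttl block), and Intelligent-Tiering can be adopted by switching a transition in the processed-lifecycle rules to storageClass: "INTELLIGENT_TIERING". The remaining items are sketched below, assuming the failedBucket and processor function from the implementation above; the 30-day expiration and the concurrency cap of 20 are illustrative values, not recommendations:

// Sketch: expire failed-record objects after 30 days so the error bucket
// does not accumulate storage cost indefinitely.
new aws.s3.BucketLifecycleConfigurationV2("failed-lifecycle", {
    bucket: failedBucket.id,
    rules: [{
        id: "expire-failed-records",
        status: "Enabled",
        expiration: { days: 30 },
    }],
});

// Sketch: reserved concurrency is an argument on aws.lambda.Function, so it is
// added to the existing "processor" definition rather than declared separately:
//
//     reservedConcurrentExecutions: 20, // caps concurrent executions (illustrative)

Combined with the event source's maximumConcurrency, a reserved-concurrency cap also bounds how hard the pipeline can push downstream systems during bursts.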

Monitoring and Alerting

View the CloudWatch dashboard to monitor:

  • Message throughput
  • Processing latency
  • Error rates
  • Lambda performance
  • DynamoDB capacity

Configure SNS topics for alarm notifications:

const alarmTopic = new aws.sns.Topic("alarms", {
    name: `${pipelineName}-alarms`,
});

new aws.sns.TopicSubscription("alarm-email", {
    topic: alarmTopic.arn,
    protocol: "email",
    endpoint: "ops@example.com",
});

// Wire the topic into the alarms defined earlier by adding alarmActions,
// as shown in the sketch below.
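
aws.cloudwatch.MetricAlarm accepts alarmActions (and optionally okActions). A minimal sketch, shown as the DLQ alarm from section 10 with the two action arguments added; in practice you would modify the existing alarm definition rather than declare a second resource with the same name:

// The DLQ alarm from section 10 with notification actions added (sketch).
new aws.cloudwatch.MetricAlarm("dlq-messages-alarm", {
    name: `${pipelineName}-dlq-messages`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 1,
    metricName: "ApproximateNumberOfMessagesVisible",
    namespace: "AWS/SQS",
    period: 300,
    statistic: "Average",
    threshold: 0,
    dimensions: { QueueName: dlq.name },
    alarmDescription: "Alert when messages appear in DLQ",
    alarmActions: [alarmTopic.arn], // notify when the alarm fires
    okActions: [alarmTopic.arn],    // notify when it clears
    tags,
});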

Deployment

# Install dependencies
npm install

# Deploy pipeline
pulumi up

# Upload test data
aws s3 cp test-data/ s3://$(pulumi stack output inputBucketName)/incoming/ --recursive

# View logs
pulumi logs --follow

Advanced Configurations

Add AWS Glue for Complex Transformations

// glueRole, scriptBucket, and tempBucket are assumed to be defined elsewhere in your stack.
const glueJob = new aws.glue.Job("etl-job", {
    name: `${pipelineName}-etl`,
    roleArn: glueRole.arn,
    command: {
        name: "glueetl",
        // Use pulumi.interpolate so the bucket name Output resolves correctly
        scriptLocation: pulumi.interpolate`s3://${scriptBucket.bucket}/scripts/transform.py`,
        pythonVersion: "3",
    },
    defaultArguments: {
        "--job-language": "python",
        "--TempDir": pulumi.interpolate`s3://${tempBucket.bucket}/temp/`,
    },
});

Add Step Functions for Workflow Orchestration

// stepFunctionsRole and validatorFunction are assumed to be defined elsewhere in your stack.
const stateMachine = new aws.sfn.StateMachine("pipeline-workflow", {
    roleArn: stepFunctionsRole.arn,
    // Resolve the function ARNs before serializing the state machine definition
    definition: pulumi.all([processorFunction.arn, validatorFunction.arn]).apply(([processorArn, validatorArn]) =>
        JSON.stringify({
            StartAt: "ProcessData",
            States: {
                ProcessData: {
                    Type: "Task",
                    Resource: processorArn,
                    Next: "ValidateResults",
                },
                ValidateResults: {
                    Type: "Task",
                    Resource: validatorArn,
                    End: true,
                },
            },
        })
    ),
});

Cleanup

# Empty S3 buckets first
aws s3 rm s3://$(pulumi stack output inputBucketName) --recursive
aws s3 rm s3://$(pulumi stack output processedBucketName) --recursive
aws s3 rm s3://$(pulumi stack output failedBucketName) --recursive

# Destroy infrastructure
pulumi destroy

# Remove stack
pulumi stack rm dev

Next Steps

  • Add data validation with JSON Schema
  • Implement custom metrics
  • Add AWS X-Ray tracing (sketched below)
  • Create CI/CD pipeline
  • Add data quality checks
  • Implement data catalog with AWS Glue
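
For the X-Ray item, tracing is mostly a configuration change plus trace-write permissions. A minimal sketch, assuming the processorRole and processor function from the main implementation; applying it means adding the tracingConfig argument to the existing function definition rather than creating a new one:

// Sketch: allow the processor role to publish trace segments to X-Ray.
new aws.iam.RolePolicyAttachment("processor-xray", {
    role: processorRole.name,
    policyArn: "arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess",
});

// Sketch: the argument to add to the existing aws.lambda.Function("processor", ...)
// definition so invocations are traced end to end:
//
//     tracingConfig: { mode: "Active" },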