tessl install tessl/maven-com-pulumi--aws@7.16.0

Pulumi Java SDK for AWS, providing strongly-typed Infrastructure-as-Code for 227 AWS service packages including compute, storage, databases, networking, security, analytics, machine learning, and more.
This guide demonstrates how to build a production-ready, event-driven data processing pipeline on AWS using serverless technologies. The architecture handles data ingestion, transformation, storage, and monitoring.
┌─────────────┐
│ Source │
│ Systems │
└──────┬──────┘
│
│ PUT Object
▼
┌─────────────────────────────────────────────────────┐
│ S3 Input Bucket │
│ (Raw Data / Landing Zone) │
└──────┬──────────────────────────────────────────────┘
│
│ S3 Event Notification
▼
┌─────────────────────────────────────────────────────┐
│ SNS Topic │
│ (Event Fan-Out) │
└──────┬──────────────────┬──────────────────────────┘
│ │
│ │ Subscribe
│ ▼
│ ┌──────────────────┐
│ │ SQS Queue DLQ │
│ └──────────────────┘
│ ▲
│ Subscribe │ Failed Messages
▼ │
┌─────────────────────────┴───────────────────────────┐
│ SQS Queue │
│ (Buffering & Throttling) │
└──────┬──────────────────────────────────────────────┘
│
│ Event Source Mapping
▼
┌─────────────────────────────────────────────────────┐
│ Lambda Processor │
│ (Transform & Validate Data) │
└──────┬────────────────────┬─────────────────────────┘
│ │
│ Success │ Failed Records
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ S3 │ │ S3 Failed │
│ Processed │ │ Bucket │
└──────┬───────┘ └──────────────────┘
│
│ Store Results
▼
┌─────────────────────────────────────────────────────┐
│ DynamoDB Table │
│ (Processed Records) │
└─────────────────────────────────────────────────────┘
│
│ DynamoDB Stream
▼
┌─────────────────────────────────────────────────────┐
│ Lambda Aggregator (Optional) │
│ (Real-time Analytics) │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ CloudWatch Logs │
│ Metrics & Monitoring │
└─────────────────────────────────────────────────────┘

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Configuration
const config = new pulumi.Config();
const pipelineName = "data-pipeline";
const environment = pulumi.getStack();
// Tags for all resources
const tags = {
Environment: environment,
Pipeline: pipelineName,
ManagedBy: "pulumi",
};
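The config object above is not yet used; Pulumi stack configuration is a natural home for environment-specific settings. A minimal sketch with hypothetical keys (set via pulumi config set), falling back to the defaults used later in this guide:

// Hypothetical configuration keys; nothing later in the guide requires them.
const processorMemory = config.getNumber("processorMemory") ?? 512;
const logRetentionDays = config.getNumber("logRetentionDays") ?? 30;
const alertEmail = config.get("alertEmail"); // optional, e.g. for alarm notifications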
// ============================================================================
// 1. S3 BUCKETS FOR DATA STORAGE
// ============================================================================
// Input bucket for raw data
const inputBucket = new aws.s3.BucketV2("input-bucket", {
bucket: pulumi.interpolate`${pipelineName}-input-${aws.getCallerIdentityOutput().accountId}`,
tags: { ...tags, Stage: "input" },
});
// Enable versioning for input bucket
new aws.s3.BucketVersioningV2("input-versioning", {
bucket: inputBucket.id,
versioningConfiguration: {
status: "Enabled",
},
});
// Enable encryption for input bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("input-encryption", {
bucket: inputBucket.id,
rules: [{
applyServerSideEncryptionByDefault: {
sseAlgorithm: "AES256",
},
bucketKeyEnabled: true,
}],
});
// Processed data bucket
const processedBucket = new aws.s3.BucketV2("processed-bucket", {
bucket: pulumi.interpolate`${pipelineName}-processed-${aws.getCallerIdentityOutput().accountId}`,
tags: { ...tags, Stage: "processed" },
});
// Enable versioning for processed bucket
new aws.s3.BucketVersioningV2("processed-versioning", {
bucket: processedBucket.id,
versioningConfiguration: {
status: "Enabled",
},
});
// Enable encryption for processed bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("processed-encryption", {
bucket: processedBucket.id,
rules: [{
applyServerSideEncryptionByDefault: {
sseAlgorithm: "AES256",
},
bucketKeyEnabled: true,
}],
});
// Lifecycle policy for processed bucket
new aws.s3.BucketLifecycleConfigurationV2("processed-lifecycle", {
bucket: processedBucket.id,
rules: [
{
id: "transition-to-ia",
status: "Enabled",
transitions: [{
days: 30,
storageClass: "STANDARD_IA",
}],
},
{
id: "transition-to-glacier",
status: "Enabled",
transitions: [{
days: 90,
storageClass: "GLACIER",
}],
},
],
});
// Failed data bucket for error handling
const failedBucket = new aws.s3.BucketV2("failed-bucket", {
bucket: pulumi.interpolate`${pipelineName}-failed-${aws.getCallerIdentityOutput().accountId}`,
tags: { ...tags, Stage: "failed" },
});
// Enable encryption for failed bucket
new aws.s3.BucketServerSideEncryptionConfigurationV2("failed-encryption", {
bucket: failedBucket.id,
rules: [{
applyServerSideEncryptionByDefault: {
sseAlgorithm: "AES256",
},
bucketKeyEnabled: true,
}],
});
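A production bucket should normally also block public access. A minimal sketch for the input bucket; the same resource can be repeated for the processed and failed buckets:

new aws.s3.BucketPublicAccessBlock("input-public-access-block", {
    bucket: inputBucket.id,
    // Reject public ACLs and bucket policies, and ignore any that already exist.
    blockPublicAcls: true,
    blockPublicPolicy: true,
    ignorePublicAcls: true,
    restrictPublicBuckets: true,
});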
// ============================================================================
// 2. SNS TOPIC FOR EVENT NOTIFICATIONS
// ============================================================================
// Create SNS topic for S3 events
const snsTopic = new aws.sns.Topic("pipeline-events", {
name: `${pipelineName}-events`,
displayName: "Data Pipeline Events",
tags,
});
// SNS topic policy to allow S3 to publish
const snsTopicPolicy = new aws.sns.TopicPolicy("topic-policy", {
arn: snsTopic.arn,
policy: pulumi.all([snsTopic.arn, inputBucket.arn]).apply(([topicArn, bucketArn]) =>
JSON.stringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Principal: {
Service: "s3.amazonaws.com",
},
Action: "SNS:Publish",
Resource: topicArn,
Condition: {
ArnLike: {
"aws:SourceArn": bucketArn,
},
},
}],
})
),
});
// ============================================================================
// 3. SQS QUEUES FOR MESSAGE PROCESSING
// ============================================================================
// Dead Letter Queue for failed messages
const dlq = new aws.sqs.Queue("dlq", {
name: `${pipelineName}-dlq`,
messageRetentionSeconds: 1209600, // 14 days
tags: { ...tags, Purpose: "deadletter" },
});
// Main processing queue
const processingQueue = new aws.sqs.Queue("processing-queue", {
name: `${pipelineName}-processing`,
visibilityTimeoutSeconds: 360, // 6x the 60-second Lambda timeout (AWS recommendation)
messageRetentionSeconds: 345600, // 4 days
receiveWaitTimeSeconds: 20, // Long polling
redrivePolicy: pulumi.interpolate`{
"deadLetterTargetArn": "${dlq.arn}",
"maxReceiveCount": 3
}`,
tags,
});
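The redrive policy above is assembled with pulumi.interpolate; pulumi.jsonStringify is an equivalent way to express it that serializes the nested Output and handles JSON escaping for you. A drop-in alternative for that property:

// Equivalent redrivePolicy without manual JSON templating.
redrivePolicy: pulumi.jsonStringify({
    deadLetterTargetArn: dlq.arn,
    maxReceiveCount: 3,
}),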
// SQS queue policy to allow SNS to send messages
const queuePolicy = new aws.sqs.QueuePolicy("queue-policy", {
queueUrl: processingQueue.url,
policy: pulumi.all([processingQueue.arn, snsTopic.arn]).apply(([queueArn, topicArn]) =>
JSON.stringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Principal: {
Service: "sns.amazonaws.com",
},
Action: "sqs:SendMessage",
Resource: queueArn,
Condition: {
ArnEquals: {
"aws:SourceArn": topicArn,
},
},
}],
})
),
});
// Subscribe SQS to SNS topic
const snsSubscription = new aws.sns.TopicSubscription("queue-subscription", {
topic: snsTopic.arn,
protocol: "sqs",
endpoint: processingQueue.arn,
rawMessageDelivery: false,
});
// ============================================================================
// 4. S3 EVENT NOTIFICATION TO SNS
// ============================================================================
// Configure S3 bucket notification to SNS
const bucketNotification = new aws.s3.BucketNotification("input-notification", {
bucket: inputBucket.id,
topics: [{
topicArn: snsTopic.arn,
events: ["s3:ObjectCreated:*"],
filterPrefix: "incoming/",
filterSuffix: ".json",
}],
}, { dependsOn: [snsTopicPolicy] });
// ============================================================================
// 5. DYNAMODB TABLE FOR PROCESSED DATA
// ============================================================================
// Create DynamoDB table to store processed records
const dynamoTable = new aws.dynamodb.Table("processed-data", {
name: `${pipelineName}-data`,
billingMode: "PAY_PER_REQUEST", // On-demand pricing
hashKey: "id",
rangeKey: "timestamp",
attributes: [
{ name: "id", type: "S" },
{ name: "timestamp", type: "N" },
{ name: "status", type: "S" },
{ name: "processedDate", type: "S" },
],
streamEnabled: true,
streamViewType: "NEW_AND_OLD_IMAGES",
globalSecondaryIndexes: [
{
name: "StatusIndex",
hashKey: "status",
rangeKey: "processedDate",
projectionType: "ALL",
},
],
ttl: {
attributeName: "expirationTime",
enabled: true,
},
pointInTimeRecovery: {
enabled: true,
},
tags,
});
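The StatusIndex GSI exists so consumers can list records by status and processing date without scanning the table. A sketch of such a query from a separate client application, assuming the AWS SDK for JavaScript v3 (@aws-sdk/client-dynamodb and @aws-sdk/lib-dynamodb) and a table name taken from the stack output:

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, QueryCommand } from "@aws-sdk/lib-dynamodb";

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// List records with a given status processed on a given day (status is a DynamoDB reserved word,
// so it is aliased via ExpressionAttributeNames).
async function listProcessed(tableName: string, date: string) {
    const result = await docClient.send(new QueryCommand({
        TableName: tableName,
        IndexName: "StatusIndex",
        KeyConditionExpression: "#s = :status AND processedDate = :date",
        ExpressionAttributeNames: { "#s": "status" },
        ExpressionAttributeValues: { ":status": "processed", ":date": date },
    }));
    return result.Items ?? [];
}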
// ============================================================================
// 6. KMS KEY FOR ENCRYPTION
// ============================================================================
// Create KMS key for encrypting sensitive data
const kmsKey = new aws.kms.Key("pipeline-key", {
description: `Encryption key for ${pipelineName}`,
deletionWindowInDays: 10,
enableKeyRotation: true,
tags,
});
// Create alias for the key
new aws.kms.Alias("pipeline-key-alias", {
name: `alias/${pipelineName}`,
targetKeyId: kmsKey.id,
});
// ============================================================================
// 7. IAM ROLES AND POLICIES FOR LAMBDA
// ============================================================================
// IAM role for Lambda processor
const processorRole = new aws.iam.Role("processor-role", {
assumeRolePolicy: JSON.stringify({
Version: "2012-10-17",
Statement: [{
Action: "sts:AssumeRole",
Effect: "Allow",
Principal: {
Service: "lambda.amazonaws.com",
},
}],
}),
tags,
});
// Attach basic Lambda execution policy
new aws.iam.RolePolicyAttachment("processor-basic-execution", {
role: processorRole.name,
policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
});
// Custom policy for processor Lambda
const processorPolicy = new aws.iam.Policy("processor-policy", {
description: "Policy for data processor Lambda",
policy: pulumi.all([
inputBucket.arn,
processedBucket.arn,
failedBucket.arn,
dynamoTable.arn,
processingQueue.arn,
kmsKey.arn,
]).apply(([inputArn, processedArn, failedArn, tableArn, queueArn, keyArn]) =>
JSON.stringify({
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: [
"s3:GetObject",
"s3:GetObjectVersion",
],
Resource: `${inputArn}/*`,
},
{
Effect: "Allow",
Action: [
"s3:PutObject",
],
Resource: [
`${processedArn}/*`,
`${failedArn}/*`,
],
},
{
Effect: "Allow",
Action: [
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:GetItem",
"dynamodb:Query",
],
Resource: [
tableArn,
`${tableArn}/index/*`,
],
},
{
Effect: "Allow",
Action: [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility",
],
Resource: queueArn,
},
{
Effect: "Allow",
Action: [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
],
Resource: keyArn,
},
],
})
),
});
// Attach custom policy to role
new aws.iam.RolePolicyAttachment("processor-policy-attachment", {
role: processorRole.name,
policyArn: processorPolicy.arn,
});
// ============================================================================
// 8. LAMBDA FUNCTION FOR DATA PROCESSING
// ============================================================================
// Create CloudWatch Log Group for Lambda
const processorLogGroup = new aws.cloudwatch.LogGroup("processor-logs", {
name: `/aws/lambda/${pipelineName}-processor`,
retentionInDays: 30,
tags,
});
// Lambda function for processing data
const processorFunction = new aws.lambda.Function("processor", {
name: `${pipelineName}-processor`,
runtime: aws.lambda.Runtime.Python3d11,
handler: "index.handler",
role: processorRole.arn,
timeout: 60,
memorySize: 512,
environment: {
variables: {
PROCESSED_BUCKET: processedBucket.bucket,
FAILED_BUCKET: failedBucket.bucket,
DYNAMODB_TABLE: dynamoTable.name,
KMS_KEY_ID: kmsKey.id,
ENVIRONMENT: environment,
},
},
code: new pulumi.asset.AssetArchive({
"index.py": new pulumi.asset.StringAsset(`
import json
import boto3
import os
from datetime import datetime
from typing import Dict, Any

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
kms = boto3.client('kms')

PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']
FAILED_BUCKET = os.environ['FAILED_BUCKET']
TABLE_NAME = os.environ['DYNAMODB_TABLE']
table = dynamodb.Table(TABLE_NAME)

def handler(event, context):
    """
    Process incoming messages from SQS.
    Each message contains an S3 event notification.
    """
    print(f"Processing {len(event['Records'])} records")
    results = {
        'successful': 0,
        'failed': 0,
        'batchItemFailures': []
    }
    for record in event['Records']:
        try:
            # Parse SQS message body (SNS message)
            sns_message = json.loads(record['body'])
            s3_event = json.loads(sns_message['Message'])
            # Process each S3 record in the event
            for s3_record in s3_event['Records']:
                bucket = s3_record['s3']['bucket']['name']
                key = s3_record['s3']['object']['key']
                print(f"Processing s3://{bucket}/{key}")
                # Download and process the file
                response = s3.get_object(Bucket=bucket, Key=key)
                data = json.loads(response['Body'].read().decode('utf-8'))
                # Validate and transform data
                processed_data = transform_data(data)
                # Save to DynamoDB
                save_to_dynamodb(processed_data)
                # Save processed file to S3
                processed_key = f"processed/{datetime.now().strftime('%Y/%m/%d')}/{key.split('/')[-1]}"
                s3.put_object(
                    Bucket=PROCESSED_BUCKET,
                    Key=processed_key,
                    Body=json.dumps(processed_data),
                    ContentType='application/json'
                )
                results['successful'] += 1
                print(f"Successfully processed {key}")
        except Exception as e:
            print(f"Error processing record: {str(e)}")
            results['failed'] += 1
            # Add to batch item failures for retry
            results['batchItemFailures'].append({
                'itemIdentifier': record['messageId']
            })
            # Save failed record to S3
            try:
                failed_key = f"failed/{datetime.now().strftime('%Y/%m/%d')}/{record['messageId']}.json"
                s3.put_object(
                    Bucket=FAILED_BUCKET,
                    Key=failed_key,
                    Body=json.dumps({
                        'error': str(e),
                        'record': record,
                        'timestamp': datetime.now().isoformat()
                    }),
                    ContentType='application/json'
                )
            except Exception as save_error:
                print(f"Failed to save error record: {str(save_error)}")
    print(f"Results: {results['successful']} successful, {results['failed']} failed")
    # Return batch item failures for partial batch failure handling
    return {
        'batchItemFailures': results['batchItemFailures']
    }

def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Transform and validate incoming data.
    Add your business logic here.
    """
    # Example transformation
    return {
        'id': data.get('id', 'unknown'),
        'timestamp': int(datetime.now().timestamp()),
        'processedDate': datetime.now().strftime('%Y-%m-%d'),
        'status': 'processed',
        'originalData': data,
        'processedAt': datetime.now().isoformat(),
    }

def save_to_dynamodb(data: Dict[str, Any]) -> None:
    """
    Save processed data to DynamoDB.
    """
    table.put_item(Item=data)
`),
}),
tags,
}, { dependsOn: [processorLogGroup] });
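The handler is inlined with a StringAsset to keep this guide self-contained. In a real project the code usually lives in its own directory (for example ./processor/index.py, a hypothetical path) and is zipped by Pulumi with a FileArchive instead:

// Replaces the inline AssetArchive above; Pulumi zips the directory at deploy time.
code: new pulumi.asset.FileArchive("./processor"),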
// Configure SQS as Lambda event source
const eventSourceMapping = new aws.lambda.EventSourceMapping("processor-trigger", {
eventSourceArn: processingQueue.arn,
functionName: processorFunction.name,
batchSize: 10,
maximumBatchingWindowInSeconds: 10,
functionResponseTypes: ["ReportBatchItemFailures"], // Enable partial batch failure
scalingConfig: {
maximumConcurrency: 10,
},
});
// ============================================================================
// 9. LAMBDA FUNCTION FOR AGGREGATION (Optional)
// ============================================================================
// IAM role for aggregator Lambda
const aggregatorRole = new aws.iam.Role("aggregator-role", {
assumeRolePolicy: JSON.stringify({
Version: "2012-10-17",
Statement: [{
Action: "sts:AssumeRole",
Effect: "Allow",
Principal: {
Service: "lambda.amazonaws.com",
},
}],
}),
tags,
});
// Attach basic Lambda execution policy
new aws.iam.RolePolicyAttachment("aggregator-basic-execution", {
role: aggregatorRole.name,
policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
});
// Policy for DynamoDB stream access
const aggregatorPolicy = new aws.iam.Policy("aggregator-policy", {
description: "Policy for aggregator Lambda",
policy: pulumi.all([dynamoTable.streamArn]).apply(([streamArn]) =>
JSON.stringify({
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams",
],
Resource: streamArn,
},
],
})
),
});
// Attach policy to role
new aws.iam.RolePolicyAttachment("aggregator-policy-attachment", {
role: aggregatorRole.name,
policyArn: aggregatorPolicy.arn,
});
// Create CloudWatch Log Group for aggregator Lambda
const aggregatorLogGroup = new aws.cloudwatch.LogGroup("aggregator-logs", {
name: `/aws/lambda/${pipelineName}-aggregator`,
retentionInDays: 30,
tags,
});
// Aggregator Lambda function
const aggregatorFunction = new aws.lambda.Function("aggregator", {
name: `${pipelineName}-aggregator`,
runtime: aws.lambda.Runtime.Python3d11,
handler: "index.handler",
role: aggregatorRole.arn,
timeout: 60,
memorySize: 256,
environment: {
variables: {
ENVIRONMENT: environment,
},
},
code: new pulumi.asset.AssetArchive({
"index.py": new pulumi.asset.StringAsset(`
import json
from typing import Dict, Any

def handler(event, context):
    """
    Process DynamoDB stream events for real-time analytics.
    """
    print(f"Processing {len(event['Records'])} stream records")
    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # Get new image
            new_image = record['dynamodb'].get('NewImage', {})
            # Add your aggregation logic here
            process_record(new_image)
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed stream records')
    }

def process_record(image: Dict[str, Any]) -> None:
    """
    Process individual DynamoDB stream record.
    Add your analytics/aggregation logic here.
    """
    print(f"Processing record: {image}")
    # Example: Update counters, calculate metrics, etc.
`),
}),
tags,
}, { dependsOn: [aggregatorLogGroup] });
// Configure DynamoDB stream as Lambda event source
const streamEventSourceMapping = new aws.lambda.EventSourceMapping("aggregator-trigger", {
eventSourceArn: dynamoTable.streamArn,
functionName: aggregatorFunction.name,
startingPosition: "LATEST",
batchSize: 100,
maximumBatchingWindowInSeconds: 10,
parallelizationFactor: 2,
maximumRecordAgeInSeconds: 3600,
bisectBatchOnFunctionError: true,
maximumRetryAttempts: 2,
});
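After bisecting and the two retries, failing stream batches are discarded unless an on-failure destination is configured. One option is to send the discard metadata to the existing DLQ; a sketch of the extra properties that could be added to the mapping above (the aggregator role would also need sqs:SendMessage on that queue):

// Additional properties for the aggregator-trigger EventSourceMapping above.
destinationConfig: {
    onFailure: {
        destinationArn: dlq.arn, // requires sqs:SendMessage in the aggregator role
    },
},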
// ============================================================================
// 10. CLOUDWATCH MONITORING AND ALARMS
// ============================================================================
// Alarm for DLQ messages
new aws.cloudwatch.MetricAlarm("dlq-messages-alarm", {
name: `${pipelineName}-dlq-messages`,
comparisonOperator: "GreaterThanThreshold",
evaluationPeriods: 1,
metricName: "ApproximateNumberOfMessagesVisible",
namespace: "AWS/SQS",
period: 300,
statistic: "Average",
threshold: 0,
dimensions: {
QueueName: dlq.name,
},
alarmDescription: "Alert when messages appear in DLQ",
tags,
});
// Alarm for Lambda errors
new aws.cloudwatch.MetricAlarm("lambda-errors-alarm", {
name: `${pipelineName}-lambda-errors`,
comparisonOperator: "GreaterThanThreshold",
evaluationPeriods: 2,
metricName: "Errors",
namespace: "AWS/Lambda",
period: 300,
statistic: "Sum",
threshold: 5,
dimensions: {
FunctionName: processorFunction.name,
},
alarmDescription: "Alert when Lambda has too many errors",
tags,
});
// Alarm for Lambda throttles
new aws.cloudwatch.MetricAlarm("lambda-throttles-alarm", {
name: `${pipelineName}-lambda-throttles`,
comparisonOperator: "GreaterThanThreshold",
evaluationPeriods: 1,
metricName: "Throttles",
namespace: "AWS/Lambda",
period: 300,
statistic: "Sum",
threshold: 0,
dimensions: {
FunctionName: processorFunction.name,
},
alarmDescription: "Alert when Lambda is throttled",
tags,
});
// Alarm for queue age
new aws.cloudwatch.MetricAlarm("queue-age-alarm", {
name: `${pipelineName}-queue-age`,
comparisonOperator: "GreaterThanThreshold",
evaluationPeriods: 2,
metricName: "ApproximateAgeOfOldestMessage",
namespace: "AWS/SQS",
period: 300,
statistic: "Maximum",
threshold: 300, // 5 minutes
dimensions: {
QueueName: processingQueue.name,
},
alarmDescription: "Alert when messages are not being processed quickly",
tags,
});
// ============================================================================
// 11. CLOUDWATCH DASHBOARD
// ============================================================================
// Create CloudWatch dashboard
const dashboard = new aws.cloudwatch.Dashboard("pipeline-dashboard", {
dashboardName: `${pipelineName}-dashboard`,
dashboardBody: pulumi.all([
processingQueue.name,
dlq.name,
processorFunction.name,
dynamoTable.name,
aws.getRegionOutput().name,
]).apply(([queueName, dlqName, functionName, tableName, region]) =>
JSON.stringify({
widgets: [
{
type: "metric",
properties: {
metrics: [
["AWS/SQS", "NumberOfMessagesSent", { stat: "Sum", label: "Messages Sent" }],
[".", "NumberOfMessagesReceived", { stat: "Sum", label: "Messages Received" }],
[".", "ApproximateNumberOfMessagesVisible", { stat: "Average", label: "Messages in Queue" }],
],
period: 300,
stat: "Average",
region,
title: "SQS Queue Metrics",
yAxis: { left: { min: 0 } },
},
},
{
type: "metric",
properties: {
metrics: [
["AWS/Lambda", "Invocations", { stat: "Sum", label: "Invocations" }],
[".", "Errors", { stat: "Sum", label: "Errors" }],
[".", "Throttles", { stat: "Sum", label: "Throttles" }],
[".", "Duration", { stat: "Average", label: "Avg Duration" }],
],
period: 300,
stat: "Average",
region,
title: "Lambda Function Metrics",
yAxis: { left: { min: 0 } },
},
},
{
type: "metric",
properties: {
metrics: [
["AWS/DynamoDB", "ConsumedReadCapacityUnits", { stat: "Sum" }],
[".", "ConsumedWriteCapacityUnits", { stat: "Sum" }],
],
period: 300,
stat: "Sum",
region,
title: "DynamoDB Capacity",
},
},
],
})
),
});
// ============================================================================
// OUTPUTS
// ============================================================================
export const inputBucketName = inputBucket.bucket;
export const processedBucketName = processedBucket.bucket;
export const failedBucketName = failedBucket.bucket;
export const queueUrl = processingQueue.url;
export const dlqUrl = dlq.url;
export const tableName = dynamoTable.name;
export const processorFunctionName = processorFunction.name;
export const kmsKeyId = kmsKey.id;
export const dashboardUrl = pulumi.interpolate`https://console.aws.amazon.com/cloudwatch/home?region=${aws.getRegionOutput().name}#dashboards:name=${dashboard.dashboardName}`;

Upload a file to trigger the pipeline:
# Create a test file
echo '{"id": "test-123", "data": "sample"}' > test.json
# Upload to S3 input bucket
aws s3 cp test.json s3://data-pipeline-input-123456789012/incoming/test.json
# Monitor CloudWatch logs
aws logs tail /aws/lambda/data-pipeline-processor --follow
# Check processed data in DynamoDB
aws dynamodb scan --table-name data-pipeline-data
# Check processed files in S3
aws s3 ls s3://data-pipeline-processed-123456789012/processed/ --recursive

To generate a batch of test files instead of uploading them by hand, a small script such as test_data_generator.py can be used:

# test_data_generator.py
import json
import boto3
import random
from datetime import datetime
s3 = boto3.client('s3')
bucket = 'data-pipeline-input-123456789012'
# Generate test data
for i in range(10):
    data = {
        'id': f'test-{i}',
        'timestamp': datetime.now().isoformat(),
        'value': random.randint(1, 100),
        'type': random.choice(['A', 'B', 'C']),
    }
    key = f'incoming/test-{i}-{datetime.now().timestamp()}.json'
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(data),
        ContentType='application/json'
    )
    print(f"Uploaded {key}")

View the CloudWatch dashboard to monitor queue throughput, Lambda invocations, errors, and duration, and DynamoDB consumed capacity.
To receive alerts, configure an SNS topic for alarm notifications:
const alarmTopic = new aws.sns.Topic("alarms", {
name: `${pipelineName}-alarms`,
});
new aws.sns.TopicSubscription("alarm-email", {
topic: alarmTopic.arn,
protocol: "email",
endpoint: "ops@example.com",
});
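The alarms only send notifications once the topic is attached through alarmActions. A minimal sketch, repeating the DLQ alarm from earlier with the actions added (replace that definition rather than declaring it twice):

new aws.cloudwatch.MetricAlarm("dlq-messages-alarm", {
    name: `${pipelineName}-dlq-messages`,
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 1,
    metricName: "ApproximateNumberOfMessagesVisible",
    namespace: "AWS/SQS",
    period: 300,
    statistic: "Average",
    threshold: 0,
    dimensions: { QueueName: dlq.name },
    alarmDescription: "Alert when messages appear in DLQ",
    // Route state changes to the alarm topic defined above.
    alarmActions: [alarmTopic.arn],
    okActions: [alarmTopic.arn],
    tags,
});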
// The same alarmActions: [alarmTopic.arn] setting applies to the Lambda error, throttle, and queue-age alarms above.

With the pipeline defined, a typical end-to-end deployment looks like this:

# Install dependencies
npm install
# Deploy pipeline
pulumi up
# Upload test data
aws s3 cp test-data/ s3://$(pulumi stack output inputBucketName)/incoming/ --recursive
# View logs
pulumi logs --follow

The pipeline can be extended with AWS Glue for heavier ETL workloads. The sketch below assumes a glueRole, scriptBucket, and tempBucket defined elsewhere in the program:

const glueJob = new aws.glue.Job("etl-job", {
name: `${pipelineName}-etl`,
roleArn: glueRole.arn,
command: {
name: "glueetl",
scriptLocation: pulumi.interpolate`s3://${scriptBucket.bucket}/scripts/transform.py`,
pythonVersion: "3",
},
defaultArguments: {
"--job-language": "python",
"--TempDir": `s3://${tempBucket.bucket}/temp/`,
},
});

For multi-step orchestration, Step Functions can coordinate the processing Lambdas. This sketch assumes a stepFunctionsRole and a validatorFunction exist:

const stateMachine = new aws.sfn.StateMachine("pipeline-workflow", {
roleArn: stepFunctionsRole.arn,
definition: pulumi.jsonStringify({
StartAt: "ProcessData",
States: {
ProcessData: {
Type: "Task",
Resource: processorFunction.arn,
Next: "ValidateResults",
},
ValidateResults: {
Type: "Task",
Resource: validatorFunction.arn,
End: true,
},
},
}),
});

When you are finished, tear the stack down:

# Empty S3 buckets first
aws s3 rm s3://$(pulumi stack output inputBucketName) --recursive
aws s3 rm s3://$(pulumi stack output processedBucketName) --recursive
aws s3 rm s3://$(pulumi stack output failedBucketName) --recursive
# Destroy infrastructure
pulumi destroy
# Remove stack
pulumi stack rm dev
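For non-production stacks, an alternative to emptying the buckets by hand is to create them with forceDestroy: true, which lets pulumi destroy delete them even while they still contain objects. For example, the input bucket definition would become:

// Development-only convenience: objects are deleted along with the bucket.
const inputBucket = new aws.s3.BucketV2("input-bucket", {
    bucket: pulumi.interpolate`${pipelineName}-input-${aws.getCallerIdentityOutput().accountId}`,
    forceDestroy: true,
    tags: { ...tags, Stage: "input" },
});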