A Pulumi package for creating and managing Amazon Web Services (AWS) cloud resources with infrastructure-as-code.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Guide to AWS services for messaging, monitoring, analytics, ML, DevOps, and more.
Connect distributed applications - Messaging, queuing, orchestration
Track application health and performance
Process and analyze large datasets
Build and deploy ML models
CI/CD and development workflow
Move data and applications to AWS
Monitor and optimize AWS spending
Organize and manage resources
Start: What do you need?
┌─ Communication between services
│ ├─ Pub/sub notifications → SNS
│ ├─ Message queuing → SQS
│ ├─ Event-driven architecture → EventBridge
│ ├─ Workflow orchestration → Step Functions
│ └─ Real-time messaging → SNS + SQS
│
┌─ Monitoring & observability
│ ├─ Application logs → CloudWatch Logs
│ ├─ Metrics & alarms → CloudWatch Metrics
│ ├─ Dashboards → CloudWatch Dashboards
│ ├─ API audit trail → CloudTrail
│ ├─ Distributed tracing → X-Ray
│ └─ Centralized logging → CloudWatch + S3
│
┌─ Data analytics
│ ├─ SQL on S3 → Athena
│ ├─ Big data processing → EMR (Hadoop/Spark)
│ ├─ Real-time streaming → Kinesis
│ ├─ ETL workflows → Glue
│ ├─ Search & analytics → OpenSearch
│ └─ BI dashboards → QuickSight
│
┌─ Machine learning
│ ├─ Custom ML models → SageMaker
│ ├─ Generative AI → Bedrock
│ ├─ Text analysis → Comprehend
│ ├─ Image/video analysis → Rekognition
│ ├─ Speech services → Transcribe, Polly
│ └─ Pre-built AI APIs → Various AI services
│
└─ DevOps & automation
├─ CI/CD pipeline → CodePipeline
├─ Build → CodeBuild
├─ Deploy → CodeDeploy
├─ Git hosting → CodeCommit
    └─ Operations → Systems Manager

SNS features:
Common patterns:
Queue types:
Standard Queue:
FIFO Queue:
SQS features:
EventBridge features:
Common use cases:
Workflow types:
Standard Workflows:
Express Workflows:
Step Functions features:
CloudWatch components:
Logs:
Metrics:
Dashboards:
Alarms:
Athena features:
Cost optimization:
EMR features:
Use Athena instead when:
Kinesis services:
Kinesis Data Streams:
Kinesis Firehose:
Kinesis Data Analytics:
SageMaker features:
Available models:
S3 + SNS + SQS + Lambda
import * as pulumi from "@pulumi/pulumi"; // needed for pulumi.all / pulumi.asset below
import * as aws from "@pulumi/aws";

// S3 bucket for uploads
const bucket = new aws.s3.BucketV2("uploads", {
    bucket: "file-uploads",
});

// SNS topic for notifications
const topic = new aws.sns.Topic("upload-notifications", {
    displayName: "File Upload Notifications",
});

// S3 bucket notification to SNS
const bucketNotification = new aws.s3.BucketNotification("notification", {
    bucket: bucket.id,
    topics: [{
        topicArn: topic.arn,
        events: ["s3:ObjectCreated:*"],
    }],
});

// Dead letter queue — declared BEFORE the main queue so its ARN is in
// scope (the original referenced `dlq` before its declaration: a TDZ error).
const dlq = new aws.sqs.Queue("dlq", {
    messageRetentionSeconds: 1209600, // 14 days (SQS maximum)
});

// SQS queue for processing (subscriber 1).
// BUG FIX: aws.sqs.Queue has no `deadLetterQueue` argument; the DLQ is
// attached via `redrivePolicy`, a JSON string naming the DLQ's ARN.
const processQueue = new aws.sqs.Queue("process-queue", {
    visibilityTimeoutSeconds: 300,
    messageRetentionSeconds: 86400, // 1 day
    redrivePolicy: dlq.arn.apply((dlqArn) =>
        JSON.stringify({
            deadLetterTargetArn: dlqArn,
            maxReceiveCount: 3,
        })
    ),
});

// Subscribe SQS to SNS
const queueSubscription = new aws.sns.TopicSubscription("queue-sub", {
    topic: topic.arn,
    protocol: "sqs",
    endpoint: processQueue.arn,
});

// SQS queue policy allowing the SNS topic (and only it) to deliver messages
const queuePolicy = new aws.sqs.QueuePolicy("queue-policy", {
    queueUrl: processQueue.url,
    policy: pulumi.all([processQueue.arn, topic.arn]).apply(([queueArn, topicArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [{
                Effect: "Allow",
                Principal: { Service: "sns.amazonaws.com" },
                Action: "sqs:SendMessage",
                Resource: queueArn,
                Condition: {
                    ArnEquals: { "aws:SourceArn": topicArn },
                },
            }],
        })
    ),
});

// Lambda processor (subscriber 2).
// NOTE: assumes `lambdaRole` (an aws.iam.Role with SQS/S3 permissions and the
// Lambda execution trust policy) is defined elsewhere in your program.
const processor = new aws.lambda.Function("processor", {
    runtime: "python3.11",
    handler: "lambda_function.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./processor"),
    environment: {
        variables: { BUCKET_NAME: bucket.bucket },
    },
});

// Subscribe Lambda to SQS
const eventSourceMapping = new aws.lambda.EventSourceMapping("sqs-trigger", {
    eventSourceArn: processQueue.arn,
    functionName: processor.arn,
    batchSize: 10,
    maximumBatchingWindowInSeconds: 5,
});

export const topicArn = topic.arn;
export const queueUrl = processQueue.url;

Use when: Decoupled event processing, multiple consumers
Step Functions + Lambda
// Lambda functions for workflow steps.
// NOTE: assumes `lambdaRole` (Lambda execution role) is defined elsewhere.
const validateInput = new aws.lambda.Function("validate", {
    runtime: "nodejs20.x",
    handler: "validate.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./functions/validate"),
});

const processData = new aws.lambda.Function("process", {
    runtime: "nodejs20.x",
    handler: "process.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./functions/process"),
});

const storeResults = new aws.lambda.Function("store", {
    runtime: "nodejs20.x",
    handler: "store.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./functions/store"),
});

const sendNotification = new aws.lambda.Function("notify", {
    runtime: "nodejs20.x",
    handler: "notify.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./functions/notify"),
});

// IAM role that Step Functions assumes to invoke the Lambdas
const sfnRole = new aws.iam.Role("sfn-role", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Principal: { Service: "states.amazonaws.com" },
            Action: "sts:AssumeRole",
        }],
    }),
});

// BUG FIX: the function ARNs are Pulumi Outputs; passing them straight into
// JSON.stringify serializes placeholder objects, not ARNs. Resolve them with
// pulumi.all(...).apply before stringifying.
const sfnPolicy = new aws.iam.RolePolicy("sfn-policy", {
    role: sfnRole.name,
    policy: pulumi
        .all([validateInput.arn, processData.arn, storeResults.arn, sendNotification.arn])
        .apply((functionArns) =>
            JSON.stringify({
                Version: "2012-10-17",
                Statement: [{
                    Effect: "Allow",
                    Action: "lambda:InvokeFunction",
                    Resource: functionArns,
                }],
            })
        ),
});

// Step Functions state machine.
// BUG FIX: same Output-in-JSON.stringify problem as above — the ASL
// definition must be built inside an apply so real ARNs are embedded.
const stateMachine = new aws.sfn.StateMachine("workflow", {
    roleArn: sfnRole.arn,
    definition: pulumi
        .all([validateInput.arn, processData.arn, storeResults.arn, sendNotification.arn])
        .apply(([validateArn, processArn, storeArn, notifyArn]) =>
            JSON.stringify({
                Comment: "Data processing workflow",
                StartAt: "ValidateInput",
                States: {
                    ValidateInput: {
                        Type: "Task",
                        Resource: validateArn,
                        Catch: [{
                            ErrorEquals: ["ValidationError"],
                            Next: "ValidationFailed",
                        }],
                        Next: "ProcessData",
                    },
                    ProcessData: {
                        Type: "Task",
                        Resource: processArn,
                        Retry: [{
                            ErrorEquals: ["States.TaskFailed"],
                            IntervalSeconds: 2,
                            MaxAttempts: 3,
                            BackoffRate: 2,
                        }],
                        Next: "StoreResults",
                    },
                    StoreResults: {
                        Type: "Task",
                        Resource: storeArn,
                        Next: "SendNotification",
                    },
                    SendNotification: {
                        Type: "Task",
                        Resource: notifyArn,
                        Next: "Success",
                    },
                    ValidationFailed: {
                        Type: "Fail",
                        Error: "ValidationError",
                        Cause: "Input validation failed",
                    },
                    Success: {
                        Type: "Succeed",
                    },
                },
            })
        ),
    tags: { Name: "data-processing-workflow" },
});

export const stateMachineArn = stateMachine.arn;

Use when: Multi-step workflows, orchestration, error handling
CloudWatch Logs + S3 Archive
// Log group for application logs.
// NOTE: assumes `kmsKey` (aws.kms.Key), `accountId`, and `snsTopicArn` are
// defined elsewhere in your program.
const logGroup = new aws.cloudwatch.LogGroup("app-logs", {
    name: "/aws/application/myapp",
    retentionInDays: 7, // Keep in CloudWatch for 7 days, then rely on S3 archive
    // BUG FIX: LogGroup.kmsKeyId must be the key's ARN, not its key ID.
    kmsKeyId: kmsKey.arn,
    tags: { Application: "myapp" },
});

// S3 bucket for long-term storage
const logArchiveBucket = new aws.s3.BucketV2("log-archive", {
    bucket: "log-archive-" + accountId,
});

// Lifecycle policy for cost optimization: progressively colder storage tiers
const lifecycle = new aws.s3.BucketLifecycleConfigurationV2("lifecycle", {
    bucket: logArchiveBucket.id,
    rules: [{
        id: "archive-old-logs",
        status: "Enabled",
        transitions: [
            { days: 30, storageClass: "STANDARD_IA" },
            { days: 90, storageClass: "GLACIER_INSTANT_RETRIEVAL" },
            { days: 365, storageClass: "DEEP_ARCHIVE" },
        ],
    }],
});

// IAM role for CloudWatch Logs to write to S3.
// NOTE(review): `logs create-export-task` authorizes via a bucket policy on
// the destination bucket; confirm whether this role is needed for your
// chosen export mechanism (e.g. a subscription/delivery pipeline).
const logsRole = new aws.iam.Role("logs-to-s3-role", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Principal: { Service: "logs.amazonaws.com" },
            Action: "sts:AssumeRole",
        }],
    }),
});

const logsPolicy = new aws.iam.RolePolicy("logs-s3-policy", {
    role: logsRole.name,
    policy: logArchiveBucket.arn.apply((bucketArn) => JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: ["s3:PutObject"],
            Resource: `${bucketArn}/*`,
        }],
    })),
});

// Export logs to S3 (requires AWS CLI or Lambda):
// aws logs create-export-task \
//   --log-group-name /aws/application/myapp \
//   --from $(date -d '1 day ago' +%s)000 \
//   --to $(date +%s)000 \
//   --destination log-archive-bucket \
//   --destination-prefix logs/

// Metric filter: count log lines whose `level` field is ERROR
const errorMetric = new aws.cloudwatch.LogMetricFilter("error-metric", {
    logGroupName: logGroup.name,
    name: "ErrorCount",
    pattern: "[level=ERROR, ...]",
    metricTransformation: {
        name: "ErrorCount",
        namespace: "MyApp",
        value: "1",
        defaultValue: "0", // emit 0 when no errors, so the alarm has data
    },
});

// Alarm on error metric: >10 errors summed over two 5-minute periods
const errorAlarm = new aws.cloudwatch.MetricAlarm("error-alarm", {
    name: "HighErrorRate",
    comparisonOperator: "GreaterThanThreshold",
    evaluationPeriods: 2,
    metricName: "ErrorCount",
    namespace: "MyApp",
    period: 300,
    statistic: "Sum",
    threshold: 10,
    alarmDescription: "Triggers when error count exceeds 10 in 5 minutes",
    alarmActions: [snsTopicArn],
    tags: { Severity: "high" },
});

export const logGroupName = logGroup.name;

Use when: Application monitoring, audit trails, troubleshooting
Kinesis + Lambda + OpenSearch
// Kinesis Data Stream.
// NOTE: assumes `kmsKey` and `lambdaRole` are defined elsewhere.
const stream = new aws.kinesis.Stream("events", {
    name: "event-stream",
    shardCount: 2,
    retentionPeriod: 24, // hours
    streamModeDetails: { streamMode: "PROVISIONED" },
    encryptionType: "KMS",
    kmsKeyId: kmsKey.id,
    tags: { Purpose: "event-streaming" },
});

// OpenSearch domain — declared BEFORE the Lambda that reads its endpoint
// (the original referenced `opensearchDomain` before declaration: a TDZ error).
const opensearchDomain = new aws.opensearch.Domain("analytics", {
    domainName: "event-analytics",
    engineVersion: "OpenSearch_2.11",
    clusterConfig: {
        instanceType: "t3.small.search",
        instanceCount: 2,
        zoneAwarenessEnabled: true, // spread the 2 nodes across AZs
    },
    ebsOptions: {
        ebsEnabled: true,
        volumeSize: 10,
        volumeType: "gp3",
    },
    encryptAtRest: { enabled: true, kmsKeyId: kmsKey.id },
    nodeToNodeEncryption: { enabled: true },
    domainEndpointOptions: {
        enforceHttps: true,
        tlsSecurityPolicy: "Policy-Min-TLS-1-2-2019-07",
    },
});

// Lambda processor: consumes stream records and indexes into OpenSearch
const processor = new aws.lambda.Function("stream-processor", {
    runtime: "python3.11",
    handler: "lambda_function.handler",
    role: lambdaRole.arn,
    code: new pulumi.asset.FileArchive("./processor"),
    environment: {
        variables: {
            OPENSEARCH_ENDPOINT: opensearchDomain.endpoint,
        },
    },
    timeout: 60,
});

// Event source mapping: Kinesis → Lambda with retry/bisect error handling
const eventMapping = new aws.lambda.EventSourceMapping("kinesis-trigger", {
    eventSourceArn: stream.arn,
    functionName: processor.arn,
    startingPosition: "LATEST",
    batchSize: 100,
    maximumBatchingWindowInSeconds: 5,
    parallelizationFactor: 10, // up to 10 concurrent batches per shard
    maximumRecordAgeInSeconds: 3600, // drop records older than 1 hour
    bisectBatchOnFunctionError: true, // split failing batches to isolate bad records
    maximumRetryAttempts: 3,
});

// CloudWatch dashboard for stream and processor health
const dashboard = new aws.cloudwatch.Dashboard("analytics-dashboard", {
    dashboardName: "event-analytics",
    dashboardBody: JSON.stringify({
        widgets: [
            {
                type: "metric",
                properties: {
                    metrics: [
                        ["AWS/Kinesis", "IncomingRecords", { stat: "Sum", label: "Incoming Records" }],
                        [".", "IncomingBytes", { stat: "Sum", label: "Incoming Bytes" }],
                    ],
                    period: 300,
                    stat: "Average",
                    region: "us-east-1",
                    title: "Kinesis Stream Metrics",
                },
            },
            {
                type: "metric",
                properties: {
                    metrics: [
                        ["AWS/Lambda", "Invocations", { stat: "Sum" }],
                        [".", "Errors", { stat: "Sum" }],
                        [".", "Duration", { stat: "Average" }],
                    ],
                    period: 300,
                    stat: "Average",
                    region: "us-east-1",
                    title: "Lambda Processor Metrics",
                },
            },
        ],
    }),
});

export const streamName = stream.name;
export const opensearchEndpoint = opensearchDomain.endpoint;

Use when: Real-time analytics, log analytics, event processing
CodePipeline + CodeBuild + CodeDeploy
// S3 bucket for pipeline artifacts.
// NOTE: assumes `accountId`, `region`, `buildRole`, `deployRole`, and
// `pipelineRole` are defined elsewhere in your program.
const artifactBucket = new aws.s3.BucketV2("artifacts", {
    bucket: "pipeline-artifacts-" + accountId,
});

// CodeBuild project.
// BUG FIX: the inline buildspec had lost its YAML indentation, which makes
// it invalid — buildspec structure is indentation-sensitive.
const buildProject = new aws.codebuild.Project("build", {
    name: "app-build",
    serviceRole: buildRole.arn,
    artifacts: { type: "CODEPIPELINE" },
    environment: {
        computeType: "BUILD_GENERAL1_SMALL",
        image: "aws/codebuild/standard:7.0",
        type: "LINUX_CONTAINER",
        environmentVariables: [{
            name: "AWS_DEFAULT_REGION",
            value: region,
        }],
    },
    source: {
        type: "CODEPIPELINE",
        buildspec: `version: 0.2
phases:
  install:
    runtime-versions:
      nodejs: 20
  pre_build:
    commands:
      - npm install
  build:
    commands:
      - npm run build
      - npm test
artifacts:
  files:
    - '**/*'`,
    },
    logsConfig: {
        cloudwatchLogs: {
            status: "ENABLED",
            groupName: "/aws/codebuild/app-build",
        },
    },
});

// CodeDeploy application
const app = new aws.codedeploy.Application("app", {
    name: "my-app",
    computePlatform: "Server", // or ECS, Lambda
});

const deploymentGroup = new aws.codedeploy.DeploymentGroup("deployment-group", {
    appName: app.name,
    deploymentGroupName: "production",
    serviceRoleArn: deployRole.arn,
    deploymentConfigName: "CodeDeployDefault.OneAtATime",
    ec2TagSets: [{
        ec2TagFilters: [{
            key: "Environment",
            type: "KEY_AND_VALUE",
            value: "production",
        }],
    }],
    autoRollbackConfiguration: {
        enabled: true,
        events: ["DEPLOYMENT_FAILURE"],
    },
});

// CodePipeline: Source (CodeCommit) → Build (CodeBuild) → Deploy (CodeDeploy).
// NOTE(review): CodeCommit is closed to new AWS customers — confirm
// availability, or swap the Source action for GitHub/CodeStar Connections.
const pipeline = new aws.codepipeline.Pipeline("pipeline", {
    name: "app-pipeline",
    roleArn: pipelineRole.arn,
    artifactStores: [{
        location: artifactBucket.bucket,
        type: "S3",
    }],
    stages: [
        {
            name: "Source",
            actions: [{
                name: "Source",
                category: "Source",
                owner: "AWS",
                provider: "CodeCommit",
                version: "1",
                outputArtifacts: ["SourceOutput"],
                configuration: {
                    RepositoryName: "my-app",
                    BranchName: "main",
                },
            }],
        },
        {
            name: "Build",
            actions: [{
                name: "Build",
                category: "Build",
                owner: "AWS",
                provider: "CodeBuild",
                version: "1",
                inputArtifacts: ["SourceOutput"],
                outputArtifacts: ["BuildOutput"],
                configuration: {
                    ProjectName: buildProject.name,
                },
            }],
        },
        {
            name: "Deploy",
            actions: [{
                name: "Deploy",
                category: "Deploy",
                owner: "AWS",
                provider: "CodeDeploy",
                version: "1",
                inputArtifacts: ["BuildOutput"],
                configuration: {
                    ApplicationName: app.name,
                    DeploymentGroupName: deploymentGroup.deploymentGroupName,
                },
            }],
        },
    ],
});

export const pipelineName = pipeline.name;

Use when: Automated deployments, CI/CD workflows
Install with Tessl CLI
npx tessl i tessl/npm-pulumi--aws@7.16.0