CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-pulumi--aws

A Pulumi package for creating and managing Amazon Web Services (AWS) cloud resources with infrastructure-as-code.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

sfn.mddocs/services/

Step Functions - Workflow Orchestration

AWS Step Functions coordinates multiple AWS services into serverless workflows.

Common Tasks

import { sfn } from "@pulumi/aws";

// Create a simple workflow
const stateMachine = new sfn.StateMachine("workflow", {
    roleArn: sfnRole.arn,
    definition: JSON.stringify({
        Comment: "A simple workflow",
        StartAt: "ProcessData",
        States: {
            ProcessData: {
                Type: "Task",
                Resource: dataProcessor.arn,
                Next: "NotifySuccess"
            },
            NotifySuccess: {
                Type: "Task",
                Resource: notifier.arn,
                End: true
            }
        }
    }),
});

// Create Express workflow for high-volume, short-duration tasks
const expressWorkflow = new sfn.StateMachine("express", {
    type: "EXPRESS",
    roleArn: sfnRole.arn,
    definition: definition,
    loggingConfiguration: {
        level: "ALL",
        includeExecutionData: true,
        destinations: [{
            cloudWatchLogsLogGroup: {
                logGroupArn: logGroup.arn,
            },
        }],
    },
});

Core Resources

StateMachine

State machines define workflows as a series of steps.

class StateMachine extends pulumi.CustomResource {
    constructor(name: string, args: StateMachineArgs, opts?: pulumi.CustomResourceOptions);

    readonly arn: pulumi.Output<string>;
    readonly name: pulumi.Output<string>;
}

interface StateMachineArgs {
    definition: pulumi.Input<string>;
    roleArn: pulumi.Input<string>;
    name?: pulumi.Input<string>;
    type?: pulumi.Input<"STANDARD" | "EXPRESS">;
    loggingConfiguration?: pulumi.Input<StateMachineLoggingConfiguration>;
    tags?: pulumi.Input<{[key: string]: pulumi.Input<string>}>;
}

Example - Order processing workflow

const orderWorkflow = new sfn.StateMachine("order-processing", {
    roleArn: workflowRole.arn,
    definition: JSON.stringify({
        Comment: "Order processing workflow",
        StartAt: "ValidateOrder",
        States: {
            ValidateOrder: {
                Type: "Task",
                Resource: "arn:aws:states:::lambda:invoke",
                Parameters: {
                    FunctionName: validateFunction.name,
                    "Payload.$": "$"
                },
                Next: "IsValid",
                Catch: [{
                    ErrorEquals: ["States.ALL"],
                    Next: "OrderFailed"
                }]
            },
            IsValid: {
                Type: "Choice",
                Choices: [{
                    Variable: "$.valid",
                    BooleanEquals: true,
                    Next: "ProcessPayment"
                }],
                Default: "OrderFailed"
            },
            ProcessPayment: {
                Type: "Task",
                Resource: "arn:aws:states:::lambda:invoke",
                Parameters: {
                    FunctionName: paymentFunction.name,
                    "Payload.$": "$"
                },
                Next: "UpdateInventory"
            },
            UpdateInventory: {
                Type: "Task",
                Resource: "arn:aws:states:::dynamodb:updateItem",
                Parameters: {
                    TableName: inventoryTable.name,
                    Key: {
                        productId: { "S.$": "$.productId" }
                    },
                    UpdateExpression: "SET quantity = quantity - :qty",
                    ExpressionAttributeValues: {
                        ":qty": { "N.$": "$.quantity" }
                    }
                },
                Next: "SendConfirmation"
            },
            SendConfirmation: {
                Type: "Task",
                Resource: "arn:aws:states:::sns:publish",
                Parameters: {
                    TopicArn: confirmationTopic.arn,
                    "Message.$": "$"
                },
                End: true
            },
            OrderFailed: {
                Type: "Fail",
                Cause: "Order validation or processing failed"
            }
        }
    }),
    loggingConfiguration: {
        level: "ERROR",
        includeExecutionData: false,
        destinations: [{
            cloudWatchLogsLogGroup: {
                logGroupArn: workflowLogs.arn,
            },
        }],
    },
});

Example - ETL workflow with parallel processing

const etlWorkflow = new sfn.StateMachine("etl-pipeline", {
    roleArn: etlRole.arn,
    definition: JSON.stringify({
        Comment: "ETL data pipeline",
        StartAt: "ExtractData",
        States: {
            ExtractData: {
                Type: "Task",
                Resource: extractFunction.arn,
                Next: "ParallelTransform"
            },
            ParallelTransform: {
                Type: "Parallel",
                Branches: [
                    {
                        StartAt: "TransformCustomers",
                        States: {
                            TransformCustomers: {
                                Type: "Task",
                                Resource: transformCustomersFunction.arn,
                                End: true
                            }
                        }
                    },
                    {
                        StartAt: "TransformOrders",
                        States: {
                            TransformOrders: {
                                Type: "Task",
                                Resource: transformOrdersFunction.arn,
                                End: true
                            }
                        }
                    }
                ],
                Next: "LoadData"
            },
            LoadData: {
                Type: "Task",
                Resource: loadFunction.arn,
                End: true
            }
        }
    }),
});

Example - Express workflow for high-throughput

const iotProcessor = new sfn.StateMachine("iot-processor", {
    name: "iot-event-processor",
    type: "EXPRESS",
    roleArn: expressRole.arn,
    definition: JSON.stringify({
        Comment: "Process IoT sensor data",
        StartAt: "ValidateSensorData",
        States: {
            ValidateSensorData: {
                Type: "Task",
                Resource: validateFunction.arn,
                Next: "StoreData",
                Retry: [{
                    ErrorEquals: ["States.ALL"],
                    IntervalSeconds: 1,
                    MaxAttempts: 2,
                    BackoffRate: 1.5
                }]
            },
            StoreData: {
                Type: "Task",
                Resource: "arn:aws:states:::dynamodb:putItem",
                Parameters: {
                    TableName: sensorTable.name,
                    Item: {
                        "deviceId": { "S.$": "$.deviceId" },
                        "timestamp": { "N.$": "$.timestamp" },
                        "reading": { "N.$": "$.value" }
                    }
                },
                End: true
            }
        }
    }),
    loggingConfiguration: {
        level: "ALL",
        includeExecutionData: true,
        destinations: [{
            cloudWatchLogsLogGroup: {
                logGroupArn: iotLogs.arn,
            },
        }],
    },
});

// Invoke from EventBridge
const sensorRule = new aws.cloudwatch.EventRule("sensor-events", {
    eventPattern: JSON.stringify({
        source: ["custom.iot"],
        "detail-type": ["Sensor Reading"],
    }),
});

new aws.cloudwatch.EventTarget("sfn-target", {
    rule: sensorRule.name,
    arn: iotProcessor.arn,
    roleArn: eventBridgeRole.arn,
});

Activity

Activities let external workers poll for tasks in a Step Functions workflow.

class Activity extends pulumi.CustomResource {
    constructor(name: string, args?: ActivityArgs, opts?: pulumi.CustomResourceOptions);

    readonly id: pulumi.Output<string>;
}

interface ActivityArgs {
    name?: pulumi.Input<string>;
    tags?: pulumi.Input<{[key: string]: pulumi.Input<string>}>;
}

Example - Long-running manual approval

const approvalActivity = new sfn.Activity("approval", {
    name: "manual-approval",
});

const approvalWorkflow = new sfn.StateMachine("approval-flow", {
    roleArn: workflowRole.arn,
    definition: pulumi.interpolate`{
        "Comment": "Workflow with manual approval",
        "StartAt": "ProcessRequest",
        "States": {
            "ProcessRequest": {
                "Type": "Task",
                "Resource": "${processFunction.arn}",
                "Next": "WaitForApproval"
            },
            "WaitForApproval": {
                "Type": "Task",
                "Resource": "${approvalActivity.id}",
                "TimeoutSeconds": 3600,
                "Next": "ApprovalReceived"
            },
            "ApprovalReceived": {
                "Type": "Task",
                "Resource": "${completeFunction.arn}",
                "End": true
            }
        }
    }`,
});

Alias

Version aliases for state machines.

class Alias extends pulumi.CustomResource {
    constructor(name: string, args: AliasArgs, opts?: pulumi.CustomResourceOptions);
}

interface AliasArgs {
    name: pulumi.Input<string>;
    routingConfiguration?: pulumi.Input<pulumi.Input<AliasRoutingConfiguration>[]>;
    description?: pulumi.Input<string>;
}

Additional Resources and Data Sources

  • getActivity - Look up activity by name
  • getAlias - Look up state machine alias
  • getStateMachine - Look up state machine
  • getStateMachineVersions - List state machine versions

For complete Step Functions API, see All Services.

IAM Permissions

Step Functions execution role needs permissions for invoked services:

const sfnRole = new aws.iam.Role("sfn-role", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Action: "sts:AssumeRole",
            Principal: {
                Service: "states.amazonaws.com",
            },
            Effect: "Allow",
        }],
    }),
});

new aws.iam.RolePolicy("sfn-permissions", {
    role: sfnRole.id,
    policy: pulumi.all([lambdaFunction.arn, snsTopicArn]).apply(([fnArn, topicArn]) =>
        JSON.stringify({
            Version: "2012-10-17",
            Statement: [
                {
                    Effect: "Allow",
                    Action: ["lambda:InvokeFunction"],
                    Resource: fnArn,
                },
                {
                    Effect: "Allow",
                    Action: ["sns:Publish"],
                    Resource: topicArn,
                },
                {
                    Effect: "Allow",
                    Action: ["logs:CreateLogDelivery", "logs:GetLogDelivery"],
                    Resource: "*",
                },
            ],
        })
    ),
});

Related Services

  • Lambda - Task execution
  • EventBridge - Trigger workflows from events
  • SNS - Send notifications from workflows
  • SQS - Queue workflow tasks
  • DynamoDB - Store workflow state
  • ECS - Run containerized tasks

Install with Tessl CLI

npx tessl i tessl/npm-pulumi--aws

docs

index.md

quickstart.md

README.md

tile.json