Event targets for Amazon EventBridge that enable routing events to various AWS services.
Targets for workflow orchestration services that can execute complex business processes in response to EventBridge events.
Execute AWS Step Functions state machines in response to EventBridge events.
/**
* Use a Step Functions state machine as a target for Amazon EventBridge rules.
*
* This listing is a declaration only: it shows the public surface (one
* property, the constructor and `bind`) and no implementation.
*/
class SfnStateMachine implements events.IRuleTarget {
/** The state machine associated with this target (same type as the constructor's `machine` argument). */
readonly machine: sfn.IStateMachine;
/**
* @param machine the Step Functions state machine to use as the rule target
* @param props optional settings for the target; see `SfnStateMachineProps`
*/
constructor(machine: sfn.IStateMachine, props?: SfnStateMachineProps);
/**
* Returns a RuleTarget that can be used to trigger this Step Functions state machine
* in response to an EventBridge event.
*
* @param rule the rule this target is being bound to
* @param id optional identifier (its exact use is not shown in this listing)
* @returns the target configuration for `rule`
*/
bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
}
/**
 * Options accepted by the `SfnStateMachine` constructor, on top of whatever
 * `TargetBaseProps` declares.
 */
interface SfnStateMachineProps extends TargetBaseProps {
  /**
   * Input handed to the state machine execution.
   *
   * @default the entire EventBridge event
   */
  readonly input?: events.RuleTargetInput;

  /**
   * IAM role used to execute the state machine.
   *
   * @default a new role will be created
   */
  readonly role?: iam.IRole;
}

Usage Example:
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";
import * as iam from "@aws-cdk/aws-iam";
import * as lambda from "@aws-cdk/aws-lambda";
import * as sqs from "@aws-cdk/aws-sqs";
import * as sfn from "@aws-cdk/aws-stepfunctions";
import * as sfnTasks from "@aws-cdk/aws-stepfunctions-tasks";
import { Duration } from "@aws-cdk/core";
// Create Lambda functions for the workflow

/**
 * Lambda that validates an incoming order.
 *
 * The handler reads `orderId`, `amount` and `customerId` from the event and
 * echoes them back together with `isValid` and a `validationTimestamp`.
 *
 * Two details matter for correctness:
 * - Inline code is deployed as a file named `index`, so the handler must be
 *   `index.handler`; any other module name cannot be resolved at invoke time.
 * - `isValid` is coerced with `Boolean(...)`. The bare `amount > 0 && customerId`
 *   expression evaluates to the customer id string (or `undefined`), which the
 *   `booleanEquals` check in the workflow's "ValidationChoice" state can never
 *   match and which disappears from the JSON result when it is `undefined`.
 */
const validateOrderFunction = new lambda.Function(this, "ValidateOrder", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
exports.handler = async (event) => {
const { orderId, amount, customerId } = event;
// Validation logic
return {
orderId,
amount,
customerId,
isValid: Boolean(amount > 0 && customerId),
validationTimestamp: new Date().toISOString()
};
};
`),
});
/**
 * Lambda that simulates payment processing for an order.
 *
 * The handler reads `orderId`, `amount` and `customerId` from the event and
 * returns a record with a randomly generated `paymentId`, the status
 * `'processed'`, the amount and a `processedAt` timestamp.
 *
 * Inline code is deployed as a file named `index`, so the handler has to be
 * `index.handler`; a handler such as `payment.handler` cannot be resolved when
 * the function is invoked.
 */
const processPaymentFunction = new lambda.Function(this, "ProcessPayment", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
exports.handler = async (event) => {
const { orderId, amount, customerId } = event;
// Payment processing logic
return {
orderId,
paymentId: 'pay_' + Math.random().toString(36).substr(2, 9),
status: 'processed',
amount,
processedAt: new Date().toISOString()
};
};
`),
});
/**
 * Lambda that simulates order fulfillment.
 *
 * Returns the order id, the payment id, a randomly generated `fulfillmentId`,
 * the status `'fulfilled'` and an estimated delivery date seven days out.
 *
 * Two details matter for correctness:
 * - Inline code is deployed as a file named `index`, so the handler must be
 *   `index.handler`.
 * - The payment task in this example stores its result under `$.payment`, and
 *   the handler's return value sits in that result's `Payload`. The payment id
 *   is therefore looked up at `event.payment.Payload.paymentId` when it is not
 *   supplied at the top level of the event; reading only `event.paymentId`
 *   would always yield `undefined` in this workflow.
 */
const fulfillOrderFunction = new lambda.Function(this, "FulfillOrder", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
exports.handler = async (event) => {
const { orderId } = event;
const paymentId = event.paymentId ?? event.payment?.Payload?.paymentId;
return {
orderId,
paymentId,
fulfillmentId: 'ful_' + Math.random().toString(36).substr(2, 9),
status: 'fulfilled',
estimatedDelivery: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString()
};
};
`),
});
// Step Functions tasks: one Lambda invocation per workflow step.
// Each task writes its result under a dedicated key (`resultPath`) so the
// original execution input stays available to the following states.

// Result lands at $.validation
const validateOrderTask = new sfnTasks.LambdaInvoke(this, "ValidateOrderTask", {
  resultPath: "$.validation",
  lambdaFunction: validateOrderFunction,
});

// Result lands at $.payment
const processPaymentTask = new sfnTasks.LambdaInvoke(this, "ProcessPaymentTask", {
  resultPath: "$.payment",
  lambdaFunction: processPaymentFunction,
});

// Result lands at $.fulfillment
const fulfillOrderTask = new sfnTasks.LambdaInvoke(this, "FulfillOrderTask", {
  resultPath: "$.fulfillment",
  lambdaFunction: fulfillOrderFunction,
});
// Terminal states shared by the workflow branches.

// Failure end state, reached when validation or payment does not pass.
const orderFailure = new sfn.Fail(this, "OrderFailure", {
  error: "OrderProcessingError",
  cause: "Order validation failed or payment processing failed",
  comment: "Order processing failed",
});

// Success end state, reached after fulfillment.
const orderSuccess = new sfn.Succeed(this, "OrderSuccess", {
  comment: "Order processed successfully",
});
// Define the workflow:
//   validate -> (invalid? fail) -> pay -> (payment failed? fail) -> fulfill -> succeed

// Branch on the validation result written to $.validation by the first task.
const validationChoice = new sfn.Choice(this, "ValidationChoice");
validationChoice.when(
  sfn.Condition.booleanEquals("$.validation.Payload.isValid", false),
  orderFailure,
);

// Branch on the payment status written to $.payment by the payment task.
const paymentChoice = new sfn.Choice(this, "PaymentChoice");
paymentChoice.when(
  sfn.Condition.stringEquals("$.payment.Payload.status", "failed"),
  orderFailure,
);
paymentChoice.otherwise(fulfillOrderTask.next(orderSuccess));

// A valid order proceeds to payment and then to the payment branch.
validationChoice.otherwise(processPaymentTask.next(paymentChoice));

const definition = validateOrderTask.next(validationChoice);
// State machine that runs the order workflow defined above, limited to
// 30 minutes per execution.
const orderProcessingStateMachine = new sfn.StateMachine(
  this,
  "OrderProcessingStateMachine",
  {
    stateMachineName: "order-processing-workflow",
    timeout: Duration.minutes(30),
    definition: definition,
  },
);
// Dead-letter queue for the rule target. It is passed as `deadLetterQueue`
// when the state machine is added as a target, so it holds events that
// EventBridge could not deliver to the target; it does not collect state
// machine executions that fail after they have started.
const workflowDlq = new sqs.Queue(
  this,
  "WorkflowDeadLetterQueue",
  { queueName: "workflow-execution-failures" },
);
// Custom role assumed by EventBridge (events.amazonaws.com) when it starts
// the order-processing state machine.

// Allow starting executions of exactly this state machine.
const startExecutionStatement = new iam.PolicyStatement({
  resources: [orderProcessingStateMachine.stateMachineArn],
  actions: ["states:StartExecution"],
});

const stepFunctionsExecutionPolicy = new iam.PolicyDocument({
  statements: [startExecutionStatement],
});

const executionRole = new iam.Role(this, "StepFunctionsExecutionRole", {
  assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
  inlinePolicies: {
    StepFunctionsExecution: stepFunctionsExecutionPolicy,
  },
});
// Rule matching "Order Created" events from the orders application whose
// detail carries a pending status and a strictly positive amount.
const orderCreatedPattern = {
  source: ["myapp.orders"],
  detailType: ["Order Created"],
  detail: {
    status: ["pending"],
    amount: [{ numeric: [">", 0] }],
  },
};

const orderCreatedRule = new events.Rule(this, "OrderCreatedRule", {
  eventPattern: orderCreatedPattern,
});
// Add the Step Functions target with a custom input document: selected
// fields are lifted out of the event detail, and envelope fields are grouped
// under `eventMetadata`.
const orderWorkflowInput = events.RuleTargetInput.fromObject({
  orderId: events.EventField.fromPath("$.detail.orderId"),
  customerId: events.EventField.fromPath("$.detail.customerId"),
  amount: events.EventField.fromPath("$.detail.amount"),
  currency: events.EventField.fromPath("$.detail.currency"),
  items: events.EventField.fromPath("$.detail.items"),
  shippingAddress: events.EventField.fromPath("$.detail.shippingAddress"),
  eventMetadata: {
    eventId: events.EventField.fromPath("$.id"),
    eventTime: events.EventField.fromPath("$.time"),
    source: events.EventField.fromPath("$.source"),
    account: events.EventField.fromPath("$.account"),
    region: events.EventField.fromPath("$.region"),
  },
});

// Target settings: custom role and input, plus the dead-letter queue,
// retry count and maximum event age.
const orderWorkflowTarget = new targets.SfnStateMachine(orderProcessingStateMachine, {
  input: orderWorkflowInput,
  role: executionRole,
  deadLetterQueue: workflowDlq,
  retryAttempts: 2,
  maxEventAge: Duration.hours(4),
});

orderCreatedRule.addTarget(orderWorkflowTarget);
// Scheduled rule for the daily batch workflow; the cron expression fires at
// hour 2, minute 0.
const dailyBatchSchedule = events.Schedule.cron({ minute: "0", hour: "2" });

const dailyBatchRule = new events.Rule(this, "DailyBatchRule", {
  schedule: dailyBatchSchedule,
  description: "Trigger daily order reconciliation workflow",
});
// Create batch processing state machine

/**
 * Python Lambda that stands in for the daily batch job.
 *
 * The handler returns a fixed summary (processed date = yesterday, record
 * count, empty error list, status `'completed'`).
 *
 * Two details matter for correctness:
 * - The inline Python source must keep its indentation, and its top-level
 *   statements must start in column 0; otherwise the module fails to load
 *   with an IndentationError. Do not re-indent the template literal.
 * - Inline code is deployed as a file named `index`, so the handler must be
 *   `index.handler`.
 */
const batchProcessorFunction = new lambda.Function(this, "BatchProcessor", {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
import json
import boto3
from datetime import datetime, timedelta

def handler(event, context):
    # Batch processing logic
    yesterday = datetime.now() - timedelta(days=1)
    return {
        'processedDate': yesterday.isoformat(),
        'recordsProcessed': 1250,
        'errors': [],
        'status': 'completed'
    }
`),
  timeout: Duration.minutes(15),
});

// Single-step workflow task that invokes the batch processor.
const batchProcessingTask = new sfnTasks.LambdaInvoke(this, "BatchProcessingTask", {
  lambdaFunction: batchProcessorFunction,
});
// State machine consisting of the single batch-processing task.
const batchStateMachine = new sfn.StateMachine(
  this,
  "BatchProcessingStateMachine",
  {
    stateMachineName: "daily-batch-processing",
    definition: batchProcessingTask,
  },
);

// Input for scheduled runs: the event time plus two constant labels.
const dailyBatchInput = events.RuleTargetInput.fromObject({
  batchDate: events.EventField.fromPath("$.time"),
  batchType: "daily-reconciliation",
  triggeredBy: "scheduled-event",
});

dailyBatchRule.addTarget(
  new targets.SfnStateMachine(batchStateMachine, { input: dailyBatchInput }),
);
// Rule for multi-step workflows triggered by external systems: matches
// partner webhook events of either request type.
const integrationEventPattern = {
  source: ["partner.webhook"],
  detailType: ["Data Sync Request", "Bulk Import Request"],
};

const integrationRule = new events.Rule(this, "IntegrationRule", {
  eventPattern: integrationEventPattern,
});
// Data processing workflow steps: validate, transform, load.
// Each step is a Lambda function packaged from a local asset directory and
// wrapped in a Step Functions invoke task.

// Step 1: validation (Node.js)
const dataValidatorFunction = new lambda.Function(this, "DataValidator", {
  code: lambda.Code.fromAsset("lambda/data-validator"),
  handler: "validator.handler",
  runtime: lambda.Runtime.NODEJS_14_X,
});
const dataValidationTask = new sfnTasks.LambdaInvoke(this, "DataValidationTask", {
  lambdaFunction: dataValidatorFunction,
});

// Step 2: transformation (Python)
const dataTransformerFunction = new lambda.Function(this, "DataTransformer", {
  code: lambda.Code.fromAsset("lambda/data-transformer"),
  handler: "transform.handler",
  runtime: lambda.Runtime.PYTHON_3_9,
});
const dataTransformTask = new sfnTasks.LambdaInvoke(this, "DataTransformTask", {
  lambdaFunction: dataTransformerFunction,
});

// Step 3: load (Node.js)
const dataLoaderFunction = new lambda.Function(this, "DataLoader", {
  code: lambda.Code.fromAsset("lambda/data-loader"),
  handler: "loader.handler",
  runtime: lambda.Runtime.NODEJS_14_X,
});
const dataLoadTask = new sfnTasks.LambdaInvoke(this, "DataLoadTask", {
  lambdaFunction: dataLoaderFunction,
});
// Chain the three steps in order and run them as one state machine with a
// two-hour execution timeout.
const dataPipeline = dataValidationTask.next(dataTransformTask).next(dataLoadTask);

const dataProcessingWorkflow = new sfn.StateMachine(this, "DataProcessingWorkflow", {
  stateMachineName: "data-integration-workflow",
  timeout: Duration.hours(2),
  definition: dataPipeline,
});

// Input for integration requests: fields taken from the event detail.
const integrationInput = events.RuleTargetInput.fromObject({
  requestId: events.EventField.fromPath("$.detail.requestId"),
  dataSource: events.EventField.fromPath("$.detail.source"),
  dataFormat: events.EventField.fromPath("$.detail.format"),
  s3Location: events.EventField.fromPath("$.detail.s3Location"),
  partnerInfo: events.EventField.fromPath("$.detail.partner"),
  processingOptions: events.EventField.fromPath("$.detail.options"),
});

integrationRule.addTarget(
  new targets.SfnStateMachine(dataProcessingWorkflow, { input: integrationInput }),
);

// State machine with comprehensive error handling
// Parallel state whose single branch is the payment task with retry and
// catch behaviour attached.
const errorHandlingDefinition = new sfn.Parallel(this, "ProcessWithErrorHandling");

// Retry failed task invocations up to three times, starting at two seconds
// and doubling the wait each time. The CDK retry options take the initial
// wait as a `Duration` under `interval`; there is no `intervalSeconds` field.
processPaymentTask.addRetry({
  errors: ["States.TaskFailed"],
  interval: Duration.seconds(2),
  maxAttempts: 3,
  backoffRate: 2.0,
});

// Any remaining error is routed to the failure state, with the error details
// stored under $.error.
processPaymentTask.addCatch(orderFailure, {
  errors: ["States.ALL"],
  resultPath: "$.error",
});

errorHandlingDefinition.branch(processPaymentTask);

const resilientStateMachine = new sfn.StateMachine(this, "ResilientStateMachine", {
  definition: errorHandlingDefinition,
});

// Workflow with wait states and callbacks
// Pause for 24 hours before (re-)checking the approval status.
const waitForApproval = new sfn.Wait(this, "WaitForApproval", {
  time: sfn.WaitTime.duration(Duration.hours(24)),
});

// NOTE(review): `approvalCheckFunction` is not defined in the visible part of
// this file; it must be declared elsewhere for this example to compile.
const checkApprovalStatus = new sfnTasks.LambdaInvoke(this, "CheckApprovalStatus", {
  lambdaFunction: approvalCheckFunction,
});

// validate -> wait -> check status
const approvalPrelude = validateOrderTask
  .next(waitForApproval)
  .next(checkApprovalStatus);

// approved -> process payment; rejected -> fail; anything else -> wait again
const approvalChoice = new sfn.Choice(this, "ApprovalChoice")
  .when(sfn.Condition.stringEquals("$.Payload.status", "approved"), processPaymentTask)
  .when(sfn.Condition.stringEquals("$.Payload.status", "rejected"), orderFailure)
  .otherwise(waitForApproval); // Loop back to wait

const approvalWorkflow = approvalPrelude.next(approvalChoice);

// Process multiple items in parallel
// Map state that iterates over $.items with at most five concurrent
// iterations and stores the collected results under $.processedItems.
const processItemsMap = new sfn.Map(this, "ProcessItemsMap", {
  itemsPath: "$.items",
  resultPath: "$.processedItems",
  maxConcurrency: 5,
});

// Each item is handled by one Lambda invocation.
// NOTE(review): `itemProcessorFunction` is not defined in the visible part of
// this file; it must be declared elsewhere for this example to compile.
processItemsMap.iterator(
  new sfnTasks.LambdaInvoke(this, "ProcessSingleItem", {
    lambdaFunction: itemProcessorFunction,
  }),
);

const batchProcessingWorkflow = new sfn.StateMachine(
  this,
  "BatchProcessingWorkflow",
  { definition: processItemsMap },
);
// Trigger for batch processing events
const batchRule = new events.Rule(this, "BatchRule", {
  eventPattern: {
    source: ["myapp.batch"],
    detailType: ["Batch Processing Request"],
  },
});

// Forward only the event's `detail` object as the execution input.
// `RuleTargetInput` selects part of the event with `fromEventPath`; it has no
// `fromPath` method (`fromPath` belongs to `EventField`).
batchRule.addTarget(new targets.SfnStateMachine(batchProcessingWorkflow, {
  input: events.RuleTargetInput.fromEventPath("$.detail"),
}));

// Workflow that integrates with external APIs
// Task that POSTs the request payload from the execution input to a partner
// API, followed by a state machine made of just that task.
//
// NOTE(review): confirm that `HttpInvoke` and `HttpMethod` are exported by the
// "@aws-cdk/aws-stepfunctions-tasks" package imported by this example, and
// that the property names used below (`method`, `headers`, `requestBody`)
// match that construct's props. This could not be verified from this file.
const callExternalApiTask = new sfnTasks.HttpInvoke(this, "CallExternalAPI", {
apiRoot: "https://api.partner.com",
// The endpoint is read from the execution input at $.apiEndpoint.
apiEndpoint: sfn.TaskInput.fromJsonPathAt("$.apiEndpoint"),
method: sfnTasks.HttpMethod.POST,
headers: sfn.TaskInput.fromObject({
// NOTE(review): this is an ordinary double-quoted string, so TypeScript does
// not interpolate `${...}`; whether anything downstream resolves this
// placeholder into a real token is not shown here. Confirm before relying on it.
"Authorization": "Bearer ${aws:secretsmanager:api-token}",
"Content-Type": "application/json",
}),
// The request body is read from the execution input at $.requestPayload.
requestBody: sfn.TaskInput.fromJsonPathAt("$.requestPayload"),
});
const externalIntegrationWorkflow = new sfn.StateMachine(this, "ExternalIntegrationWorkflow", {
definition: callExternalApiTask,
});
// Rule that starts the external-integration workflow for API integration
// requests coming from the application.
const externalRule = new events.Rule(this, "ExternalRule", {
  eventPattern: {
    detailType: ["API Integration Request"],
    source: ["myapp.external"],
  },
});

// Execution input: the endpoint and payload from the event detail, plus a
// small metadata object with the request id and event time.
const externalIntegrationInput = events.RuleTargetInput.fromObject({
  apiEndpoint: events.EventField.fromPath("$.detail.endpoint"),
  requestPayload: events.EventField.fromPath("$.detail.payload"),
  metadata: {
    requestId: events.EventField.fromPath("$.detail.requestId"),
    timestamp: events.EventField.fromPath("$.time"),
  },
});

externalRule.addTarget(
  new targets.SfnStateMachine(externalIntegrationWorkflow, {
    input: externalIntegrationInput,
  }),
);

Install with Tessl CLI
npx tessl i tessl/npm-aws-cdk--aws-events-targets