0
# Orchestration Targets
1
2
Targets for workflow orchestration services that can execute complex business processes in response to EventBridge events.
3
4
## Capabilities
5
6
### Step Functions State Machine Target
7
8
Execute AWS Step Functions state machines in response to EventBridge events.
9
10
```typescript { .api }
11
/**
12
* Use a Step Functions state machine as a target for Amazon EventBridge rules
13
*/
14
class SfnStateMachine implements events.IRuleTarget {
15
readonly machine: sfn.IStateMachine;
16
17
constructor(machine: sfn.IStateMachine, props?: SfnStateMachineProps);
18
19
/**
20
* Returns a RuleTarget that can be used to trigger this Step Functions state machine
21
* as a result from an EventBridge event
22
*/
23
bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
24
}
25
26
interface SfnStateMachineProps extends TargetBaseProps {
27
/**
28
* The input to pass to the state machine execution
29
* @default the entire EventBridge event
30
*/
31
readonly input?: events.RuleTargetInput;
32
33
/**
34
* The IAM role to execute the state machine
35
* @default a new role will be created
36
*/
37
readonly role?: iam.IRole;
38
}
39
```
40
41
**Usage Example:**
42
43
```typescript
44
import * as sfn from "@aws-cdk/aws-stepfunctions";
45
import * as sfnTasks from "@aws-cdk/aws-stepfunctions-tasks";
46
import * as lambda from "@aws-cdk/aws-lambda";
47
import * as events from "@aws-cdk/aws-events";
48
import * as targets from "@aws-cdk/aws-events-targets";
49
import * as sqs from "@aws-cdk/aws-sqs";
50
import * as iam from "@aws-cdk/aws-iam";
51
52
// Create Lambda functions for the workflow
53
const validateOrderFunction = new lambda.Function(this, "ValidateOrder", {
54
runtime: lambda.Runtime.NODEJS_14_X,
55
handler: "validate.handler",
56
code: lambda.Code.fromInline(`
57
exports.handler = async (event) => {
58
const { orderId, amount, customerId } = event;
59
// Validation logic
60
return {
61
orderId,
62
amount,
63
customerId,
64
isValid: amount > 0 && customerId,
65
validationTimestamp: new Date().toISOString()
66
};
67
};
68
`),
69
});
70
71
const processPaymentFunction = new lambda.Function(this, "ProcessPayment", {
72
runtime: lambda.Runtime.NODEJS_14_X,
73
handler: "payment.handler",
74
code: lambda.Code.fromInline(`
75
exports.handler = async (event) => {
76
const { orderId, amount, customerId } = event;
77
// Payment processing logic
78
return {
79
orderId,
80
paymentId: 'pay_' + Math.random().toString(36).substr(2, 9),
81
status: 'processed',
82
amount,
83
processedAt: new Date().toISOString()
84
};
85
};
86
`),
87
});
88
89
const fulfillOrderFunction = new lambda.Function(this, "FulfillOrder", {
90
runtime: lambda.Runtime.NODEJS_14_X,
91
handler: "fulfillment.handler",
92
code: lambda.Code.fromInline(`
93
exports.handler = async (event) => {
94
const { orderId, paymentId } = event;
95
return {
96
orderId,
97
paymentId,
98
fulfillmentId: 'ful_' + Math.random().toString(36).substr(2, 9),
99
status: 'fulfilled',
100
estimatedDelivery: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString()
101
};
102
};
103
`),
104
});
105
106
// Create Step Functions tasks
107
const validateOrderTask = new sfnTasks.LambdaInvoke(this, "ValidateOrderTask", {
108
lambdaFunction: validateOrderFunction,
109
resultPath: "$.validation",
110
});
111
112
const processPaymentTask = new sfnTasks.LambdaInvoke(this, "ProcessPaymentTask", {
113
lambdaFunction: processPaymentFunction,
114
resultPath: "$.payment",
115
});
116
117
const fulfillOrderTask = new sfnTasks.LambdaInvoke(this, "FulfillOrderTask", {
118
lambdaFunction: fulfillOrderFunction,
119
resultPath: "$.fulfillment",
120
});
121
122
// Create failure and success states
123
const orderFailure = new sfn.Fail(this, "OrderFailure", {
124
comment: "Order processing failed",
125
error: "OrderProcessingError",
126
cause: "Order validation failed or payment processing failed",
127
});
128
129
const orderSuccess = new sfn.Succeed(this, "OrderSuccess", {
130
comment: "Order processed successfully",
131
});
132
133
// Define the workflow
134
const definition = validateOrderTask
135
.next(new sfn.Choice(this, "ValidationChoice")
136
.when(sfn.Condition.booleanEquals("$.validation.Payload.isValid", false), orderFailure)
137
.otherwise(processPaymentTask
138
.next(new sfn.Choice(this, "PaymentChoice")
139
.when(sfn.Condition.stringEquals("$.payment.Payload.status", "failed"), orderFailure)
140
.otherwise(fulfillOrderTask.next(orderSuccess))
141
)
142
)
143
);
144
145
// Create the state machine
146
const orderProcessingStateMachine = new sfn.StateMachine(this, "OrderProcessingStateMachine", {
147
definition,
148
stateMachineName: "order-processing-workflow",
149
timeout: Duration.minutes(30),
150
});
151
152
// Create dead letter queue for failed executions
153
const workflowDlq = new sqs.Queue(this, "WorkflowDeadLetterQueue", {
154
queueName: "workflow-execution-failures",
155
});
156
157
// Create custom execution role with additional permissions
158
const executionRole = new iam.Role(this, "StepFunctionsExecutionRole", {
159
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
160
inlinePolicies: {
161
StepFunctionsExecution: new iam.PolicyDocument({
162
statements: [
163
new iam.PolicyStatement({
164
actions: ["states:StartExecution"],
165
resources: [orderProcessingStateMachine.stateMachineArn],
166
}),
167
],
168
}),
169
},
170
});
171
172
// Rule for order created events
173
const orderCreatedRule = new events.Rule(this, "OrderCreatedRule", {
174
eventPattern: {
175
source: ["myapp.orders"],
176
detailType: ["Order Created"],
177
detail: {
178
status: ["pending"],
179
amount: [{ numeric: [">", 0] }],
180
},
181
},
182
});
183
184
// Add Step Functions target with custom input
185
orderCreatedRule.addTarget(new targets.SfnStateMachine(orderProcessingStateMachine, {
186
deadLetterQueue: workflowDlq,
187
retryAttempts: 2,
188
maxEventAge: Duration.hours(4),
189
role: executionRole,
190
input: events.RuleTargetInput.fromObject({
191
orderId: events.EventField.fromPath("$.detail.orderId"),
192
customerId: events.EventField.fromPath("$.detail.customerId"),
193
amount: events.EventField.fromPath("$.detail.amount"),
194
currency: events.EventField.fromPath("$.detail.currency"),
195
items: events.EventField.fromPath("$.detail.items"),
196
shippingAddress: events.EventField.fromPath("$.detail.shippingAddress"),
197
eventMetadata: {
198
eventId: events.EventField.fromPath("$.id"),
199
eventTime: events.EventField.fromPath("$.time"),
200
source: events.EventField.fromPath("$.source"),
201
account: events.EventField.fromPath("$.account"),
202
region: events.EventField.fromPath("$.region"),
203
},
204
}),
205
}));
206
207
// Rule for scheduled workflows (daily batch processing)
208
const dailyBatchRule = new events.Rule(this, "DailyBatchRule", {
209
description: "Trigger daily order reconciliation workflow",
210
schedule: events.Schedule.cron({
211
hour: "2",
212
minute: "0",
213
}),
214
});
215
216
// Create batch processing state machine
217
const batchProcessingTask = new sfnTasks.LambdaInvoke(this, "BatchProcessingTask", {
218
lambdaFunction: new lambda.Function(this, "BatchProcessor", {
219
runtime: lambda.Runtime.PYTHON_3_9,
220
handler: "batch.handler",
221
code: lambda.Code.fromInline(`
222
import json
223
import boto3
224
from datetime import datetime, timedelta
225
226
def handler(event, context):
227
# Batch processing logic
228
yesterday = datetime.now() - timedelta(days=1)
229
return {
230
'processedDate': yesterday.isoformat(),
231
'recordsProcessed': 1250,
232
'errors': [],
233
'status': 'completed'
234
}
235
`),
236
timeout: Duration.minutes(15),
237
}),
238
});
239
240
const batchStateMachine = new sfn.StateMachine(this, "BatchProcessingStateMachine", {
241
definition: batchProcessingTask,
242
stateMachineName: "daily-batch-processing",
243
});
244
245
dailyBatchRule.addTarget(new targets.SfnStateMachine(batchStateMachine, {
246
input: events.RuleTargetInput.fromObject({
247
batchDate: events.EventField.fromPath("$.time"),
248
batchType: "daily-reconciliation",
249
triggeredBy: "scheduled-event",
250
}),
251
}));
252
253
// Rule for complex multi-step workflows triggered by external systems
254
const integrationRule = new events.Rule(this, "IntegrationRule", {
255
eventPattern: {
256
source: ["partner.webhook"],
257
detailType: ["Data Sync Request", "Bulk Import Request"],
258
},
259
});
260
261
// Create data processing workflow
262
const dataValidationTask = new sfnTasks.LambdaInvoke(this, "DataValidationTask", {
263
lambdaFunction: new lambda.Function(this, "DataValidator", {
264
runtime: lambda.Runtime.NODEJS_14_X,
265
handler: "validator.handler",
266
code: lambda.Code.fromAsset("lambda/data-validator"),
267
}),
268
});
269
270
const dataTransformTask = new sfnTasks.LambdaInvoke(this, "DataTransformTask", {
271
lambdaFunction: new lambda.Function(this, "DataTransformer", {
272
runtime: lambda.Runtime.PYTHON_3_9,
273
handler: "transform.handler",
274
code: lambda.Code.fromAsset("lambda/data-transformer"),
275
}),
276
});
277
278
const dataLoadTask = new sfnTasks.LambdaInvoke(this, "DataLoadTask", {
279
lambdaFunction: new lambda.Function(this, "DataLoader", {
280
runtime: lambda.Runtime.NODEJS_14_X,
281
handler: "loader.handler",
282
code: lambda.Code.fromAsset("lambda/data-loader"),
283
}),
284
});
285
286
const dataProcessingWorkflow = new sfn.StateMachine(this, "DataProcessingWorkflow", {
287
definition: dataValidationTask
288
.next(dataTransformTask)
289
.next(dataLoadTask),
290
stateMachineName: "data-integration-workflow",
291
timeout: Duration.hours(2),
292
});
293
294
integrationRule.addTarget(new targets.SfnStateMachine(dataProcessingWorkflow, {
295
input: events.RuleTargetInput.fromObject({
296
requestId: events.EventField.fromPath("$.detail.requestId"),
297
dataSource: events.EventField.fromPath("$.detail.source"),
298
dataFormat: events.EventField.fromPath("$.detail.format"),
299
s3Location: events.EventField.fromPath("$.detail.s3Location"),
300
partnerInfo: events.EventField.fromPath("$.detail.partner"),
301
processingOptions: events.EventField.fromPath("$.detail.options"),
302
}),
303
}));
304
```
305
306
## Workflow Patterns
307
308
### Error Handling and Retry Logic
309
310
```typescript
311
// State machine with comprehensive error handling
312
const errorHandlingDefinition = new sfn.Parallel(this, "ProcessWithErrorHandling")
313
.branch(
314
processPaymentTask.addRetry({
315
errors: ["States.TaskFailed"],
316
intervalSeconds: 2,
317
maxAttempts: 3,
318
backoffRate: 2.0,
319
}).addCatch(orderFailure, {
320
errors: ["States.ALL"],
321
resultPath: "$.error",
322
})
323
);
324
325
const resilientStateMachine = new sfn.StateMachine(this, "ResilientStateMachine", {
326
definition: errorHandlingDefinition,
327
});
328
```
329
330
### Long-Running Workflows
331
332
```typescript
333
// Workflow with wait states and callbacks
334
const waitForApproval = new sfn.Wait(this, "WaitForApproval", {
335
time: sfn.WaitTime.duration(Duration.hours(24)),
336
});
337
338
const checkApprovalStatus = new sfnTasks.LambdaInvoke(this, "CheckApprovalStatus", {
339
lambdaFunction: approvalCheckFunction,
340
});
341
342
const approvalWorkflow = validateOrderTask
343
.next(waitForApproval)
344
.next(checkApprovalStatus)
345
.next(new sfn.Choice(this, "ApprovalChoice")
346
.when(sfn.Condition.stringEquals("$.Payload.status", "approved"), processPaymentTask)
347
.when(sfn.Condition.stringEquals("$.Payload.status", "rejected"), orderFailure)
348
.otherwise(waitForApproval) // Loop back to wait
349
);
350
```
351
352
### Map State for Parallel Processing
353
354
```typescript
355
// Process multiple items in parallel
356
const processItemsMap = new sfn.Map(this, "ProcessItemsMap", {
357
maxConcurrency: 5,
358
itemsPath: "$.items",
359
resultPath: "$.processedItems",
360
})
361
.iterator(
362
new sfnTasks.LambdaInvoke(this, "ProcessSingleItem", {
363
lambdaFunction: itemProcessorFunction,
364
})
365
);
366
367
const batchProcessingWorkflow = new sfn.StateMachine(this, "BatchProcessingWorkflow", {
368
definition: processItemsMap,
369
});
370
371
// Trigger for batch processing events
372
const batchRule = new events.Rule(this, "BatchRule", {
373
eventPattern: {
374
source: ["myapp.batch"],
375
detailType: ["Batch Processing Request"],
376
},
377
});
378
379
batchRule.addTarget(new targets.SfnStateMachine(batchProcessingWorkflow, {
380
input: events.RuleTargetInput.fromPath("$.detail"),
381
}));
382
```
383
384
### Integration with External Systems
385
386
```typescript
387
// Workflow that integrates with external APIs
388
const callExternalApiTask = new sfnTasks.HttpInvoke(this, "CallExternalAPI", {
389
apiRoot: "https://api.partner.com",
390
apiEndpoint: sfn.TaskInput.fromJsonPathAt("$.apiEndpoint"),
391
method: sfnTasks.HttpMethod.POST,
392
headers: sfn.TaskInput.fromObject({
393
"Authorization": "Bearer ${aws:secretsmanager:api-token}",
394
"Content-Type": "application/json",
395
}),
396
requestBody: sfn.TaskInput.fromJsonPathAt("$.requestPayload"),
397
});
398
399
const externalIntegrationWorkflow = new sfn.StateMachine(this, "ExternalIntegrationWorkflow", {
400
definition: callExternalApiTask,
401
});
402
403
// Trigger workflow for external integration events
404
const externalRule = new events.Rule(this, "ExternalRule", {
405
eventPattern: {
406
source: ["myapp.external"],
407
detailType: ["API Integration Request"],
408
},
409
});
410
411
externalRule.addTarget(new targets.SfnStateMachine(externalIntegrationWorkflow, {
412
input: events.RuleTargetInput.fromObject({
413
apiEndpoint: events.EventField.fromPath("$.detail.endpoint"),
414
requestPayload: events.EventField.fromPath("$.detail.payload"),
415
metadata: {
416
requestId: events.EventField.fromPath("$.detail.requestId"),
417
timestamp: events.EventField.fromPath("$.time"),
418
},
419
}),
420
}));
421
```