or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analytics-targets.md · api-targets.md · cicd-targets.md · compute-targets.md · index.md · messaging-targets.md · orchestration-targets.md · system-targets.md

docs/orchestration-targets.md

0

# Orchestration Targets

1

2

Targets for workflow orchestration services that can execute complex business processes in response to EventBridge events.

3

4

## Capabilities

5

6

### Step Functions State Machine Target

7

8

Execute AWS Step Functions state machines in response to EventBridge events.

9

10

```typescript { .api }

11

/**
 * Rule target that lets an Amazon EventBridge rule use a Step Functions
 * state machine as its destination.
 */
class SfnStateMachine implements events.IRuleTarget {
  /** The state machine used as the target. */
  readonly machine: sfn.IStateMachine;

  /**
   * @param machine - state machine to use as the rule target
   * @param props - optional target settings, see SfnStateMachineProps
   */
  constructor(machine: sfn.IStateMachine, props?: SfnStateMachineProps);

  /**
   * Builds the RuleTarget configuration used to trigger this state machine
   * as the result of an EventBridge event.
   *
   * @param rule - the rule this target is being attached to
   * @param id - optional identifier
   * @returns the target configuration for the rule
   */
  bind(rule: events.IRule, id?: string): events.RuleTargetConfig;
}

25

26

/** Settings for SfnStateMachine, in addition to those of TargetBaseProps. */
interface SfnStateMachineProps extends TargetBaseProps {
  /**
   * Input handed to the state machine execution.
   *
   * @default the entire EventBridge event
   */
  readonly input?: events.RuleTargetInput;

  /**
   * IAM role used to execute the state machine.
   *
   * @default a new role will be created
   */
  readonly role?: iam.IRole;
}

39

```

40

41

**Usage Example:**

42

43

```typescript

44

import * as sfn from "@aws-cdk/aws-stepfunctions";
import * as sfnTasks from "@aws-cdk/aws-stepfunctions-tasks";
import * as lambda from "@aws-cdk/aws-lambda";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";
import * as sqs from "@aws-cdk/aws-sqs";
import * as iam from "@aws-cdk/aws-iam";
import { Duration } from "@aws-cdk/core";

51

52

// Create Lambda functions for the workflow

53

// Validates an incoming order. Inline code is deployed as a file named
// `index`, so the handler has to be addressed as "index.handler".
const validateOrderFunction = new lambda.Function(this, "ValidateOrder", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
    exports.handler = async (event) => {
      const { orderId, amount, customerId } = event;
      // Validation logic
      return {
        orderId,
        amount,
        customerId,
        // Coerce to a real boolean. "amount > 0 && customerId" alone yields the
        // customerId string, or undefined (dropped from the JSON result), while
        // the workflow tests this field with a boolean comparison.
        isValid: Boolean(amount > 0 && customerId),
        validationTimestamp: new Date().toISOString()
      };
    };
  `),
});

70

71

// Simulates payment processing for an order. Inline code is deployed as a
// file named `index`, so the handler has to be addressed as "index.handler".
const processPaymentFunction = new lambda.Function(this, "ProcessPayment", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
    exports.handler = async (event) => {
      const { orderId, amount, customerId } = event;
      // Payment processing logic
      return {
        orderId,
        // slice(2, 11) keeps the same nine characters the deprecated
        // substr(2, 9) call produced.
        paymentId: 'pay_' + Math.random().toString(36).slice(2, 11),
        status: 'processed',
        amount,
        processedAt: new Date().toISOString()
      };
    };
  `),
});

88

89

// Simulates order fulfillment. Inline code is deployed as a file named
// `index`, so the handler has to be addressed as "index.handler".
const fulfillOrderFunction = new lambda.Function(this, "FulfillOrder", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "index.handler",
  code: lambda.Code.fromInline(`
    exports.handler = async (event) => {
      const { orderId, paymentId } = event;
      return {
        orderId,
        paymentId,
        // slice(2, 11) keeps the same nine characters the deprecated
        // substr(2, 9) call produced.
        fulfillmentId: 'ful_' + Math.random().toString(36).slice(2, 11),
        status: 'fulfilled',
        // Seven days from now, expressed in milliseconds.
        estimatedDelivery: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString()
      };
    };
  `),
});

105

106

// Create Step Functions tasks

107

// Invoke the validation Lambda and store its result at $.validation.
const validateOrderTask = new sfnTasks.LambdaInvoke(
  this,
  "ValidateOrderTask",
  { lambdaFunction: validateOrderFunction, resultPath: "$.validation" },
);

// Invoke the payment Lambda and store its result at $.payment.
const processPaymentTask = new sfnTasks.LambdaInvoke(
  this,
  "ProcessPaymentTask",
  { lambdaFunction: processPaymentFunction, resultPath: "$.payment" },
);

116

117

// Invoke the fulfillment Lambda and store its result at $.fulfillment.
const fulfillOrderTask = new sfnTasks.LambdaInvoke(this, "FulfillOrderTask", {
  lambdaFunction: fulfillOrderFunction,
  // The fulfillment handler reads `orderId` and `paymentId` from the top level
  // of its event, but the payment result is stored under $.payment.Payload.
  // Build the payload explicitly so `paymentId` is actually defined.
  payload: sfn.TaskInput.fromObject({
    orderId: sfn.JsonPath.stringAt("$.orderId"),
    paymentId: sfn.JsonPath.stringAt("$.payment.Payload.paymentId"),
  }),
  resultPath: "$.fulfillment",
});

121

122

// Create failure and success states

123

// Terminal failure state, reached from both the validation and payment checks.
const orderFailure = new sfn.Fail(this, "OrderFailure", {
  error: "OrderProcessingError",
  cause: "Order validation failed or payment processing failed",
  comment: "Order processing failed",
});

// Terminal success state, reached after fulfillment.
const orderSuccess = new sfn.Succeed(
  this,
  "OrderSuccess",
  { comment: "Order processed successfully" },
);

132

133

// Define the workflow

134

// After payment: fail on a "failed" status, otherwise fulfill and succeed.
const paymentChoice = new sfn.Choice(this, "PaymentChoice")
  .when(sfn.Condition.stringEquals("$.payment.Payload.status", "failed"), orderFailure)
  .otherwise(fulfillOrderTask.next(orderSuccess));

// After validation: fail on an invalid order, otherwise move on to payment.
const validationChoice = new sfn.Choice(this, "ValidationChoice")
  .when(sfn.Condition.booleanEquals("$.validation.Payload.isValid", false), orderFailure)
  .otherwise(processPaymentTask.next(paymentChoice));

// Entry point: validate -> (pay -> (fulfill -> success | failure) | failure)
const definition = validateOrderTask.next(validationChoice);

144

145

// Create the state machine

146

// State machine running the order workflow, limited to 30 minutes.
const orderProcessingStateMachine = new sfn.StateMachine(
  this,
  "OrderProcessingStateMachine",
  {
    stateMachineName: "order-processing-workflow",
    definition,
    timeout: Duration.minutes(30),
  },
);

// Queue used as the dead letter queue of the rule target
const workflowDlq = new sqs.Queue(
  this,
  "WorkflowDeadLetterQueue",
  { queueName: "workflow-execution-failures" },
);

156

157

// Create custom execution role with additional permissions

158

// Grants permission to start executions of the order workflow, and nothing else.
const startExecutionStatement = new iam.PolicyStatement({
  actions: ["states:StartExecution"],
  resources: [orderProcessingStateMachine.stateMachineArn],
});

// Role assumed by EventBridge (events.amazonaws.com) when it invokes the target.
const executionRole = new iam.Role(this, "StepFunctionsExecutionRole", {
  assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
  inlinePolicies: {
    StepFunctionsExecution: new iam.PolicyDocument({
      statements: [startExecutionStatement],
    }),
  },
});

171

172

// Rule for order created events

173

// Matches "Order Created" events from myapp.orders for pending orders with a
// positive amount.
const orderCreatedRule = new events.Rule(this, "OrderCreatedRule", {
  eventPattern: {
    source: ["myapp.orders"],
    detailType: ["Order Created"],
    detail: {
      status: ["pending"],
      amount: [{ numeric: [">", 0] }],
    },
  },
});

183

184

// Add Step Functions target with custom input

185

// Envelope fields of the event, kept alongside the order data.
const orderEventMetadata = {
  eventId: events.EventField.fromPath("$.id"),
  eventTime: events.EventField.fromPath("$.time"),
  source: events.EventField.fromPath("$.source"),
  account: events.EventField.fromPath("$.account"),
  region: events.EventField.fromPath("$.region"),
};

// Execution input: order fields lifted out of $.detail plus the metadata above.
const orderWorkflowInput = events.RuleTargetInput.fromObject({
  orderId: events.EventField.fromPath("$.detail.orderId"),
  customerId: events.EventField.fromPath("$.detail.customerId"),
  amount: events.EventField.fromPath("$.detail.amount"),
  currency: events.EventField.fromPath("$.detail.currency"),
  items: events.EventField.fromPath("$.detail.items"),
  shippingAddress: events.EventField.fromPath("$.detail.shippingAddress"),
  eventMetadata: orderEventMetadata,
});

orderCreatedRule.addTarget(
  new targets.SfnStateMachine(orderProcessingStateMachine, {
    input: orderWorkflowInput,
    role: executionRole,
    deadLetterQueue: workflowDlq,
    retryAttempts: 2,
    maxEventAge: Duration.hours(4),
  }),
);

206

207

// Rule for scheduled workflows (daily batch processing)

208

// Scheduled rule: cron expression with hour "2" and minute "0".
const dailyBatchSchedule = events.Schedule.cron({ minute: "0", hour: "2" });

const dailyBatchRule = new events.Rule(this, "DailyBatchRule", {
  schedule: dailyBatchSchedule,
  description: "Trigger daily order reconciliation workflow",
});

215

216

// Create batch processing state machine

217

// Task invoking an inline Python Lambda that reports a batch-processing summary.
const batchProcessingTask = new sfnTasks.LambdaInvoke(this, "BatchProcessingTask", {
  lambdaFunction: new lambda.Function(this, "BatchProcessor", {
    runtime: lambda.Runtime.PYTHON_3_9,
    // Inline code is deployed as a file named `index`, so the handler has to
    // be addressed as "index.handler".
    handler: "index.handler",
    // The Python source deliberately starts at column 0 and uses explicit
    // indentation: the template literal is deployed verbatim and Python
    // indentation is significant.
    code: lambda.Code.fromInline(`
import json
import boto3
from datetime import datetime, timedelta

def handler(event, context):
    # Batch processing logic
    yesterday = datetime.now() - timedelta(days=1)
    return {
        'processedDate': yesterday.isoformat(),
        'recordsProcessed': 1250,
        'errors': [],
        'status': 'completed'
    }
`),
    timeout: Duration.minutes(15),
  }),
});

239

240

// Single-step state machine wrapping the batch processing task.
const batchStateMachine = new sfn.StateMachine(
  this,
  "BatchProcessingStateMachine",
  {
    stateMachineName: "daily-batch-processing",
    definition: batchProcessingTask,
  },
);

// Execution input: the event time plus two fixed labels.
const dailyBatchInput = events.RuleTargetInput.fromObject({
  batchDate: events.EventField.fromPath("$.time"),
  batchType: "daily-reconciliation",
  triggeredBy: "scheduled-event",
});

dailyBatchRule.addTarget(
  new targets.SfnStateMachine(batchStateMachine, { input: dailyBatchInput }),
);

252

253

// Rule for complex multi-step workflows triggered by external systems

254

// Matches partner webhook events of either request type.
const integrationRule = new events.Rule(this, "IntegrationRule", {
  eventPattern: {
    detailType: ["Data Sync Request", "Bulk Import Request"],
    source: ["partner.webhook"],
  },
});

260

261

// Create data processing workflow

262

// Validation step: Node.js Lambda loaded from lambda/data-validator.
const dataValidatorFunction = new lambda.Function(this, "DataValidator", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "validator.handler",
  code: lambda.Code.fromAsset("lambda/data-validator"),
});
const dataValidationTask = new sfnTasks.LambdaInvoke(this, "DataValidationTask", {
  lambdaFunction: dataValidatorFunction,
});

// Transform step: Python Lambda loaded from lambda/data-transformer.
const dataTransformerFunction = new lambda.Function(this, "DataTransformer", {
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: "transform.handler",
  code: lambda.Code.fromAsset("lambda/data-transformer"),
});
const dataTransformTask = new sfnTasks.LambdaInvoke(this, "DataTransformTask", {
  lambdaFunction: dataTransformerFunction,
});

// Load step: Node.js Lambda loaded from lambda/data-loader.
const dataLoaderFunction = new lambda.Function(this, "DataLoader", {
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: "loader.handler",
  code: lambda.Code.fromAsset("lambda/data-loader"),
});
const dataLoadTask = new sfnTasks.LambdaInvoke(this, "DataLoadTask", {
  lambdaFunction: dataLoaderFunction,
});

285

286

// Linear pipeline: validate, then transform, then load.
const dataPipeline = dataValidationTask
  .next(dataTransformTask)
  .next(dataLoadTask);

const dataProcessingWorkflow = new sfn.StateMachine(
  this,
  "DataProcessingWorkflow",
  {
    stateMachineName: "data-integration-workflow",
    definition: dataPipeline,
    timeout: Duration.hours(2),
  },
);

293

294

// Execution input: selected fields copied out of the event's detail section.
const integrationInput = events.RuleTargetInput.fromObject({
  requestId: events.EventField.fromPath("$.detail.requestId"),
  dataSource: events.EventField.fromPath("$.detail.source"),
  dataFormat: events.EventField.fromPath("$.detail.format"),
  s3Location: events.EventField.fromPath("$.detail.s3Location"),
  partnerInfo: events.EventField.fromPath("$.detail.partner"),
  processingOptions: events.EventField.fromPath("$.detail.options"),
});

integrationRule.addTarget(
  new targets.SfnStateMachine(dataProcessingWorkflow, { input: integrationInput }),
);

304

```

305

306

## Workflow Patterns

307

308

### Error Handling and Retry Logic

309

310

```typescript

311

// State machine with comprehensive error handling

312

// Payment task with retry and catch handlers attached:
// - retry States.TaskFailed up to 3 times, starting at 2 seconds and
//   doubling the wait each attempt;
// - route any remaining error to the failure state, recording it at $.error.
const paymentWithRecovery = processPaymentTask
  .addRetry({
    errors: ["States.TaskFailed"],
    // RetryProps takes a Duration named `interval`; there is no
    // `intervalSeconds` field.
    interval: Duration.seconds(2),
    maxAttempts: 3,
    backoffRate: 2.0,
  })
  .addCatch(orderFailure, {
    errors: ["States.ALL"],
    resultPath: "$.error",
  });

const errorHandlingDefinition = new sfn.Parallel(this, "ProcessWithErrorHandling")
  .branch(paymentWithRecovery);

const resilientStateMachine = new sfn.StateMachine(this, "ResilientStateMachine", {
  definition: errorHandlingDefinition,
});

328

```

329

330

### Long-Running Workflows

331

332

```typescript

333

// Workflow with wait states and callbacks

334

// Pause for 24 hours before checking the approval status.
const approvalDelay = sfn.WaitTime.duration(Duration.hours(24));
const waitForApproval = new sfn.Wait(this, "WaitForApproval", { time: approvalDelay });

// `approvalCheckFunction` is expected to be defined elsewhere.
const checkApprovalStatus = new sfnTasks.LambdaInvoke(
  this,
  "CheckApprovalStatus",
  { lambdaFunction: approvalCheckFunction },
);

// validate -> wait -> check status
const untilStatusKnown = validateOrderTask
  .next(waitForApproval)
  .next(checkApprovalStatus);

// approved -> pay, rejected -> fail, anything else -> wait again
const approvalChoice = new sfn.Choice(this, "ApprovalChoice")
  .when(sfn.Condition.stringEquals("$.Payload.status", "approved"), processPaymentTask)
  .when(sfn.Condition.stringEquals("$.Payload.status", "rejected"), orderFailure)
  .otherwise(waitForApproval); // Loop back to wait

const approvalWorkflow = untilStatusKnown.next(approvalChoice);

350

```

351

352

### Map State for Parallel Processing

353

354

```typescript

355

// Process multiple items in parallel

356

// Map over $.items with at most 5 concurrent iterations; results are stored
// at $.processedItems. `itemProcessorFunction` is expected to be defined
// elsewhere.
const processItemsMap = new sfn.Map(this, "ProcessItemsMap", {
  itemsPath: "$.items",
  resultPath: "$.processedItems",
  maxConcurrency: 5,
}).iterator(
  new sfnTasks.LambdaInvoke(
    this,
    "ProcessSingleItem",
    { lambdaFunction: itemProcessorFunction },
  ),
);

const batchProcessingWorkflow = new sfn.StateMachine(
  this,
  "BatchProcessingWorkflow",
  { definition: processItemsMap },
);

370

371

// Trigger for batch processing events

372

// Matches batch processing requests emitted by myapp.batch.
const batchRule = new events.Rule(this, "BatchRule", {
  eventPattern: {
    source: ["myapp.batch"],
    detailType: ["Batch Processing Request"],
  },
});

batchRule.addTarget(new targets.SfnStateMachine(batchProcessingWorkflow, {
  // Pass only the event's detail section as the execution input.
  // RuleTargetInput exposes this as `fromEventPath`; it has no `fromPath`.
  input: events.RuleTargetInput.fromEventPath("$.detail"),
}));

382

```

383

384

### Integration with External Systems

385

386

```typescript

387

// Workflow that integrates with external APIs

388

// Headers sent with the partner API call.
// NOTE(review): the Authorization value is a plain double-quoted string, so
// the ${...} placeholder is passed through literally — confirm how the token
// is meant to be resolved.
const partnerApiHeaders = sfn.TaskInput.fromObject({
  "Authorization": "Bearer ${aws:secretsmanager:api-token}",
  "Content-Type": "application/json",
});

// POST the request payload to the endpoint named in the execution input.
// NOTE(review): verify `HttpInvoke`, `HttpMethod` and the `requestBody` prop
// against the installed aws-stepfunctions-tasks version — TODO confirm.
const callExternalApiTask = new sfnTasks.HttpInvoke(this, "CallExternalAPI", {
  apiRoot: "https://api.partner.com",
  apiEndpoint: sfn.TaskInput.fromJsonPathAt("$.apiEndpoint"),
  method: sfnTasks.HttpMethod.POST,
  headers: partnerApiHeaders,
  requestBody: sfn.TaskInput.fromJsonPathAt("$.requestPayload"),
});

const externalIntegrationWorkflow = new sfn.StateMachine(
  this,
  "ExternalIntegrationWorkflow",
  { definition: callExternalApiTask },
);

402

403

// Trigger workflow for external integration events

404

// Matches API integration requests emitted by myapp.external.
const externalRule = new events.Rule(this, "ExternalRule", {
  eventPattern: {
    detailType: ["API Integration Request"],
    source: ["myapp.external"],
  },
});

// Execution input: endpoint and payload for the API call, plus request metadata.
const externalIntegrationInput = events.RuleTargetInput.fromObject({
  apiEndpoint: events.EventField.fromPath("$.detail.endpoint"),
  requestPayload: events.EventField.fromPath("$.detail.payload"),
  metadata: {
    requestId: events.EventField.fromPath("$.detail.requestId"),
    timestamp: events.EventField.fromPath("$.time"),
  },
});

externalRule.addTarget(
  new targets.SfnStateMachine(externalIntegrationWorkflow, {
    input: externalIntegrationInput,
  }),
);

421

```