or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auth-service.md compute-service.md core-framework.md flows-service.md gcs-service.md groups-service.md index.md search-service.md timers-service.md transfer-service.md

flows-service.mddocs/

0

# Flows Service

1

2

The Flows service provides workflow automation and orchestration for complex multi-step operations across Globus services, with conditional logic, error handling, and state management. It enables the creation and execution of sophisticated automation workflows that coordinate data transfers, compute jobs, and other service operations.

3

4

## Capabilities

5

6

### Flows Client

7

8

Core client for managing flow definitions, executing workflows, and monitoring flow runs with comprehensive filtering and access control.

9

10

```python { .api }

11

class FlowsClient(BaseClient):

12

"""

13

Client for Globus Flows service operations.

14

15

Provides methods for flow lifecycle management including creation, execution,

16

monitoring, and administration with comprehensive access control and

17

notification capabilities.

18

"""

19

20

def __init__(

21

self,

22

*,

23

app: GlobusApp | None = None,

24

authorizer: GlobusAuthorizer | None = None,

25

environment: str | None = None,

26

base_url: str | None = None,

27

**kwargs

28

) -> None: ...

29

```

30

31

### Flow Definition and Management

32

33

Create, update, and manage workflow definitions with JSON-based state machines and comprehensive access control policies.

34

35

```python { .api }

36

def create_flow(

37

self,

38

title: str,

39

definition: dict[str, Any],

40

input_schema: dict[str, Any],

41

*,

42

subtitle: str | None = None,

43

description: str | None = None,

44

flow_viewers: list[str] | None = None,

45

flow_starters: list[str] | None = None,

46

flow_administrators: list[str] | None = None,

47

run_managers: list[str] | None = None,

48

run_monitors: list[str] | None = None,

49

keywords: list[str] | None = None,

50

subscription_id: str | UUID | None = None,

51

additional_fields: dict[str, Any] | None = None

52

) -> GlobusHTTPResponse:

53

"""

54

Create a new flow definition.

55

56

Creates a workflow with states, transitions, and execution logic

57

defined using Amazon States Language (ASL) syntax with Globus

58

service integrations.

59

60

Parameters:

61

- title: Human-readable flow name (1-128 characters)

62

- definition: JSON state machine definition (ASL format)

63

- input_schema: JSON Schema for validating flow input

64

- subtitle: Concise flow summary (0-128 characters)

65

- description: Detailed flow description (0-4096 characters)

66

- flow_viewers: Principal URNs who can view the flow

67

- flow_starters: Principal URNs who can run the flow

68

- flow_administrators: Principal URNs who can manage the flow

69

- run_managers: Principal URNs who can manage flow runs

70

- run_monitors: Principal URNs who can monitor flow runs

71

- keywords: Searchable tags for flow discovery

72

- subscription_id: Associated Globus subscription

73

- additional_fields: Additional metadata fields

74

75

Returns:

76

GlobusHTTPResponse with flow ID and creation details

77

"""

78

79

def get_flow(

80

self,

81

flow_id: str | UUID,

82

*,

83

query_params: dict[str, Any] | None = None

84

) -> GlobusHTTPResponse:

85

"""

86

Retrieve flow definition and metadata.

87

88

Parameters:

89

- flow_id: UUID of the flow to retrieve

90

- query_params: Additional query parameters

91

92

Returns:

93

GlobusHTTPResponse with complete flow definition and metadata

94

"""

95

96

def update_flow(

97

self,

98

flow_id: str | UUID,

99

*,

100

title: str | None = None,

101

definition: dict[str, Any] | None = None,

102

input_schema: dict[str, Any] | None = None,

103

subtitle: str | None = None,

104

description: str | None = None,

105

flow_owner: str | None = None,

106

flow_viewers: list[str] | None = None,

107

flow_starters: list[str] | None = None,

108

flow_administrators: list[str] | None = None,

109

run_managers: list[str] | None = None,

110

run_monitors: list[str] | None = None,

111

keywords: list[str] | None = None,

112

subscription_id: str | UUID | None = None,

113

additional_fields: dict[str, Any] | None = None

114

) -> GlobusHTTPResponse:

115

"""

116

Update an existing flow definition.

117

118

Updates flow metadata, definition, or access policies.

119

Only specified fields will be modified.

120

121

Parameters:

122

- flow_id: UUID of flow to update

123

- title: Updated flow title

124

- definition: Updated state machine definition

125

- input_schema: Updated input validation schema

126

- Other parameters: See create_flow for descriptions

127

128

Returns:

129

GlobusHTTPResponse confirming update

130

"""

131

132

def delete_flow(

133

self,

134

flow_id: str | UUID,

135

*,

136

query_params: dict[str, Any] | None = None

137

) -> GlobusHTTPResponse:

138

"""

139

Delete a flow definition.

140

141

Permanently removes a flow. Running instances will continue

142

but no new runs can be started.

143

144

Parameters:

145

- flow_id: UUID of flow to delete

146

- query_params: Additional parameters

147

148

Returns:

149

GlobusHTTPResponse confirming deletion

150

"""

151

152

def validate_flow(

153

self,

154

definition: dict[str, Any],

155

input_schema: dict[str, Any] | MissingType = MISSING

156

) -> GlobusHTTPResponse:

157

"""

158

Validate flow definition and schema.

159

160

Checks flow definition syntax, state transitions, and

161

input schema validity without creating the flow.

162

163

Parameters:

164

- definition: Flow state machine definition to validate

165

- input_schema: Input schema to validate

166

167

Returns:

168

GlobusHTTPResponse with validation results and any errors

169

"""

170

171

def list_flows(

172

self,

173

*,

174

filter_role: str | None = None,

175

filter_roles: str | Iterable[str] | None = None,

176

filter_fulltext: str | None = None,

177

orderby: str | Iterable[str] | None = None,

178

marker: str | None = None,

179

query_params: dict[str, Any] | None = None

180

) -> IterableFlowsResponse:

181

"""

182

List accessible flows with filtering and pagination.

183

184

Parameters:

185

- filter_role: Deprecated - minimum role required for inclusion

186

- filter_roles: List of roles for filtering (flow_viewer, flow_starter, etc.)

187

- filter_fulltext: Full-text search across flow metadata

188

- orderby: Sort criteria (e.g., "updated_at DESC", "title ASC")

189

- marker: Pagination marker

190

- query_params: Additional query parameters

191

192

Returns:

193

IterableFlowsResponse with paginated flow listings

194

"""

195

```

196

197

### Flow Execution and Run Management

198

199

Start flow runs, monitor execution, and manage running workflow instances with comprehensive status tracking.

200

201

```python { .api }

202

def run_flow(

203

self,

204

body: dict[str, Any],

205

*,

206

label: str | None = None,

207

tags: list[str] | None = None,

208

activity_notification_policy: (

209

dict[str, Any] | RunActivityNotificationPolicy | None

210

) = None,

211

run_monitors: list[str] | None = None,

212

run_managers: list[str] | None = None,

213

additional_fields: dict[str, Any] | None = None

214

) -> GlobusHTTPResponse:

215

"""

216

Start execution of a flow with input data.

217

218

Creates a new run instance of the flow with the provided

219

input data, which is validated against the flow's input schema.

220

221

Parameters:

222

- body: Input data for the flow (validated against input_schema)

223

- label: Human-readable run title (1-64 characters)

224

- tags: Searchable tags for the run

225

- activity_notification_policy: Email notification configuration

226

- run_monitors: Principal URNs authorized to view this run

227

- run_managers: Principal URNs authorized to manage this run

228

- additional_fields: Additional run metadata

229

230

Returns:

231

GlobusHTTPResponse with run ID and execution details

232

"""

233

234

def get_run(

235

self,

236

run_id: str | UUID,

237

*,

238

include_flow_description: bool | None = None,

239

query_params: dict[str, Any] | None = None

240

) -> GlobusHTTPResponse:

241

"""

242

Get detailed information about a flow run.

243

244

Parameters:

245

- run_id: UUID of the run to retrieve

246

- include_flow_description: Include flow metadata in response

247

- query_params: Additional query parameters

248

249

Returns:

250

GlobusHTTPResponse with run status, results, and execution details

251

"""

252

253

def get_run_definition(

254

self,

255

run_id: str | UUID

256

) -> GlobusHTTPResponse:

257

"""

258

Get flow definition and input schema used for a specific run.

259

260

Returns the exact flow definition and input schema that were

261

active when the run was started, useful for reproducibility.

262

263

Parameters:

264

- run_id: UUID of the run

265

266

Returns:

267

GlobusHTTPResponse with flow definition and input schema

268

"""

269

270

def cancel_run(

271

self,

272

run_id: str | UUID,

273

*,

274

query_params: dict[str, Any] | None = None

275

) -> GlobusHTTPResponse:

276

"""

277

Cancel a running or pending flow execution.

278

279

Attempts to gracefully stop flow execution. Running states

280

may complete but no new states will be started.

281

282

Parameters:

283

- run_id: UUID of run to cancel

284

- query_params: Additional parameters

285

286

Returns:

287

GlobusHTTPResponse confirming cancellation request

288

"""

289

290

def release_run(

291

self,

292

run_id: str | UUID,

293

*,

294

query_params: dict[str, Any] | None = None

295

) -> GlobusHTTPResponse:

296

"""

297

Release a completed run from monitoring.

298

299

Marks run as released, reducing storage usage and removing

300

it from active monitoring. Run logs remain accessible.

301

302

Parameters:

303

- run_id: UUID of completed run to release

304

- query_params: Additional parameters

305

306

Returns:

307

GlobusHTTPResponse confirming release

308

"""

309

310

def list_runs(

311

self,

312

*,

313

filter_flow_id: str | UUID | None = None,

314

filter_role: str | None = None,

315

filter_roles: str | Iterable[str] | None = None,

316

filter_status: str | Iterable[str] | None = None,

317

filter_label: str | None = None,

318

filter_tags: str | Iterable[str] | None = None,

319

orderby: str | Iterable[str] | None = None,

320

marker: str | None = None,

321

query_params: dict[str, Any] | None = None

322

) -> IterableRunsResponse:

323

"""

324

List flow runs with comprehensive filtering options.

325

326

Parameters:

327

- filter_flow_id: Only runs of specified flow

328

- filter_role: Deprecated - minimum role required

329

- filter_roles: Required roles for access

330

- filter_status: Run statuses to include (ACTIVE, SUCCEEDED, FAILED, etc.)

331

- filter_label: Filter by run label

332

- filter_tags: Filter by run tags

333

- orderby: Sort criteria

334

- marker: Pagination marker

335

- query_params: Additional parameters

336

337

Returns:

338

IterableRunsResponse with paginated run listings

339

"""

340

341

def get_run_logs(

342

self,

343

run_id: str | UUID,

344

*,

345

limit: int | None = None,

346

reverse_order: bool | None = None,

347

marker: str | None = None,

348

query_params: dict[str, Any] | None = None

349

) -> IterableRunLogsResponse:

350

"""

351

Get execution logs for a flow run.

352

353

Returns detailed execution logs including state transitions,

354

action results, and error information for debugging.

355

356

Parameters:

357

- run_id: UUID of the run

358

- limit: Maximum log entries to return

359

- reverse_order: Return logs in reverse chronological order

360

- marker: Pagination marker

361

- query_params: Additional parameters

362

363

Returns:

364

IterableRunLogsResponse with paginated log entries

365

"""

366

```

367

368

### Specific Flow Client

369

370

Specialized client for managing individual flows with scoped permissions and streamlined operations.

371

372

```python { .api }

373

class SpecificFlowClient(FlowsClient):

374

"""

375

Client scoped to operations on a specific flow.

376

377

Provides streamlined access to flow operations without

378

repeatedly specifying the flow ID, with automatic scope

379

management for flow-specific permissions.

380

"""

381

382

def __init__(

383

self,

384

flow_id: str | UUID,

385

*,

386

app: GlobusApp | None = None,

387

authorizer: GlobusAuthorizer | None = None,

388

flow_scope: str | None = None,

389

**kwargs

390

) -> None: ...

391

392

def run_flow(

393

self,

394

body: dict[str, Any],

395

*,

396

label: str | None = None,

397

tags: list[str] | None = None,

398

**kwargs

399

) -> GlobusHTTPResponse:

400

"""Run the specific flow with input data."""

401

402

def get_flow(self, **kwargs) -> GlobusHTTPResponse:

403

"""Get the specific flow definition."""

404

405

def update_flow(self, **kwargs) -> GlobusHTTPResponse:

406

"""Update the specific flow."""

407

408

def delete_flow(self, **kwargs) -> GlobusHTTPResponse:

409

"""Delete the specific flow."""

410

```

411

412

### Response Objects and Data Classes

413

414

Specialized response classes and data containers for flow operations with enhanced iteration and notification support.

415

416

```python { .api }

417

class IterableFlowsResponse(IterableResponse):

418

"""Response class for flow listings with pagination support."""

419

420

def __iter__(self) -> Iterator[dict[str, Any]]:

421

"""Iterate over flow definitions."""

422

423

class IterableRunsResponse(IterableResponse):

424

"""Response class for flow run listings with pagination support."""

425

426

def __iter__(self) -> Iterator[dict[str, Any]]:

427

"""Iterate over run records."""

428

429

class IterableRunLogsResponse(IterableResponse):

430

"""Response class for run log entries with pagination support."""

431

432

def __iter__(self) -> Iterator[dict[str, Any]]:

433

"""Iterate over log entries."""

434

435

class RunActivityNotificationPolicy(PayloadWrapper):

436

"""

437

Notification policy configuration for flow runs.

438

439

Defines when email notifications will be sent based on

440

run status changes and execution events.

441

"""

442

443

def __init__(

444

self,

445

status: (

446

list[Literal["INACTIVE", "SUCCEEDED", "FAILED"]] | MissingType

447

) = MISSING

448

) -> None: ...

449

```

450

451

### Error Handling

452

453

Flows-specific error handling for workflow execution and management operations.

454

455

```python { .api }

456

class FlowsAPIError(GlobusAPIError):

457

"""

458

Error class for Flows service API errors.

459

460

Provides enhanced error handling for flow-specific error

461

conditions including validation failures and execution errors.

462

"""

463

```

464

465

## Common Usage Patterns

466

467

### Basic Flow Creation and Execution

468

469

```python

470

from globus_sdk import FlowsClient

471

472

# Initialize client

473

flows_client = FlowsClient(authorizer=authorizer)

474

475

# Define a simple transfer flow

476

flow_definition = {

477

"Comment": "Simple transfer workflow",

478

"StartAt": "TransferData",

479

"States": {

480

"TransferData": {

481

"Comment": "Transfer files between endpoints",

482

"Type": "Action",

483

"ActionUrl": "https://actions.automate.globus.org/transfer/transfer",

484

"Parameters": {

485

"source_endpoint_id": "$.source_endpoint",

486

"destination_endpoint_id": "$.dest_endpoint",

487

"transfer_items": [

488

{

489

"source_path": "$.source_path",

490

"destination_path": "$.dest_path",

491

"recursive": "$.recursive"

492

}

493

]

494

},

495

"ResultPath": "$.TransferResult",

496

"End": True

497

}

498

}

499

}

500

501

# Input schema for validation

502

input_schema = {

503

"type": "object",

504

"properties": {

505

"source_endpoint": {"type": "string"},

506

"dest_endpoint": {"type": "string"},

507

"source_path": {"type": "string"},

508

"dest_path": {"type": "string"},

509

"recursive": {"type": "boolean", "default": False}

510

},

511

"required": ["source_endpoint", "dest_endpoint", "source_path", "dest_path"]

512

}

513

514

# Create flow

515

create_response = flows_client.create_flow(

516

title="Simple Transfer Flow",

517

definition=flow_definition,

518

input_schema=input_schema,

519

description="Basic file transfer automation",

520

flow_starters=["all_authenticated_users"]

521

)

522

flow_id = create_response["id"]

523

524

# Run the flow

525

run_input = {

526

"source_endpoint": "ddb59aef-6d04-11e5-ba46-22000b92c6ec",

527

"dest_endpoint": "ddb59af0-6d04-11e5-ba46-22000b92c6ec",

528

"source_path": "/share/godata/file1.txt",

529

"dest_path": "/~/file1.txt",

530

"recursive": False

531

}

532

533

run_response = flows_client.run_flow(

534

flow_id,

535

body=run_input,

536

label="Transfer file1.txt"

537

)

538

run_id = run_response["action_id"]

539

540

print(f"Flow run started: {run_id}")

541

```

542

543

### Complex Multi-Step Workflow

544

545

```python

546

# Define a complex workflow with conditional logic

547

complex_flow = {

548

"Comment": "Data processing pipeline",

549

"StartAt": "ValidateInput",

550

"States": {

551

"ValidateInput": {

552

"Type": "Action",

553

"ActionUrl": "https://actions.automate.globus.org/transfer/ls",

554

"Parameters": {

555

"endpoint_id": "$.source_endpoint",

556

"path": "$.input_path"

557

},

558

"ResultPath": "$.ValidationResult",

559

"Next": "CheckFileExists"

560

},

561

"CheckFileExists": {

562

"Type": "Choice",

563

"Choices": [

564

{

565

"Variable": "$.ValidationResult.code",

566

"StringEquals": "success",

567

"Next": "ProcessData"

568

}

569

],

570

"Default": "NotifyFailure"

571

},

572

"ProcessData": {

573

"Type": "Action",

574

"ActionUrl": "https://compute.actions.globus.org/fxap",

575

"Parameters": {

576

"endpoint": "$.compute_endpoint",

577

"function": "$.processing_function",

578

"payload": {

579

"input_file": "$.input_path",

580

"output_file": "$.output_path"

581

}

582

},

583

"ResultPath": "$.ProcessResult",

584

"Next": "TransferResults"

585

},

586

"TransferResults": {

587

"Type": "Action",

588

"ActionUrl": "https://actions.automate.globus.org/transfer/transfer",

589

"Parameters": {

590

"source_endpoint_id": "$.compute_endpoint",

591

"destination_endpoint_id": "$.output_endpoint",

592

"transfer_items": [{

593

"source_path": "$.output_path",

594

"destination_path": "$.final_path"

595

}]

596

},

597

"End": True

598

},

599

"NotifyFailure": {

600

"Type": "Action",

601

"ActionUrl": "https://actions.automate.globus.org/notification/notify",

602

"Parameters": {

603

"message": "Input validation failed",

604

"recipients": ["$.notification_email"]

605

},

606

"End": True

607

}

608

}

609

}

610

611

# Build a notification policy for runs (pass it to run_flow via activity_notification_policy), then create the flow

612

notification_policy = RunActivityNotificationPolicy(

613

status=["SUCCEEDED", "FAILED"]

614

)

615

616

create_response = flows_client.create_flow(

617

title="Data Processing Pipeline",

618

definition=complex_flow,

619

input_schema={

620

"type": "object",

621

"properties": {

622

"source_endpoint": {"type": "string"},

623

"compute_endpoint": {"type": "string"},

624

"output_endpoint": {"type": "string"},

625

"input_path": {"type": "string"},

626

"output_path": {"type": "string"},

627

"final_path": {"type": "string"},

628

"processing_function": {"type": "string"},

629

"notification_email": {"type": "string"}

630

},

631

"required": ["source_endpoint", "input_path", "processing_function"]

632

}

633

)

634

```

635

636

### Flow Run Monitoring and Management

637

638

```python

639

# Monitor flow execution

640

run_id = "run-uuid-here"

641

642

while True:

643

run_info = flows_client.get_run(run_id, include_flow_description=True)

644

status = run_info["status"]

645

646

print(f"Run status: {status}")

647

648

if status in ["SUCCEEDED", "FAILED", "INACTIVE"]:

649

break

650

651

time.sleep(10)

652

653

# Get detailed logs

654

logs = flows_client.get_run_logs(run_id, limit=100)

655

for log_entry in logs:

656

print(f"{log_entry['time']}: {log_entry['details']}")

657

658

# List all runs for a flow

659

runs = flows_client.list_runs(

660

filter_flow_id=flow_id,

661

filter_status=["ACTIVE", "SUCCEEDED"],

662

orderby="start_time DESC"

663

)

664

665

for run in runs:

666

print(f"Run {run['action_id']}: {run['status']} - {run['label']}")

667

```

668

669

### Using Specific Flow Client

670

671

```python

672

from globus_sdk import SpecificFlowClient

673

674

# Create flow-specific client

675

specific_client = SpecificFlowClient(

676

flow_id="flow-uuid-here",

677

app=app,

678

flow_scope="https://auth.globus.org/scopes/flow-uuid-here/flow_run"

679

)

680

681

# Simplified operations without specifying flow_id

682

run_response = specific_client.run_flow(

683

body=run_input,

684

label="Automated run"

685

)

686

687

flow_info = specific_client.get_flow()

688

print(f"Flow title: {flow_info['title']}")

689

690

# Update flow metadata (description and keywords)

691

specific_client.update_flow(

692

description="Updated description",

693

keywords=["updated", "automated"]

694

)

695

```