0
# Flows Service
1
2
The Flows service provides workflow automation and orchestration for complex multi-step operations across Globus services, with conditional logic, error handling, and state management. It enables the creation and execution of sophisticated automation workflows that coordinate data transfers, compute jobs, and other service operations.
3
4
## Capabilities
5
6
### Flows Client
7
8
Core client for managing flow definitions, executing workflows, and monitoring flow runs with comprehensive filtering and access control.
9
10
```python { .api }
11
class FlowsClient(BaseClient):
12
"""
13
Client for Globus Flows service operations.
14
15
Provides methods for flow lifecycle management including creation, execution,
16
monitoring, and administration with comprehensive access control and
17
notification capabilities.
18
"""
19
20
def __init__(
21
self,
22
*,
23
app: GlobusApp | None = None,
24
authorizer: GlobusAuthorizer | None = None,
25
environment: str | None = None,
26
base_url: str | None = None,
27
**kwargs
28
) -> None: ...
29
```
30
31
### Flow Definition and Management
32
33
Create, update, and manage workflow definitions with JSON-based state machines and comprehensive access control policies.
34
35
```python { .api }
36
def create_flow(
37
self,
38
title: str,
39
definition: dict[str, Any],
40
input_schema: dict[str, Any],
41
*,
42
subtitle: str | None = None,
43
description: str | None = None,
44
flow_viewers: list[str] | None = None,
45
flow_starters: list[str] | None = None,
46
flow_administrators: list[str] | None = None,
47
run_managers: list[str] | None = None,
48
run_monitors: list[str] | None = None,
49
keywords: list[str] | None = None,
50
subscription_id: str | UUID | None = None,
51
additional_fields: dict[str, Any] | None = None
52
) -> GlobusHTTPResponse:
53
"""
54
Create a new flow definition.
55
56
Creates a workflow with states, transitions, and execution logic
57
defined using Amazon States Language (ASL) syntax with Globus
58
service integrations.
59
60
Parameters:
61
- title: Human-readable flow name (1-128 characters)
62
- definition: JSON state machine definition (ASL format)
63
- input_schema: JSON Schema for validating flow input
64
- subtitle: Concise flow summary (0-128 characters)
65
- description: Detailed flow description (0-4096 characters)
66
- flow_viewers: Principal URNs who can view the flow
67
- flow_starters: Principal URNs who can run the flow
68
- flow_administrators: Principal URNs who can manage the flow
69
- run_managers: Principal URNs who can manage flow runs
70
- run_monitors: Principal URNs who can monitor flow runs
71
- keywords: Searchable tags for flow discovery
72
- subscription_id: Associated Globus subscription
73
- additional_fields: Additional metadata fields
74
75
Returns:
76
GlobusHTTPResponse with flow ID and creation details
77
"""
78
79
def get_flow(
80
self,
81
flow_id: str | UUID,
82
*,
83
query_params: dict[str, Any] | None = None
84
) -> GlobusHTTPResponse:
85
"""
86
Retrieve flow definition and metadata.
87
88
Parameters:
89
- flow_id: UUID of the flow to retrieve
90
- query_params: Additional query parameters
91
92
Returns:
93
GlobusHTTPResponse with complete flow definition and metadata
94
"""
95
96
def update_flow(
97
self,
98
flow_id: str | UUID,
99
*,
100
title: str | None = None,
101
definition: dict[str, Any] | None = None,
102
input_schema: dict[str, Any] | None = None,
103
subtitle: str | None = None,
104
description: str | None = None,
105
flow_owner: str | None = None,
106
flow_viewers: list[str] | None = None,
107
flow_starters: list[str] | None = None,
108
flow_administrators: list[str] | None = None,
109
run_managers: list[str] | None = None,
110
run_monitors: list[str] | None = None,
111
keywords: list[str] | None = None,
112
subscription_id: str | UUID | None = None,
113
additional_fields: dict[str, Any] | None = None
114
) -> GlobusHTTPResponse:
115
"""
116
Update an existing flow definition.
117
118
Updates flow metadata, definition, or access policies.
119
Only specified fields will be modified.
120
121
Parameters:
122
- flow_id: UUID of flow to update
123
- title: Updated flow title
124
- definition: Updated state machine definition
125
- input_schema: Updated input validation schema
126
- flow_owner: Principal URN to set as the flow's owner (update_flow only)
- Other parameters: See create_flow for descriptions
127
128
Returns:
129
GlobusHTTPResponse confirming update
130
"""
131
132
def delete_flow(
133
self,
134
flow_id: str | UUID,
135
*,
136
query_params: dict[str, Any] | None = None
137
) -> GlobusHTTPResponse:
138
"""
139
Delete a flow definition.
140
141
Permanently removes a flow. Running instances will continue
142
but no new runs can be started.
143
144
Parameters:
145
- flow_id: UUID of flow to delete
146
- query_params: Additional parameters
147
148
Returns:
149
GlobusHTTPResponse confirming deletion
150
"""
151
152
def validate_flow(
153
self,
154
definition: dict[str, Any],
155
input_schema: dict[str, Any] | MissingType = MISSING
156
) -> GlobusHTTPResponse:
157
"""
158
Validate flow definition and schema.
159
160
Checks flow definition syntax, state transitions, and
161
input schema validity without creating the flow.
162
163
Parameters:
164
- definition: Flow state machine definition to validate
165
- input_schema: Input schema to validate
166
167
Returns:
168
GlobusHTTPResponse with validation results and any errors
169
"""
170
171
def list_flows(
172
self,
173
*,
174
filter_role: str | None = None,
175
filter_roles: str | Iterable[str] | None = None,
176
filter_fulltext: str | None = None,
177
orderby: str | Iterable[str] | None = None,
178
marker: str | None = None,
179
query_params: dict[str, Any] | None = None
180
) -> IterableFlowsResponse:
181
"""
182
List accessible flows with filtering and pagination.
183
184
Parameters:
185
- filter_role: Deprecated - minimum role required for inclusion
186
- filter_roles: List of roles for filtering (flow_viewer, flow_starter, etc.)
187
- filter_fulltext: Full-text search across flow metadata
188
- orderby: Sort criteria (e.g., "updated_at DESC", "title ASC")
189
- marker: Pagination marker
190
- query_params: Additional query parameters
191
192
Returns:
193
IterableFlowsResponse with paginated flow listings
194
"""
195
```
196
197
### Flow Execution and Run Management
198
199
Start flow runs, monitor execution, and manage running workflow instances with comprehensive status tracking.
200
201
```python { .api }
202
def run_flow(
203
self,
204
body: dict[str, Any],
205
*,
206
label: str | None = None,
207
tags: list[str] | None = None,
208
activity_notification_policy: (
209
dict[str, Any] | RunActivityNotificationPolicy | None
210
) = None,
211
run_monitors: list[str] | None = None,
212
run_managers: list[str] | None = None,
213
additional_fields: dict[str, Any] | None = None
214
) -> GlobusHTTPResponse:
215
"""
216
Start execution of a flow with input data.
217
218
Creates a new run instance of the flow with the provided
219
input data, which is validated against the flow's input schema.
220
221
Parameters:
222
- body: Input data for the flow (validated against input_schema)
223
- label: Human-readable run title (1-64 characters)
224
- tags: Searchable tags for the run
225
- activity_notification_policy: Email notification configuration
226
- run_monitors: Principal URNs authorized to view this run
227
- run_managers: Principal URNs authorized to manage this run
228
- additional_fields: Additional run metadata
229
230
Returns:
231
GlobusHTTPResponse with run ID and execution details
232
"""
233
234
def get_run(
235
self,
236
run_id: str | UUID,
237
*,
238
include_flow_description: bool | None = None,
239
query_params: dict[str, Any] | None = None
240
) -> GlobusHTTPResponse:
241
"""
242
Get detailed information about a flow run.
243
244
Parameters:
245
- run_id: UUID of the run to retrieve
246
- include_flow_description: Include flow metadata in response
247
- query_params: Additional query parameters
248
249
Returns:
250
GlobusHTTPResponse with run status, results, and execution details
251
"""
252
253
def get_run_definition(
254
self,
255
run_id: str | UUID
256
) -> GlobusHTTPResponse:
257
"""
258
Get flow definition and input schema used for a specific run.
259
260
Returns the exact flow definition and input schema that were
261
active when the run was started, useful for reproducibility.
262
263
Parameters:
264
- run_id: UUID of the run
265
266
Returns:
267
GlobusHTTPResponse with flow definition and input schema
268
"""
269
270
def cancel_run(
271
self,
272
run_id: str | UUID,
273
*,
274
query_params: dict[str, Any] | None = None
275
) -> GlobusHTTPResponse:
276
"""
277
Cancel a running or pending flow execution.
278
279
Attempts to gracefully stop flow execution. Running states
280
may complete but no new states will be started.
281
282
Parameters:
283
- run_id: UUID of run to cancel
284
- query_params: Additional parameters
285
286
Returns:
287
GlobusHTTPResponse confirming cancellation request
288
"""
289
290
def release_run(
291
self,
292
run_id: str | UUID,
293
*,
294
query_params: dict[str, Any] | None = None
295
) -> GlobusHTTPResponse:
296
"""
297
Release a completed run from monitoring.
298
299
Marks run as released, reducing storage usage and removing
300
it from active monitoring. Run logs remain accessible.
301
302
Parameters:
303
- run_id: UUID of completed run to release
304
- query_params: Additional parameters
305
306
Returns:
307
GlobusHTTPResponse confirming release
308
"""
309
310
def list_runs(
311
self,
312
*,
313
filter_flow_id: str | UUID | None = None,
314
filter_role: str | None = None,
315
filter_roles: str | Iterable[str] | None = None,
316
filter_status: str | Iterable[str] | None = None,
317
filter_label: str | None = None,
318
filter_tags: str | Iterable[str] | None = None,
319
orderby: str | Iterable[str] | None = None,
320
marker: str | None = None,
321
query_params: dict[str, Any] | None = None
322
) -> IterableRunsResponse:
323
"""
324
List flow runs with comprehensive filtering options.
325
326
Parameters:
327
- filter_flow_id: Only runs of specified flow
328
- filter_role: Deprecated - minimum role required
329
- filter_roles: Required roles for access
330
- filter_status: Run statuses to include (ACTIVE, SUCCEEDED, FAILED, etc.)
331
- filter_label: Filter by run label
332
- filter_tags: Filter by run tags
333
- orderby: Sort criteria
334
- marker: Pagination marker
335
- query_params: Additional parameters
336
337
Returns:
338
IterableRunsResponse with paginated run listings
339
"""
340
341
def get_run_logs(
342
self,
343
run_id: str | UUID,
344
*,
345
limit: int | None = None,
346
reverse_order: bool | None = None,
347
marker: str | None = None,
348
query_params: dict[str, Any] | None = None
349
) -> IterableRunLogsResponse:
350
"""
351
Get execution logs for a flow run.
352
353
Returns detailed execution logs including state transitions,
354
action results, and error information for debugging.
355
356
Parameters:
357
- run_id: UUID of the run
358
- limit: Maximum log entries to return
359
- reverse_order: Return logs in reverse chronological order
360
- marker: Pagination marker
361
- query_params: Additional parameters
362
363
Returns:
364
IterableRunLogsResponse with paginated log entries
365
"""
366
```
367
368
### Specific Flow Client
369
370
Specialized client for managing individual flows with scoped permissions and streamlined operations.
371
372
```python { .api }
373
class SpecificFlowClient(FlowsClient):
374
"""
375
Client scoped to operations on a specific flow.
376
377
Provides streamlined access to flow operations without
378
repeatedly specifying the flow ID, with automatic scope
379
management for flow-specific permissions.
380
"""
381
382
def __init__(
383
self,
384
flow_id: str | UUID,
385
*,
386
app: GlobusApp | None = None,
387
authorizer: GlobusAuthorizer | None = None,
388
flow_scope: str | None = None,
389
**kwargs
390
) -> None: ...
391
392
def run_flow(
393
self,
394
body: dict[str, Any],
395
*,
396
label: str | None = None,
397
tags: list[str] | None = None,
398
**kwargs
399
) -> GlobusHTTPResponse:
400
"""Run the specific flow with input data."""
401
402
def get_flow(self, **kwargs) -> GlobusHTTPResponse:
403
"""Get the specific flow definition."""
404
405
def update_flow(self, **kwargs) -> GlobusHTTPResponse:
406
"""Update the specific flow."""
407
408
def delete_flow(self, **kwargs) -> GlobusHTTPResponse:
409
"""Delete the specific flow."""
410
```
411
412
### Response Objects and Data Classes
413
414
Specialized response classes and data containers for flow operations with enhanced iteration and notification support.
415
416
```python { .api }
417
class IterableFlowsResponse(IterableResponse):
418
"""Response class for flow listings with pagination support."""
419
420
def __iter__(self) -> Iterator[dict[str, Any]]:
421
"""Iterate over flow definitions."""
422
423
class IterableRunsResponse(IterableResponse):
424
"""Response class for flow run listings with pagination support."""
425
426
def __iter__(self) -> Iterator[dict[str, Any]]:
427
"""Iterate over run records."""
428
429
class IterableRunLogsResponse(IterableResponse):
430
"""Response class for run log entries with pagination support."""
431
432
def __iter__(self) -> Iterator[dict[str, Any]]:
433
"""Iterate over log entries."""
434
435
class RunActivityNotificationPolicy(PayloadWrapper):
436
"""
437
Notification policy configuration for flow runs.
438
439
Defines when email notifications will be sent based on
440
run status changes and execution events.
441
"""
442
443
def __init__(
444
self,
445
status: (
446
list[Literal["INACTIVE", "SUCCEEDED", "FAILED"]] | MissingType
447
) = MISSING
448
) -> None: ...
449
```
450
451
### Error Handling
452
453
Flows-specific error handling for workflow execution and management operations.
454
455
```python { .api }
456
class FlowsAPIError(GlobusAPIError):
457
"""
458
Error class for Flows service API errors.
459
460
Provides enhanced error handling for flow-specific error
461
conditions including validation failures and execution errors.
462
"""
463
```
464
465
## Common Usage Patterns
466
467
### Basic Flow Creation and Execution
468
469
```python
470
from globus_sdk import FlowsClient
471
472
# Initialize client
473
flows_client = FlowsClient(authorizer=authorizer)
474
475
# Define a simple transfer flow
476
flow_definition = {
477
"Comment": "Simple transfer workflow",
478
"StartAt": "TransferData",
479
"States": {
480
"TransferData": {
481
"Comment": "Transfer files between endpoints",
482
"Type": "Action",
483
"ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
484
"Parameters": {
"source_endpoint_id.$": "$.source_endpoint",
"destination_endpoint_id.$": "$.dest_endpoint",
"transfer_items": [
{
"source_path.$": "$.source_path",
"destination_path.$": "$.dest_path",
"recursive.$": "$.recursive"
492
}
493
]
494
},
495
"ResultPath": "$.TransferResult",
496
"End": True
497
}
498
}
499
}
500
501
# Input schema for validation
502
input_schema = {
503
"type": "object",
504
"properties": {
505
"source_endpoint": {"type": "string"},
506
"dest_endpoint": {"type": "string"},
507
"source_path": {"type": "string"},
508
"dest_path": {"type": "string"},
509
"recursive": {"type": "boolean", "default": False}
510
},
511
"required": ["source_endpoint", "dest_endpoint", "source_path", "dest_path"]
512
}
513
514
# Create flow
515
create_response = flows_client.create_flow(
516
title="Simple Transfer Flow",
517
definition=flow_definition,
518
input_schema=input_schema,
519
description="Basic file transfer automation",
520
flow_starters=["all_authenticated_users"]
521
)
522
flow_id = create_response["id"]
523
524
# Run the flow
525
run_input = {
526
"source_endpoint": "ddb59aef-6d04-11e5-ba46-22000b92c6ec",
527
"dest_endpoint": "ddb59af0-6d04-11e5-ba46-22000b92c6ec",
528
"source_path": "/share/godata/file1.txt",
529
"dest_path": "/~/file1.txt",
530
"recursive": False
531
}
532
533
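# run_flow takes no flow ID argument: it is called on a SpecificFlowClient
# bound to the flow (its credentials must be valid for that flow's own scope)
from globus_sdk import SpecificFlowClient

specific_client = SpecificFlowClient(flow_id, authorizer=authorizer)
run_response = specific_client.run_flow(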
535
body=run_input,
536
label="Transfer file1.txt"
537
)
538
run_id = run_response["action_id"]
539
540
print(f"Flow run started: {run_id}")
541
```
542
543
### Complex Multi-Step Workflow
544
545
```python
546
# Define a complex workflow with conditional logic
547
complex_flow = {
548
"Comment": "Data processing pipeline",
549
"StartAt": "ValidateInput",
550
"States": {
551
"ValidateInput": {
552
"Type": "Action",
553
"ActionUrl": "https://actions.automate.globus.org/transfer/ls",
554
"Parameters": {
"endpoint_id.$": "$.source_endpoint",
"path.$": "$.input_path"
557
},
558
"ResultPath": "$.ValidationResult",
559
"Next": "CheckFileExists"
560
},
561
"CheckFileExists": {
562
"Type": "Choice",
563
"Choices": [
564
{
565
"Variable": "$.ValidationResult.code",
566
"StringEquals": "success",
567
"Next": "ProcessData"
568
}
569
],
570
"Default": "NotifyFailure"
571
},
572
"ProcessData": {
573
"Type": "Action",
574
"ActionUrl": "https://compute.actions.globus.org/fxap",
575
"Parameters": {
"endpoint.$": "$.compute_endpoint",
"function.$": "$.processing_function",
"payload": {
"input_file.$": "$.input_path",
"output_file.$": "$.output_path"
581
}
582
},
583
"ResultPath": "$.ProcessResult",
584
"Next": "TransferResults"
585
},
586
"TransferResults": {
587
"Type": "Action",
588
"ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
589
"Parameters": {
"source_endpoint_id.$": "$.compute_endpoint",
"destination_endpoint_id.$": "$.output_endpoint",
"transfer_items": [{
"source_path.$": "$.output_path",
"destination_path.$": "$.final_path"
595
}]
596
},
597
"End": True
598
},
599
"NotifyFailure": {
600
"Type": "Action",
601
"ActionUrl": "https://actions.automate.globus.org/notification/notify",
602
"Parameters": {
603
"message": "Input validation failed",
604
"recipients": ["$.notification_email"]
605
},
606
"End": True
607
}
608
}
609
}
610
611
# Notification policy for runs: pass it as activity_notification_policy to run_flow (create_flow has no such parameter)
612
notification_policy = RunActivityNotificationPolicy(
613
status=["SUCCEEDED", "FAILED"]
614
)
615
616
create_response = flows_client.create_flow(
617
title="Data Processing Pipeline",
618
definition=complex_flow,
619
input_schema={
620
"type": "object",
621
"properties": {
622
"source_endpoint": {"type": "string"},
623
"compute_endpoint": {"type": "string"},
624
"output_endpoint": {"type": "string"},
625
"input_path": {"type": "string"},
626
"output_path": {"type": "string"},
627
"final_path": {"type": "string"},
628
"processing_function": {"type": "string"},
629
"notification_email": {"type": "string"}
630
},
631
"required": ["source_endpoint", "input_path", "processing_function"]
632
}
633
)
634
```
635
636
### Flow Run Monitoring and Management
637
638
```python
639
# Monitor flow execution
640
run_id = "run-uuid-here"
641
642
while True:
643
run_info = flows_client.get_run(run_id, include_flow_description=True)
644
status = run_info["status"]
645
646
print(f"Run status: {status}")
647
648
if status in ["SUCCEEDED", "FAILED", "INACTIVE"]:
649
break
650
651
time.sleep(10)
652
653
# Get detailed logs
654
logs = flows_client.get_run_logs(run_id, limit=100)
655
for log_entry in logs:
656
print(f"{log_entry['time']}: {log_entry['details']}")
657
658
# List all runs for a flow
659
runs = flows_client.list_runs(
660
filter_flow_id=flow_id,
661
filter_status=["ACTIVE", "SUCCEEDED"],
662
orderby="start_time DESC"
663
)
664
665
for run in runs:
666
print(f"Run {run['action_id']}: {run['status']} - {run['label']}")
667
```
668
669
### Using Specific Flow Client
670
671
```python
672
from globus_sdk import SpecificFlowClient
673
674
# Create flow-specific client
675
specific_client = SpecificFlowClient(
676
flow_id="flow-uuid-here",
677
app=app,
678
flow_scope="https://auth.globus.org/scopes/flow-uuid-here/flow_run"
679
)
680
681
# Simplified operations without specifying flow_id
682
run_response = specific_client.run_flow(
683
body=run_input,
684
label="Automated run"
685
)
686
687
flow_info = specific_client.get_flow()
688
print(f"Flow title: {flow_info['title']}")
689
690
# Update flow definition
691
specific_client.update_flow(
692
description="Updated description",
693
keywords=["updated", "automated"]
694
)
695
```