Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Workflow automation and orchestration for complex multi-step operations across Globus services with conditional logic, error handling, and state management. The Flows service enables creation and execution of sophisticated automation workflows that can coordinate data transfers, compute jobs, and other service operations.
Core client for managing flow definitions, executing workflows, and monitoring flow runs with comprehensive filtering and access control.
class FlowsClient(BaseClient):
    """
    Client for Globus Flows service operations.

    Provides methods for the flow lifecycle (create, get, update, delete,
    validate, list) and for flow runs (start, get, inspect the definition a
    run used, cancel, release, list, read logs), with access-control and
    notification options on the relevant calls.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs: Any,
    ) -> None: ...

    # ------------------------------------------------------------------
    # Flow management: create, update, and manage workflow definitions
    # with JSON-based state machines and comprehensive access control
    # policies.
    # ------------------------------------------------------------------

    def create_flow(
        self,
        title: str,
        definition: dict[str, Any],
        input_schema: dict[str, Any],
        *,
        subtitle: str | None = None,
        description: str | None = None,
        flow_viewers: list[str] | None = None,
        flow_starters: list[str] | None = None,
        flow_administrators: list[str] | None = None,
        run_managers: list[str] | None = None,
        run_monitors: list[str] | None = None,
        keywords: list[str] | None = None,
        subscription_id: str | UUID | None = None,
        additional_fields: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Create a new flow definition.

        The flow's states, transitions, and execution logic are described
        with Amazon States Language (ASL) syntax extended with Globus service
        integrations.

        Parameters:
            title: Human-readable flow name (1-128 characters).
            definition: JSON state machine definition (ASL format).
            input_schema: JSON Schema used to validate run input.
            subtitle: Concise flow summary (0-128 characters).
            description: Detailed flow description (0-4096 characters).
            flow_viewers: Principal URNs who can view the flow.
            flow_starters: Principal URNs who can run the flow.
            flow_administrators: Principal URNs who can manage the flow.
            run_managers: Principal URNs who can manage the flow's runs.
            run_monitors: Principal URNs who can monitor the flow's runs.
            keywords: Searchable tags for flow discovery.
            subscription_id: Associated Globus subscription.
            additional_fields: Extra fields to include in the request.

        Returns:
            GlobusHTTPResponse with the new flow's ID and creation details.
        """

    def get_flow(
        self,
        flow_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Retrieve a flow's definition and metadata.

        Parameters:
            flow_id: UUID of the flow to retrieve.
            query_params: Additional query parameters.

        Returns:
            GlobusHTTPResponse with the flow definition and metadata.
        """

    def update_flow(
        self,
        flow_id: str | UUID,
        *,
        title: str | None = None,
        definition: dict[str, Any] | None = None,
        input_schema: dict[str, Any] | None = None,
        subtitle: str | None = None,
        description: str | None = None,
        flow_owner: str | None = None,
        flow_viewers: list[str] | None = None,
        flow_starters: list[str] | None = None,
        flow_administrators: list[str] | None = None,
        run_managers: list[str] | None = None,
        run_monitors: list[str] | None = None,
        keywords: list[str] | None = None,
        subscription_id: str | UUID | None = None,
        additional_fields: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Update an existing flow.

        Only the fields that are specified are modified. Metadata, the
        definition, and access policies can all be updated.

        Parameters:
            flow_id: UUID of the flow to update.
            title: Updated flow title.
            definition: Updated state machine definition.
            input_schema: Updated input validation schema.
            flow_owner: New owner principal for the flow.
            Other parameters: same meaning as in ``create_flow``.

        Returns:
            GlobusHTTPResponse confirming the update.
        """

    def delete_flow(
        self,
        flow_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Delete a flow definition.

        NOTE(review): the effect on runs that are already in progress is
        not established here; confirm against the Flows API documentation.

        Parameters:
            flow_id: UUID of the flow to delete.
            query_params: Additional query parameters.

        Returns:
            GlobusHTTPResponse confirming deletion.
        """

    def validate_flow(
        self,
        definition: dict[str, Any],
        input_schema: dict[str, Any] | MissingType = MISSING,
    ) -> GlobusHTTPResponse:
        """
        Validate a flow definition (and optionally an input schema).

        Checks definition syntax, state transitions, and schema validity
        without creating a flow.

        Parameters:
            definition: Flow state machine definition to validate.
            input_schema: Input schema to validate. When left as ``MISSING``,
                presumably no schema is sent; confirm.

        Returns:
            GlobusHTTPResponse with validation results and any errors.
        """

    def list_flows(
        self,
        *,
        filter_role: str | None = None,
        filter_roles: str | Iterable[str] | None = None,
        filter_fulltext: str | None = None,
        orderby: str | Iterable[str] | None = None,
        marker: str | None = None,
        query_params: dict[str, Any] | None = None,
    ) -> IterableFlowsResponse:
        """
        List accessible flows, with filtering and pagination.

        Parameters:
            filter_role: Deprecated; the minimum role required for inclusion.
            filter_roles: Roles to filter by (flow_viewer, flow_starter, ...).
            filter_fulltext: Full-text search across flow metadata.
            orderby: Sort criteria, e.g. ``"updated_at DESC"`` or
                ``"title ASC"``.
            marker: Pagination marker.
            query_params: Additional query parameters.

        Returns:
            IterableFlowsResponse containing one page of flow listings.
        """

    # ------------------------------------------------------------------
    # Run management: start flow runs, monitor execution, and manage
    # running workflow instances with comprehensive status tracking.
    # ------------------------------------------------------------------

    def run_flow(
        self,
        body: dict[str, Any],
        *,
        label: str | None = None,
        tags: list[str] | None = None,
        activity_notification_policy: (
            dict[str, Any] | RunActivityNotificationPolicy | None
        ) = None,
        run_monitors: list[str] | None = None,
        run_managers: list[str] | None = None,
        additional_fields: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Start a run of a flow with the given input.

        The input is validated against the flow's input schema.

        NOTE(review): this signature has no flow ID parameter, so it cannot
        identify which flow to run. ``SpecificFlowClient`` is bound to a
        ``flow_id`` and is the client to use for starting runs. Confirm
        whether this method belongs on ``FlowsClient`` at all.

        Parameters:
            body: Input data for the run (validated against input_schema).
            label: Human-readable run title (1-64 characters).
            tags: Searchable tags for the run.
            activity_notification_policy: Email notification configuration.
            run_monitors: Principal URNs authorized to view this run.
            run_managers: Principal URNs authorized to manage this run.
            additional_fields: Extra fields to include in the request.

        Returns:
            GlobusHTTPResponse with the run ID and execution details.
        """

    def get_run(
        self,
        run_id: str | UUID,
        *,
        include_flow_description: bool | None = None,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Get detailed information about a flow run.

        Parameters:
            run_id: UUID of the run to retrieve.
            include_flow_description: Include flow metadata in the response.
            query_params: Additional query parameters.

        Returns:
            GlobusHTTPResponse with run status, results, and execution details.
        """

    def get_run_definition(
        self,
        run_id: str | UUID,
    ) -> GlobusHTTPResponse:
        """
        Get the flow definition and input schema used by a specific run.

        Returns the definition and schema that were in effect when the run
        started, which is useful for reproducibility.

        Parameters:
            run_id: UUID of the run.

        Returns:
            GlobusHTTPResponse with the flow definition and input schema.
        """

    def cancel_run(
        self,
        run_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Request cancellation of a running or pending flow run.

        NOTE(review): whether in-progress states are allowed to finish is
        not established here; confirm against the Flows API documentation.

        Parameters:
            run_id: UUID of the run to cancel.
            query_params: Additional query parameters.

        Returns:
            GlobusHTTPResponse confirming the cancellation request.
        """

    def release_run(
        self,
        run_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Release a completed run.

        NOTE(review): what releasing retains or removes (run record, logs)
        is not established here; confirm against the Flows API
        documentation.

        Parameters:
            run_id: UUID of the completed run to release.
            query_params: Additional query parameters.

        Returns:
            GlobusHTTPResponse confirming the release.
        """

    def list_runs(
        self,
        *,
        filter_flow_id: str | UUID | None = None,
        filter_role: str | None = None,
        filter_roles: str | Iterable[str] | None = None,
        filter_status: str | Iterable[str] | None = None,
        filter_label: str | None = None,
        filter_tags: str | Iterable[str] | None = None,
        orderby: str | Iterable[str] | None = None,
        marker: str | None = None,
        query_params: dict[str, Any] | None = None,
    ) -> IterableRunsResponse:
        """
        List flow runs, with filtering and pagination.

        Parameters:
            filter_flow_id: Only include runs of this flow.
            filter_role: Deprecated; the minimum role required.
            filter_roles: Roles required for inclusion.
            filter_status: Run statuses to include (ACTIVE, SUCCEEDED,
                FAILED, ...).
            filter_label: Filter by run label.
            filter_tags: Filter by run tags.
            orderby: Sort criteria.
            marker: Pagination marker.
            query_params: Additional query parameters.

        Returns:
            IterableRunsResponse containing one page of run listings.
        """

    def get_run_logs(
        self,
        run_id: str | UUID,
        *,
        limit: int | None = None,
        reverse_order: bool | None = None,
        marker: str | None = None,
        query_params: dict[str, Any] | None = None,
    ) -> IterableRunLogsResponse:
        """
        Get the execution log entries for a flow run.

        Log entries cover state transitions, action results, and error
        information.

        Parameters:
            run_id: UUID of the run.
            limit: Maximum number of log entries to return.
            reverse_order: Return entries in reverse chronological order.
            marker: Pagination marker.
            query_params: Additional query parameters.

        Returns:
            IterableRunLogsResponse containing one page of log entries.
        """


# Specialized client for managing individual flows with scoped permissions
# and streamlined operations.
class SpecificFlowClient(FlowsClient):
    """
    Client scoped to operations on one specific flow.

    The client is bound to ``flow_id`` at construction, so flow operations do
    not need the ID repeated. Scope handling for flow-specific permissions is
    done by the client; ``flow_scope`` may be passed to supply the scope
    explicitly.
    """

    def __init__(
        self,
        flow_id: str | UUID,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        flow_scope: str | None = None,
        **kwargs: Any,
    ) -> None: ...

    def run_flow(
        self,
        body: dict[str, Any],
        *,
        label: str | None = None,
        tags: list[str] | None = None,
        **kwargs: Any,
    ) -> GlobusHTTPResponse:
        """
        Start a run of the bound flow with ``body`` as input.

        The remaining keyword arguments are presumably the other run options
        of ``FlowsClient.run_flow`` (e.g. ``activity_notification_policy``,
        ``run_monitors``); confirm.
        """

    def get_flow(self, **kwargs: Any) -> GlobusHTTPResponse:
        """Get the bound flow's definition and metadata."""

    def update_flow(self, **kwargs: Any) -> GlobusHTTPResponse:
        """Update the bound flow; keyword arguments as in ``FlowsClient.update_flow``."""

    def delete_flow(self, **kwargs: Any) -> GlobusHTTPResponse:
        """Delete the bound flow."""


# Specialized response classes and data containers for flow operations with
# enhanced iteration and notification support.
class IterableFlowsResponse(IterableResponse):
    """
    Paginated response holding a page of flow listings.

    This is the return type of ``FlowsClient.list_flows``.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Yield each flow definition record in this page."""
class IterableRunsResponse(IterableResponse):
    """
    Paginated response holding a page of flow run records.

    This is the return type of ``FlowsClient.list_runs``.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Yield each run record in this page."""
class IterableRunLogsResponse(IterableResponse):
    """
    Paginated response holding a page of run log entries.

    This is the return type of ``FlowsClient.get_run_logs``.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Yield each log entry in this page."""
class RunActivityNotificationPolicy(PayloadWrapper):
    """
    Notification policy configuration for a flow run.

    Defines which run status changes cause email notifications to be sent.
    It is passed to ``run_flow`` as ``activity_notification_policy``.

    Parameters:
        status: Run statuses that trigger a notification; any of
            ``"INACTIVE"``, ``"SUCCEEDED"``, ``"FAILED"``. When left as
            ``MISSING``, presumably the service default applies; confirm.
    """

    def __init__(
        self,
        status: (
            list[Literal["INACTIVE", "SUCCEEDED", "FAILED"]] | MissingType
        ) = MISSING,
    ) -> None: ...


# Flows-specific error handling for workflow execution and management
# operations.
class FlowsAPIError(GlobusAPIError):
    """
    Error class for Flows service API errors.

    Specializes ``GlobusAPIError`` for flow-specific error conditions, such
    as definition validation failures and execution errors.
    """


# ---------------------------------------------------------------------------
# Example: create a simple transfer flow and run it
# ---------------------------------------------------------------------------
from globus_sdk import FlowsClient
from globus_sdk import SpecificFlowClient

# Initialize client. ``authorizer`` is a placeholder for an authorizer that
# holds a Flows service token.
flows_client = FlowsClient(authorizer=authorizer)

# Define a simple transfer flow.
# Parameter keys ending in ".$" mark their values as JSONPath expressions,
# which are resolved against the run's input. Without the suffix, the literal
# string (e.g. "$.source_endpoint") would be handed to the action.
flow_definition = {
    "Comment": "Simple transfer workflow",
    "StartAt": "TransferData",
    "States": {
        "TransferData": {
            "Comment": "Transfer files between endpoints",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.source_endpoint",
                "destination_endpoint_id.$": "$.dest_endpoint",
                "transfer_items": [
                    {
                        "source_path.$": "$.source_path",
                        "destination_path.$": "$.dest_path",
                        "recursive.$": "$.recursive",
                    }
                ],
            },
            "ResultPath": "$.TransferResult",
            "End": True,
        }
    },
}

# Input schema used to validate each run's input.
input_schema = {
    "type": "object",
    "properties": {
        "source_endpoint": {"type": "string"},
        "dest_endpoint": {"type": "string"},
        "source_path": {"type": "string"},
        "dest_path": {"type": "string"},
        "recursive": {"type": "boolean", "default": False},
    },
    "required": ["source_endpoint", "dest_endpoint", "source_path", "dest_path"],
}

# Create the flow.
create_response = flows_client.create_flow(
    title="Simple Transfer Flow",
    definition=flow_definition,
    input_schema=input_schema,
    description="Basic file transfer automation",
    flow_starters=["all_authenticated_users"],
)
flow_id = create_response["id"]

# Run the flow. FlowsClient.run_flow takes no flow ID (its first parameter is
# ``body``), so runs are started through a client bound to this flow.
# ``flow_authorizer`` is a placeholder for an authorizer with a token valid
# for this flow (presumably its flow-specific scope; confirm).
run_input = {
    "source_endpoint": "ddb59aef-6d04-11e5-ba46-22000b92c6ec",
    "dest_endpoint": "ddb59af0-6d04-11e5-ba46-22000b92c6ec",
    "source_path": "/share/godata/file1.txt",
    "dest_path": "/~/file1.txt",
    "recursive": False,
}
flow_client = SpecificFlowClient(flow_id, authorizer=flow_authorizer)
run_response = flow_client.run_flow(
    body=run_input,
    label="Transfer file1.txt",
)
run_id = run_response["action_id"]
print(f"Flow run started: {run_id}")

# Define a complex workflow with conditional logic
from globus_sdk import RunActivityNotificationPolicy

# Parameter keys ending in ".$" are JSONPath references into the run's state.
# Choice rules are different: their "Variable" field takes a JSONPath directly.
complex_flow = {
    "Comment": "Data processing pipeline",
    "StartAt": "ValidateInput",
    "States": {
        "ValidateInput": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/ls",
            "Parameters": {
                "endpoint_id.$": "$.source_endpoint",
                "path.$": "$.input_path",
            },
            "ResultPath": "$.ValidationResult",
            "Next": "CheckFileExists",
        },
        "CheckFileExists": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.ValidationResult.code",
                    "StringEquals": "success",
                    "Next": "ProcessData",
                }
            ],
            "Default": "NotifyFailure",
        },
        "ProcessData": {
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/fxap",
            "Parameters": {
                "endpoint.$": "$.compute_endpoint",
                "function.$": "$.processing_function",
                "payload": {
                    "input_file.$": "$.input_path",
                    "output_file.$": "$.output_path",
                },
            },
            "ResultPath": "$.ProcessResult",
            "Next": "TransferResults",
        },
        "TransferResults": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                # NOTE(review): this uses the compute endpoint ID as the
                # transfer source; confirm that it also identifies a transfer
                # collection in your deployment.
                "source_endpoint_id.$": "$.compute_endpoint",
                "destination_endpoint_id.$": "$.output_endpoint",
                "transfer_items": [
                    {
                        "source_path.$": "$.output_path",
                        "destination_path.$": "$.final_path",
                    }
                ],
            },
            "End": True,
        },
        "NotifyFailure": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/notification/notify",
            "Parameters": {
                "message": "Input validation failed",
                # NOTE(review): array elements are not JSONPath-resolved, so
                # this sends the literal string "$.notification_email". Check
                # the notification action's parameter schema for how to pass
                # a recipient taken from the input.
                "recipients": ["$.notification_email"],
            },
            "End": True,
        },
    },
}

# A notification policy is a per-run setting: run_flow accepts it (as
# ``activity_notification_policy``), but create_flow does not. Pass it when
# starting runs of this flow.
notification_policy = RunActivityNotificationPolicy(
    status=["SUCCEEDED", "FAILED"],
)

create_response = flows_client.create_flow(
    title="Data Processing Pipeline",
    definition=complex_flow,
    input_schema={
        "type": "object",
        "properties": {
            "source_endpoint": {"type": "string"},
            "compute_endpoint": {"type": "string"},
            "output_endpoint": {"type": "string"},
            "input_path": {"type": "string"},
            "output_path": {"type": "string"},
            "final_path": {"type": "string"},
            "processing_function": {"type": "string"},
            "notification_email": {"type": "string"},
        },
        # Every field dereferenced on the success path must be present,
        # otherwise the run fails partway through the pipeline.
        "required": [
            "source_endpoint",
            "compute_endpoint",
            "output_endpoint",
            "input_path",
            "output_path",
            "final_path",
            "processing_function",
        ],
    },
)

# Monitor flow execution
import time

# ``run_id`` is a placeholder for the ID of a previously started run.
run_id = "run-uuid-here"

# Polling stops once the run reaches one of these statuses.
# - INACTIVE is treated as a stopping point, as in the listing of run statuses
#   above: the run will presumably not progress without intervention.
# - ENDED is assumed to be the status of a cancelled run (confirm against the
#   Flows run-status list). Without it, a cancelled run could be polled
#   forever.
stop_statuses = {"SUCCEEDED", "FAILED", "ENDED", "INACTIVE"}

while True:
    run_info = flows_client.get_run(run_id, include_flow_description=True)
    status = run_info["status"]
    print(f"Run status: {status}")
    if status in stop_statuses:
        break
    time.sleep(10)

# Get detailed logs.
logs = flows_client.get_run_logs(run_id, limit=100)
for log_entry in logs:
    print(f"{log_entry['time']}: {log_entry['details']}")

# List runs of a flow (``flow_id`` comes from the basic example).
runs = flows_client.list_runs(
    filter_flow_id=flow_id,
    filter_status=["ACTIVE", "SUCCEEDED"],
    orderby="start_time DESC",
)
for run in runs:
    # A label is optional when starting a run, so it may be absent.
    print(f"Run {run['action_id']}: {run['status']} - {run.get('label')}")

# ---------------------------------------------------------------------------
# Example: working with a single flow through SpecificFlowClient
# ---------------------------------------------------------------------------
from globus_sdk import SpecificFlowClient
# Create a client bound to one flow. ``flow_scope`` is omitted so the client
# handles the flow's scope itself; pass it only to override that scope.
# ``app`` is a placeholder for a configured GlobusApp.
specific_flow_id = "flow-uuid-here"
specific_client = SpecificFlowClient(specific_flow_id, app=app)

# Operations no longer need the flow ID. ``run_input`` is the input dict from
# the basic example.
run_response = specific_client.run_flow(
    body=run_input,
    label="Automated run",
)
flow_info = specific_client.get_flow()
print(f"Flow title: {flow_info['title']}")

# Update the flow's metadata.
specific_client.update_flow(
    description="Updated description",
    keywords=["updated", "automated"],
)

# Install with Tessl CLI
npx tessl i tessl/pypi-globus-sdk