CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-globus-sdk

Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/flows-service.md

Flows Service

Workflow automation and orchestration for complex multi-step operations across Globus services with conditional logic, error handling, and state management. The Flows service enables creation and execution of sophisticated automation workflows that can coordinate data transfers, compute jobs, and other service operations.

Capabilities

Flows Client

Core client for managing flow definitions, executing workflows, and monitoring flow runs with comprehensive filtering and access control.

class FlowsClient(BaseClient):
    """
    Client for Globus Flows service operations.

    Provides methods for flow lifecycle management including creation, execution,
    monitoring, and administration with comprehensive access control and
    notification capabilities.

    Every constructor argument is keyword-only and optional; each named
    argument defaults to ``None``.

    Parameters:
    - app: GlobusApp to associate with the client
    - authorizer: GlobusAuthorizer to associate with the client
    - environment: Environment name, as a string
    - base_url: Service base URL, as a string
    - kwargs: Extra keyword arguments; presumably forwarded to BaseClient
      (TODO confirm against the BaseClient constructor)
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...

Flow Definition and Management

Create, update, and manage workflow definitions with JSON-based state machines and comprehensive access control policies.

def create_flow(
    self,
    title: str,
    definition: dict[str, Any],
    input_schema: dict[str, Any],
    *,
    subtitle: str | None = None,
    description: str | None = None,
    flow_viewers: list[str] | None = None,
    flow_starters: list[str] | None = None,
    flow_administrators: list[str] | None = None,
    run_managers: list[str] | None = None,
    run_monitors: list[str] | None = None,
    keywords: list[str] | None = None,
    subscription_id: str | UUID | None = None,
    additional_fields: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Create a new flow definition.

    Creates a workflow with states, transitions, and execution logic
    defined using Amazon States Language (ASL) syntax with Globus
    service integrations.

    ``title``, ``definition`` and ``input_schema`` are required. Every other
    argument is keyword-only and defaults to ``None``.

    Parameters:
    - title: Human-readable flow name (1-128 characters)
    - definition: JSON state machine definition (ASL format)
    - input_schema: JSON Schema for validating flow input
    - subtitle: Concise flow summary (0-128 characters)
    - description: Detailed flow description (0-4096 characters)
    - flow_viewers: Principal URNs who can view the flow
    - flow_starters: Principal URNs who can run the flow
    - flow_administrators: Principal URNs who can manage the flow
    - run_managers: Principal URNs who can manage flow runs
    - run_monitors: Principal URNs who can monitor flow runs
    - keywords: Searchable tags for flow discovery
    - subscription_id: Associated Globus subscription, as a str or UUID
    - additional_fields: Additional metadata fields

    Returns:
    GlobusHTTPResponse with flow ID and creation details
    """

def get_flow(
    self,
    flow_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Retrieve a flow's definition and metadata.

    Parameters:
    - flow_id: UUID of the flow to retrieve, as a str or UUID
    - query_params: Additional query parameters (keyword-only, optional)

    Returns:
    GlobusHTTPResponse with complete flow definition and metadata
    """

def update_flow(
    self,
    flow_id: str | UUID,
    *,
    title: str | None = None,
    definition: dict[str, Any] | None = None,
    input_schema: dict[str, Any] | None = None,
    subtitle: str | None = None,
    description: str | None = None,
    flow_owner: str | None = None,
    flow_viewers: list[str] | None = None,
    flow_starters: list[str] | None = None,
    flow_administrators: list[str] | None = None,
    run_managers: list[str] | None = None,
    run_monitors: list[str] | None = None,
    keywords: list[str] | None = None,
    subscription_id: str | UUID | None = None,
    additional_fields: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Update an existing flow definition.

    Updates flow metadata, definition, or access policies.
    Only specified fields will be modified. Apart from ``flow_id``, every
    argument is keyword-only and defaults to ``None``.

    Parameters:
    - flow_id: UUID of flow to update, as a str or UUID
    - title: Updated flow title
    - definition: Updated state machine definition
    - input_schema: Updated input validation schema
    - flow_owner: Has no counterpart in create_flow; presumably the
      principal that should own the flow (TODO confirm expected format)
    - Other parameters: See create_flow for descriptions

    Returns:
    GlobusHTTPResponse confirming update
    """

def delete_flow(
    self,
    flow_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Delete a flow definition.

    Permanently removes a flow. Running instances will continue
    but no new runs can be started.

    Parameters:
    - flow_id: UUID of flow to delete, as a str or UUID
    - query_params: Additional parameters (keyword-only, optional)

    Returns:
    GlobusHTTPResponse confirming deletion
    """

def validate_flow(
    self,
    definition: dict[str, Any],
    input_schema: dict[str, Any] | MissingType = MISSING
) -> GlobusHTTPResponse:
    """
    Validate flow definition and schema.

    Checks flow definition syntax, state transitions, and
    input schema validity without creating the flow.

    Parameters:
    - definition: Flow state machine definition to validate (required)
    - input_schema: Input schema to validate; optional, and its default is
      the MISSING sentinel rather than None

    Returns:
    GlobusHTTPResponse with validation results and any errors
    """

def list_flows(
    self,
    *,
    filter_role: str | None = None,
    filter_roles: str | Iterable[str] | None = None,
    filter_fulltext: str | None = None,
    orderby: str | Iterable[str] | None = None,
    marker: str | None = None,
    query_params: dict[str, Any] | None = None
) -> IterableFlowsResponse:
    """
    List accessible flows with filtering and pagination.

    All arguments are keyword-only and default to ``None``.

    Parameters:
    - filter_role: Deprecated - minimum role required for inclusion
    - filter_roles: One role or an iterable of roles for filtering
      (flow_viewer, flow_starter, etc.)
    - filter_fulltext: Full-text search across flow metadata
    - orderby: One sort criterion or an iterable of them
      (e.g., "updated_at DESC", "title ASC")
    - marker: Pagination marker
    - query_params: Additional query parameters

    Returns:
    IterableFlowsResponse with paginated flow listings
    """

Flow Execution and Run Management

Start flow runs, monitor execution, and manage running workflow instances with comprehensive status tracking.

def run_flow(
    self,
    body: dict[str, Any],
    *,
    label: str | None = None,
    tags: list[str] | None = None,
    activity_notification_policy: (
        dict[str, Any] | RunActivityNotificationPolicy | None
    ) = None,
    run_monitors: list[str] | None = None,
    run_managers: list[str] | None = None,
    additional_fields: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Start execution of a flow with input data.

    Creates a new run instance of the flow with the provided
    input data, which is validated against the flow's input schema.

    NOTE(review): this signature has no flow_id parameter, so the flow to
    run must be implied by the client instance. Confirm which client class
    defines run_flow in the targeted globus_sdk release.

    Parameters:
    - body: Input data for the flow (validated against input_schema)
    - label: Human-readable run title (1-64 characters)
    - tags: Searchable tags for the run
    - activity_notification_policy: Email notification configuration, as a
      plain dict or a RunActivityNotificationPolicy
    - run_monitors: Principal URNs authorized to view this run
    - run_managers: Principal URNs authorized to manage this run
    - additional_fields: Additional run metadata

    Returns:
    GlobusHTTPResponse with run ID and execution details
    """

def get_run(
    self,
    run_id: str | UUID,
    *,
    include_flow_description: bool | None = None,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Get detailed information about a flow run.

    Parameters:
    - run_id: UUID of the run to retrieve, as a str or UUID
    - include_flow_description: Include flow metadata in response
      (keyword-only; defaults to None)
    - query_params: Additional query parameters (keyword-only, optional)

    Returns:
    GlobusHTTPResponse with run status, results, and execution details
    """

def get_run_definition(
    self,
    run_id: str | UUID
) -> GlobusHTTPResponse:
    """
    Get flow definition and input schema used for a specific run.

    Returns the exact flow definition and input schema that were
    active when the run was started, useful for reproducibility.

    Parameters:
    - run_id: UUID of the run, as a str or UUID (the only argument)

    Returns:
    GlobusHTTPResponse with flow definition and input schema
    """

def cancel_run(
    self,
    run_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Cancel a running or pending flow execution.

    Attempts to gracefully stop flow execution. Running states
    may complete but no new states will be started.

    Parameters:
    - run_id: UUID of run to cancel, as a str or UUID
    - query_params: Additional parameters (keyword-only, optional)

    Returns:
    GlobusHTTPResponse confirming cancellation request
    """

def release_run(
    self,
    run_id: str | UUID,
    *,
    query_params: dict[str, Any] | None = None
) -> GlobusHTTPResponse:
    """
    Release a completed run from monitoring.

    Marks run as released, reducing storage usage and removing
    it from active monitoring. Run logs remain accessible.

    Parameters:
    - run_id: UUID of completed run to release, as a str or UUID
    - query_params: Additional parameters (keyword-only, optional)

    Returns:
    GlobusHTTPResponse confirming release
    """

def list_runs(
    self,
    *,
    filter_flow_id: str | UUID | None = None,
    filter_role: str | None = None,
    filter_roles: str | Iterable[str] | None = None,
    filter_status: str | Iterable[str] | None = None,
    filter_label: str | None = None,
    filter_tags: str | Iterable[str] | None = None,
    orderby: str | Iterable[str] | None = None,
    marker: str | None = None,
    query_params: dict[str, Any] | None = None
) -> IterableRunsResponse:
    """
    List flow runs with comprehensive filtering options.

    All arguments are keyword-only and default to ``None``.

    Parameters:
    - filter_flow_id: Only runs of specified flow (str or UUID)
    - filter_role: Deprecated - minimum role required
    - filter_roles: Required roles for access (one role or an iterable)
    - filter_status: Run statuses to include (ACTIVE, SUCCEEDED, FAILED, etc.),
      as one status or an iterable
    - filter_label: Filter by run label
    - filter_tags: Filter by run tags (one tag or an iterable)
    - orderby: Sort criteria (one criterion or an iterable)
    - marker: Pagination marker
    - query_params: Additional parameters

    Returns:
    IterableRunsResponse with paginated run listings
    """

def get_run_logs(
    self,
    run_id: str | UUID,
    *,
    limit: int | None = None,
    reverse_order: bool | None = None,
    marker: str | None = None,
    query_params: dict[str, Any] | None = None
) -> IterableRunLogsResponse:
    """
    Get execution logs for a flow run.

    Returns detailed execution logs including state transitions,
    action results, and error information for debugging.

    Apart from ``run_id``, every argument is keyword-only and defaults
    to ``None``.

    Parameters:
    - run_id: UUID of the run, as a str or UUID
    - limit: Maximum log entries to return
    - reverse_order: Return logs in reverse chronological order
    - marker: Pagination marker
    - query_params: Additional parameters

    Returns:
    IterableRunLogsResponse with paginated log entries
    """

Specific Flow Client

Specialized client for managing individual flows with scoped permissions and streamlined operations.

class SpecificFlowClient(FlowsClient):
    """
    Client scoped to operations on a specific flow.

    Provides streamlined access to flow operations without
    repeatedly specifying the flow ID, with automatic scope
    management for flow-specific permissions.

    The flow is fixed at construction time through ``flow_id``; none of the
    methods declared here take a flow ID argument.

    NOTE(review): these declarations are stubs. Confirm against the targeted
    globus_sdk release that this class derives from FlowsClient, accepts
    ``flow_scope``, and exposes the flow-management overrides listed below.

    Parameters:
    - flow_id: UUID of the flow this client operates on (str or UUID, required)
    - app: GlobusApp to associate with the client (keyword-only, optional)
    - authorizer: GlobusAuthorizer to associate with the client
      (keyword-only, optional)
    - flow_scope: Scope string for the flow (keyword-only, optional)
    - kwargs: Extra keyword arguments; presumably forwarded to the parent
      constructor (TODO confirm)
    """

    def __init__(
        self,
        flow_id: str | UUID,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        flow_scope: str | None = None,
        **kwargs
    ) -> None: ...

    def run_flow(
        self,
        body: dict[str, Any],
        *,
        label: str | None = None,
        tags: list[str] | None = None,
        **kwargs
    ) -> GlobusHTTPResponse:
        """
        Run the specific flow with input data.

        Parameters:
        - body: Input data for the run (required)
        - label: Run label (keyword-only, optional)
        - tags: Run tags (keyword-only, optional)
        - kwargs: Further keyword arguments; presumably the remaining
          run options (TODO confirm which names are accepted)

        Returns:
        GlobusHTTPResponse for the started run
        """

    def get_flow(self, **kwargs) -> GlobusHTTPResponse:
        """
        Get the specific flow definition.

        Takes keyword arguments only; no flow ID is passed.
        """

    def update_flow(self, **kwargs) -> GlobusHTTPResponse:
        """
        Update the specific flow.

        Takes keyword arguments only; no flow ID is passed.
        """

    def delete_flow(self, **kwargs) -> GlobusHTTPResponse:
        """
        Delete the specific flow.

        Takes keyword arguments only; no flow ID is passed.
        """

Response Objects and Data Classes

Specialized response classes and data containers for flow operations with enhanced iteration and notification support.

class IterableFlowsResponse(IterableResponse):
    """
    Response class for flow listings with pagination support.

    Iterating the response yields one ``dict[str, Any]`` per flow.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate over flow definitions, each as a dict."""

class IterableRunsResponse(IterableResponse):
    """
    Response class for flow run listings with pagination support.

    Iterating the response yields one ``dict[str, Any]`` per run.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate over run records, each as a dict."""

class IterableRunLogsResponse(IterableResponse):
    """
    Response class for run log entries with pagination support.

    Iterating the response yields one ``dict[str, Any]`` per log entry.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate over log entries, each as a dict."""

class RunActivityNotificationPolicy(PayloadWrapper):
    """
    Notification policy configuration for flow runs.

    Defines when email notifications will be sent based on
    run status changes and execution events.

    Parameters:
    - status: List of run statuses, each one of "INACTIVE", "SUCCEEDED" or
      "FAILED"; optional, and its default is the MISSING sentinel
    """

    def __init__(
        self,
        status: (
            list[Literal["INACTIVE", "SUCCEEDED", "FAILED"]] | MissingType
        ) = MISSING
    ) -> None: ...

Error Handling

Flows-specific error handling for workflow execution and management operations.

class FlowsAPIError(GlobusAPIError):
    """
    Error class for Flows service API errors.

    Subclass of GlobusAPIError, so handlers written for GlobusAPIError also
    catch it. Provides enhanced error handling for flow-specific error
    conditions including validation failures and execution errors.
    """

Common Usage Patterns

Basic Flow Creation and Execution

from globus_sdk import FlowsClient, SpecificFlowClient

# Initialize the service-wide client used to define and manage flows.
# `authorizer` is a placeholder for a GlobusAuthorizer you already hold.
flows_client = FlowsClient(authorizer=authorizer)

# Define a simple transfer flow.
# A Parameters key needs the ".$" suffix for its value to be evaluated as a
# JSONPath into the run input; without the suffix the "$.xxx" text is passed
# to the action as a literal string.
flow_definition = {
    "Comment": "Simple transfer workflow",
    "StartAt": "TransferData",
    "States": {
        "TransferData": {
            "Comment": "Transfer files between endpoints",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.source_endpoint",
                "destination_endpoint_id.$": "$.dest_endpoint",
                "transfer_items": [
                    {
                        "source_path.$": "$.source_path",
                        "destination_path.$": "$.dest_path",
                        "recursive.$": "$.recursive",
                    }
                ],
            },
            "ResultPath": "$.TransferResult",
            "End": True,
        }
    },
}

# Input schema for validation.
# NOTE(review): a JSON Schema "default" is normally an annotation only;
# confirm the service injects it, otherwise add "recursive" to "required"
# because the definition always dereferences $.recursive.
input_schema = {
    "type": "object",
    "properties": {
        "source_endpoint": {"type": "string"},
        "dest_endpoint": {"type": "string"},
        "source_path": {"type": "string"},
        "dest_path": {"type": "string"},
        "recursive": {"type": "boolean", "default": False},
    },
    "required": ["source_endpoint", "dest_endpoint", "source_path", "dest_path"],
}

# Create flow
create_response = flows_client.create_flow(
    title="Simple Transfer Flow",
    definition=flow_definition,
    input_schema=input_schema,
    description="Basic file transfer automation",
    flow_starters=["all_authenticated_users"],
)
flow_id = create_response["id"]

# Run the flow
run_input = {
    "source_endpoint": "ddb59aef-6d04-11e5-ba46-22000b92c6ec",
    "dest_endpoint": "ddb59af0-6d04-11e5-ba46-22000b92c6ec",
    "source_path": "/share/godata/file1.txt",
    "dest_path": "/~/file1.txt",
    "recursive": False,
}

# run_flow() takes the input as `body` and has no flow_id parameter, so the
# flow is selected by constructing a client bound to it.
# `flow_authorizer` is a placeholder; presumably it must be authorized for
# this flow's own scope rather than the service-wide one (TODO confirm).
flow_client = SpecificFlowClient(flow_id, authorizer=flow_authorizer)
run_response = flow_client.run_flow(
    body=run_input,
    label="Transfer file1.txt",
)
run_id = run_response["action_id"]

print(f"Flow run started: {run_id}")

Complex Multi-Step Workflow

from globus_sdk import RunActivityNotificationPolicy, SpecificFlowClient

# Define a complex workflow with conditional logic.
# Parameters keys carry the ".$" suffix wherever the value is a JSONPath into
# the run state; without it the "$.xxx" text is passed as a literal string.
complex_flow = {
    "Comment": "Data processing pipeline",
    "StartAt": "ValidateInput",
    "States": {
        "ValidateInput": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/ls",
            "Parameters": {
                "endpoint_id.$": "$.source_endpoint",
                "path.$": "$.input_path",
            },
            "ResultPath": "$.ValidationResult",
            "Next": "CheckFileExists",
        },
        "CheckFileExists": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.ValidationResult.code",
                    "StringEquals": "success",
                    "Next": "ProcessData",
                }
            ],
            "Default": "NotifyFailure",
        },
        "ProcessData": {
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/fxap",
            "Parameters": {
                "endpoint.$": "$.compute_endpoint",
                "function.$": "$.processing_function",
                "payload": {
                    "input_file.$": "$.input_path",
                    "output_file.$": "$.output_path",
                },
            },
            "ResultPath": "$.ProcessResult",
            "Next": "TransferResults",
        },
        "TransferResults": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.compute_endpoint",
                "destination_endpoint_id.$": "$.output_endpoint",
                "transfer_items": [{
                    "source_path.$": "$.output_path",
                    "destination_path.$": "$.final_path",
                }],
            },
            "End": True,
        },
        "NotifyFailure": {
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/notification/notify",
            "Parameters": {
                "message": "Input validation failed",
                # NOTE(review): a JSONPath placed inside a list literal is
                # likely not resolved; confirm the expected shape against the
                # notification action's input schema.
                "recipients": ["$.notification_email"],
            },
            "End": True,
        },
    },
}

# Notification policy. It configures an individual run, so it is handed to
# run_flow(activity_notification_policy=...); create_flow has no such argument.
notification_policy = RunActivityNotificationPolicy(
    status=["SUCCEEDED", "FAILED"]
)

create_response = flows_client.create_flow(
    title="Data Processing Pipeline",
    definition=complex_flow,
    input_schema={
        "type": "object",
        "properties": {
            "source_endpoint": {"type": "string"},
            "compute_endpoint": {"type": "string"},
            "output_endpoint": {"type": "string"},
            "input_path": {"type": "string"},
            "output_path": {"type": "string"},
            "final_path": {"type": "string"},
            "processing_function": {"type": "string"},
            "notification_email": {"type": "string"},
        },
        # Every input that the success path dereferences is required, so a
        # missing value is rejected up front instead of failing mid-run.
        "required": [
            "source_endpoint",
            "input_path",
            "processing_function",
            "compute_endpoint",
            "output_endpoint",
            "output_path",
            "final_path",
        ],
    },
)

# Start a run with the notification policy attached.
# `flow_authorizer` and the values below are placeholders to replace.
pipeline_input = {
    "source_endpoint": "source-endpoint-uuid-here",
    "compute_endpoint": "compute-endpoint-uuid-here",
    "output_endpoint": "output-endpoint-uuid-here",
    "input_path": "/input/data.csv",
    "output_path": "/scratch/result.csv",
    "final_path": "/results/result.csv",
    "processing_function": "function-uuid-here",
    "notification_email": "user@example.org",
}
pipeline_client = SpecificFlowClient(
    create_response["id"],
    authorizer=flow_authorizer,
)
pipeline_run = pipeline_client.run_flow(
    body=pipeline_input,
    label="Data processing run",
    activity_notification_policy=notification_policy,
)

Flow Run Monitoring and Management

import time

# Monitor flow execution.
# `flows_client` is an existing FlowsClient and `flow_id` the UUID of the flow
# whose runs are listed below; replace the placeholder run ID.
run_id = "run-uuid-here"

# Statuses at which polling stops.
# NOTE(review): INACTIVE is treated as a stop condition here; presumably it
# means the run will not progress without intervention - confirm.
finished_statuses = ("SUCCEEDED", "FAILED", "INACTIVE")

while True:
    run_info = flows_client.get_run(run_id, include_flow_description=True)
    status = run_info["status"]

    print(f"Run status: {status}")

    if status in finished_statuses:
        break

    # Wait between polls so the service is not queried in a tight loop.
    time.sleep(10)

# Get detailed logs
logs = flows_client.get_run_logs(run_id, limit=100)
for log_entry in logs:
    print(f"{log_entry['time']}: {log_entry['details']}")

# List all runs for a flow
runs = flows_client.list_runs(
    filter_flow_id=flow_id,
    filter_status=["ACTIVE", "SUCCEEDED"],
    orderby="start_time DESC",
)

for run in runs:
    print(f"Run {run['action_id']}: {run['status']} - {run['label']}")

Using Specific Flow Client

from globus_sdk import SpecificFlowClient

# Create flow-specific client.
# `app` is a placeholder for a GlobusApp created elsewhere; replace both
# occurrences of "flow-uuid-here" with the real flow UUID.
specific_client = SpecificFlowClient(
    flow_id="flow-uuid-here",
    app=app,
    flow_scope="https://auth.globus.org/scopes/flow-uuid-here/flow_run"
)

# Simplified operations without specifying flow_id.
# `run_input` is a placeholder for the dict of flow input values.
run_response = specific_client.run_flow(
    body=run_input,
    label="Automated run"
)

# NOTE(review): confirm that get_flow/update_flow are available on
# SpecificFlowClient in the installed globus_sdk release.
flow_info = specific_client.get_flow()
print(f"Flow title: {flow_info['title']}")

# Update flow metadata (description and keywords only; the definition is
# left untouched)
specific_client.update_flow(
    description="Updated description",
    keywords=["updated", "automated"]
)

Install with Tessl CLI

npx tessl i tessl/pypi-globus-sdk

docs

auth-service.md

compute-service.md

core-framework.md

flows-service.md

gcs-service.md

groups-service.md

index.md

search-service.md

timers-service.md

transfer-service.md

tile.json