CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-globus-sdk

Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

compute-service.mddocs/

Compute Service

Function execution and management on Globus Compute endpoints with support for Python functions, containers, and distributed computing patterns. The Compute service enables high-performance distributed computing across federated resources with seamless function registration and execution.

Capabilities

Compute Clients

Core clients providing access to different versions of the Globus Compute API with comprehensive function and endpoint management capabilities.

class ComputeClientV2(BaseClient):
    """
    Client for Globus Compute API version 2.
    
    Provides legacy compute operations including function registration,
    endpoint management, and task execution with v2 API compatibility.
    """
    
    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...
    
    def get_version(self, service: str | None = None) -> GlobusHTTPResponse:
        """
        Get current API version and service information.
        
        Parameters:
        - service: Specific service to get version info for
        
        Returns:
        GlobusHTTPResponse with version details
        """
    
    def get_result_amqp_url(self) -> GlobusHTTPResponse:
        """
        Generate AMQP connection credentials for real-time result streaming.
        
        Creates new credentials for connecting to the AMQP service
        to receive task results and status updates in real-time.
        
        Returns:
        GlobusHTTPResponse with AMQP connection URL and credentials
        """

class ComputeClientV3(BaseClient):
    """
    Client for Globus Compute API version 3.
    
    Provides modern compute operations with enhanced endpoint management,
    improved function registration, and advanced task execution capabilities.
    """
    
    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...

Endpoint Management

Register, manage, and monitor compute endpoints for distributed function execution across federated computing resources.

def register_endpoint(self, data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Register a new compute endpoint.
    
    Registers an endpoint that can execute functions submitted to the
    Globus Compute service. Endpoints can be configured with specific
    execution environments, resource limits, and access policies.
    
    Parameters:
    - data: Endpoint registration document with configuration
    
    Returns:
    GlobusHTTPResponse with endpoint registration details and UUID
    """

def get_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get detailed information about a registered endpoint.
    
    Parameters:
    - endpoint_id: UUID of the endpoint
    
    Returns:
    GlobusHTTPResponse with endpoint configuration and status
    """

def update_endpoint(
    self, 
    endpoint_id: str | UUID, 
    data: dict[str, Any]
) -> GlobusHTTPResponse:
    """
    Update endpoint configuration.
    
    Parameters:
    - endpoint_id: UUID of endpoint to update
    - data: Endpoint update document
    
    Returns:
    GlobusHTTPResponse confirming update
    """

def get_endpoint_status(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get current status of a compute endpoint.
    
    Returns information about endpoint availability, worker processes,
    queue status, and resource utilization.
    
    Parameters:
    - endpoint_id: UUID of the endpoint
    
    Returns:
    GlobusHTTPResponse with endpoint status information
    """

def delete_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Delete a registered endpoint.
    
    Permanently removes an endpoint registration. Running tasks
    will continue but no new tasks can be submitted.
    
    Parameters:
    - endpoint_id: UUID of endpoint to delete
    
    Returns:
    GlobusHTTPResponse confirming deletion
    """

Function Management

Register, update, and manage Python functions for distributed execution with support for dependencies and access control.

def register_function(self, function_data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Register a Python function for distributed execution.
    
    Registers a serialized Python function that can be executed on
    compute endpoints. Functions can include dependencies, environment
    requirements, and access control policies.
    
    Parameters:
    - function_data: Function registration document containing serialized code
    
    Returns:
    GlobusHTTPResponse with function UUID and registration details
    """

def get_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get information about a registered function.
    
    Parameters:
    - function_id: UUID of the function
    
    Returns:
    GlobusHTTPResponse with function metadata and access policies
    """

def delete_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
    """
    Delete a registered function.
    
    Removes function registration. Running tasks using this function
    will continue but new tasks cannot be submitted.
    
    Parameters:
    - function_id: UUID of function to delete
    
    Returns:
    GlobusHTTPResponse confirming deletion
    """

def submit_function(
    self, 
    function_document: ComputeFunctionDocument
) -> GlobusHTTPResponse:
    """
    Submit and register a function in a single operation.
    
    Parameters:
    - function_document: Complete function document with code and metadata
    
    Returns:
    GlobusHTTPResponse with function UUID
    """

Task Execution and Management

Submit function execution tasks and monitor their progress with support for batch operations and result retrieval.

def submit_task(
    self,
    endpoint_uuid: str | UUID,
    function_uuid: str | UUID,
    function_args: list | None = None,
    function_kwargs: dict | None = None,
    **kwargs
) -> GlobusHTTPResponse:
    """
    Submit a task for execution on a compute endpoint.
    
    Executes a registered function on the specified endpoint with
    the provided arguments. Tasks are queued and executed asynchronously.
    
    Parameters:
    - endpoint_uuid: UUID of endpoint to execute on
    - function_uuid: UUID of function to execute
    - function_args: Positional arguments for function
    - function_kwargs: Keyword arguments for function
    - **kwargs: Additional task parameters
    
    Returns:
    GlobusHTTPResponse with task UUID for monitoring
    """

def submit(self, data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Submit a batch of tasks for execution.
    
    Submits multiple tasks in a single request for efficient
    processing. Tasks can target different endpoints and functions.
    
    Parameters:
    - data: Task batch document containing task specifications
    
    Returns:
    GlobusHTTPResponse with task UUIDs and batch information
    """

def get_task(self, task_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get task status and results.
    
    Retrieves current status, execution results, and any error
    information for a submitted task.
    
    Parameters:
    - task_id: UUID of the task
    
    Returns:
    GlobusHTTPResponse with task status, results, and metadata
    """

def get_task_batch(
    self, 
    task_ids: str | UUID | Iterable[str | UUID]
) -> GlobusHTTPResponse:
    """
    Get status and results for multiple tasks.
    
    Efficiently retrieves information for multiple tasks in a
    single request, useful for monitoring batch operations.
    
    Parameters:
    - task_ids: Task UUID(s) to retrieve
    
    Returns:
    GlobusHTTPResponse with status and results for all requested tasks
    """

def get_task_group(self, task_group_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get all task IDs associated with a task group.
    
    Retrieves the list of tasks that belong to a specific task group,
    useful for managing related batch operations.
    
    Parameters:
    - task_group_id: UUID of the task group
    
    Returns:
    GlobusHTTPResponse with list of task UUIDs in the group
    """

Function and Task Data Classes

Data containers for function registration and task submission with proper serialization and metadata handling.

class ComputeFunctionDocument(PayloadWrapper):
    """
    Function registration document for submitting Python functions.
    
    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.
    
    Contains serialized function code, metadata, and access control
    information required for function registration.
    """
    
    def __init__(
        self,
        *,
        function_name: str,
        function_code: str,
        description: str | MissingType = MISSING,
        metadata: ComputeFunctionMetadata | MissingType = MISSING,
        group: str | UUID | MissingType = MISSING,
        public: bool = False
    ) -> None: ...

class ComputeFunctionMetadata(PayloadWrapper):
    """
    Metadata container for function registration.
    
    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.
    
    Contains version and environment information for function execution.
    """
    
    def __init__(
        self,
        *,
        python_version: str | MissingType = MISSING,
        sdk_version: str | MissingType = MISSING
    ) -> None: ...

Error Handling

Compute-specific error handling for function execution and endpoint management operations.

class ComputeAPIError(GlobusAPIError):
    """
    Error class for Compute service API errors.
    
    Provides enhanced error handling for compute-specific error
    conditions including function execution failures and endpoint issues.
    """

Common Usage Patterns

Basic Function Execution

from globus_sdk import ComputeClientV3

# Initialize client
compute_client = ComputeClientV3(authorizer=authorizer)

# Register a simple function
def hello_world(name):
    return f"Hello, {name}!"

function_data = {
    "function_name": "hello_world",
    "function_code": serialize_function(hello_world),  # Use appropriate serialization
    "description": "Simple greeting function",
    "public": True
}

# Register function
func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit task
task_response = compute_client.submit_task(
    endpoint_uuid="endpoint-uuid-here",
    function_uuid=function_uuid,
    function_args=["World"]
)
task_uuid = task_response["task_uuid"]

# Monitor task
while True:
    task_info = compute_client.get_task(task_uuid)
    status = task_info["status"]
    
    if status == "SUCCESS":
        result = task_info["result"]
        print(f"Task result: {result}")
        break
    elif status == "FAILED":
        print(f"Task failed: {task_info.get('error')}")
        break
    
    time.sleep(1)

Batch Task Processing

# Submit multiple tasks at once
batch_data = {
    "tasks": [
        {
            "endpoint": "endpoint-1",
            "function": function_uuid,
            "args": [f"User {i}"],
            "kwargs": {}
        }
        for i in range(10)
    ]
}

batch_response = compute_client.submit(batch_data)
task_ids = batch_response["task_uuids"]

# Monitor batch progress
while True:
    batch_status = compute_client.get_task_batch(task_ids)
    
    completed = sum(1 for task in batch_status["results"] 
                   if task["status"] in ["SUCCESS", "FAILED"])
    
    print(f"Progress: {completed}/{len(task_ids)} tasks completed")
    
    if completed == len(task_ids):
        break
    
    time.sleep(5)

# Process results
for task_info in batch_status["results"]:
    if task_info["status"] == "SUCCESS":
        print(f"Task {task_info['task_uuid']}: {task_info['result']}")

Endpoint Management

# Register a new endpoint
endpoint_config = {
    "endpoint_name": "my-compute-endpoint",
    "description": "Personal compute endpoint",
    "public": False,
    "allowed_functions": [function_uuid]
}

endpoint_response = compute_client.register_endpoint(endpoint_config)
endpoint_uuid = endpoint_response["endpoint_uuid"]

# Check endpoint status
status = compute_client.get_endpoint_status(endpoint_uuid)
print(f"Endpoint status: {status['status']}")
print(f"Active workers: {status.get('outstanding_tasks', 0)}")

# Update endpoint configuration
update_data = {
    "description": "Updated description",
    "public": True
}
compute_client.update_endpoint(endpoint_uuid, update_data)

Function with Dependencies

# Register function with complex dependencies
function_code = """
def process_data(data_list):
    import numpy as np
    import pandas as pd
    
    # Process data using scientific libraries
    arr = np.array(data_list)
    df = pd.DataFrame({'values': arr})
    return df.describe().to_dict()
"""

function_data = {
    "function_name": "process_data",
    "function_code": function_code,
    "description": "Data processing with scientific libraries",
    "container_uuid": "container-with-scipy",  # Pre-configured container
    "resource_requirements": {
        "num_cores": 2,
        "memory_per_core": "2GB"
    }
}

func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit data processing task
task_response = compute_client.submit_task(
    endpoint_uuid=endpoint_uuid,
    function_uuid=function_uuid,
    function_args=[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
)

Real-time Results with AMQP

# Get AMQP connection for real-time results
amqp_response = compute_client.get_result_amqp_url()
amqp_url = amqp_response["amqp_url"]

# Connect to AMQP for real-time task updates
import pika

connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
channel = connection.channel()

def on_result(ch, method, properties, body):
    result_data = json.loads(body)
    task_id = result_data["task_id"]
    status = result_data["status"]
    
    if status == "SUCCESS":
        print(f"Task {task_id} completed: {result_data['result']}")
    elif status == "FAILED":
        print(f"Task {task_id} failed: {result_data['error']}")

# Set up consumer for results
channel.basic_consume(
    queue="task_results",
    on_message_callback=on_result,
    auto_ack=True
)

# Start consuming results
channel.start_consuming()

Install with Tessl CLI

npx tessl i tessl/pypi-globus-sdk

docs

auth-service.md

compute-service.md

core-framework.md

flows-service.md

gcs-service.md

groups-service.md

index.md

search-service.md

timers-service.md

transfer-service.md

tile.json