Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Function execution and management on Globus Compute endpoints with support for Python functions, containers, and distributed computing patterns. The Compute service enables high-performance distributed computing across federated resources with seamless function registration and execution.
Core clients providing access to different versions of the Globus Compute API with comprehensive function and endpoint management capabilities.
class ComputeClientV2(BaseClient):
    """
    Client for Globus Compute API version 2.

    Provides legacy compute operations including function registration,
    endpoint management, and task execution with v2 API compatibility.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...

    def get_version(self, service: str | None = None) -> GlobusHTTPResponse:
        """
        Get current API version and service information.

        Parameters:
        - service: Specific service to get version info for

        Returns:
        GlobusHTTPResponse with version details
        """

    def get_result_amqp_url(self) -> GlobusHTTPResponse:
        """
        Generate AMQP connection credentials for real-time result streaming.

        Creates new credentials for connecting to the AMQP service
        to receive task results and status updates in real-time.

        Returns:
        GlobusHTTPResponse with AMQP connection URL and credentials
        """
class ComputeClientV3(BaseClient):
    """
    Client for Globus Compute API version 3.

    Provides modern compute operations with enhanced endpoint management,
    improved function registration, and advanced task execution capabilities.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...

    # -- Endpoint management -------------------------------------------------
    # Register, manage, and monitor compute endpoints for distributed
    # function execution across federated computing resources.

    def register_endpoint(self, data: dict[str, Any]) -> GlobusHTTPResponse:
        """
        Register a new compute endpoint.

        Registers an endpoint that can execute functions submitted to the
        Globus Compute service. Endpoints can be configured with specific
        execution environments, resource limits, and access policies.

        Parameters:
        - data: Endpoint registration document with configuration

        Returns:
        GlobusHTTPResponse with endpoint registration details and UUID
        """

    def get_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
        """
        Get detailed information about a registered endpoint.

        Parameters:
        - endpoint_id: UUID of the endpoint

        Returns:
        GlobusHTTPResponse with endpoint configuration and status
        """

    def update_endpoint(
        self,
        endpoint_id: str | UUID,
        data: dict[str, Any]
    ) -> GlobusHTTPResponse:
        """
        Update endpoint configuration.

        Parameters:
        - endpoint_id: UUID of endpoint to update
        - data: Endpoint update document

        Returns:
        GlobusHTTPResponse confirming update
        """

    def get_endpoint_status(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
        """
        Get current status of a compute endpoint.

        Returns information about endpoint availability, worker processes,
        queue status, and resource utilization.

        Parameters:
        - endpoint_id: UUID of the endpoint

        Returns:
        GlobusHTTPResponse with endpoint status information
        """

    def delete_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
        """
        Delete a registered endpoint.

        Permanently removes an endpoint registration. Running tasks
        will continue but no new tasks can be submitted.

        Parameters:
        - endpoint_id: UUID of endpoint to delete

        Returns:
        GlobusHTTPResponse confirming deletion
        """

    # -- Function management -------------------------------------------------
    # Register, update, and manage Python functions for distributed execution
    # with support for dependencies and access control.

    def register_function(self, function_data: dict[str, Any]) -> GlobusHTTPResponse:
        """
        Register a Python function for distributed execution.

        Registers a serialized Python function that can be executed on
        compute endpoints. Functions can include dependencies, environment
        requirements, and access control policies.

        Parameters:
        - function_data: Function registration document containing serialized code

        Returns:
        GlobusHTTPResponse with function UUID and registration details
        """

    def get_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
        """
        Get information about a registered function.

        Parameters:
        - function_id: UUID of the function

        Returns:
        GlobusHTTPResponse with function metadata and access policies
        """

    def delete_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
        """
        Delete a registered function.

        Removes function registration. Running tasks using this function
        will continue but new tasks cannot be submitted.

        Parameters:
        - function_id: UUID of function to delete

        Returns:
        GlobusHTTPResponse confirming deletion
        """

    def submit_function(
        self,
        function_document: ComputeFunctionDocument
    ) -> GlobusHTTPResponse:
        """
        Submit and register a function in a single operation.

        Parameters:
        - function_document: Complete function document with code and metadata

        Returns:
        GlobusHTTPResponse with function UUID
        """

    # -- Task submission and monitoring --------------------------------------
    # Submit function execution tasks and monitor their progress with support
    # for batch operations and result retrieval.

    def submit_task(
        self,
        endpoint_uuid: str | UUID,
        function_uuid: str | UUID,
        function_args: list | None = None,
        function_kwargs: dict | None = None,
        **kwargs
    ) -> GlobusHTTPResponse:
        """
        Submit a task for execution on a compute endpoint.

        Executes a registered function on the specified endpoint with
        the provided arguments. Tasks are queued and executed asynchronously.

        Parameters:
        - endpoint_uuid: UUID of endpoint to execute on
        - function_uuid: UUID of function to execute
        - function_args: Positional arguments for function
        - function_kwargs: Keyword arguments for function
        - **kwargs: Additional task parameters

        Returns:
        GlobusHTTPResponse with task UUID for monitoring
        """

    def submit(self, data: dict[str, Any]) -> GlobusHTTPResponse:
        """
        Submit a batch of tasks for execution.

        Submits multiple tasks in a single request for efficient
        processing. Tasks can target different endpoints and functions.

        Parameters:
        - data: Task batch document containing task specifications

        Returns:
        GlobusHTTPResponse with task UUIDs and batch information
        """

    def get_task(self, task_id: str | UUID) -> GlobusHTTPResponse:
        """
        Get task status and results.

        Retrieves current status, execution results, and any error
        information for a submitted task.

        Parameters:
        - task_id: UUID of the task

        Returns:
        GlobusHTTPResponse with task status, results, and metadata
        """

    def get_task_batch(
        self,
        task_ids: str | UUID | Iterable[str | UUID]
    ) -> GlobusHTTPResponse:
        """
        Get status and results for multiple tasks.

        Efficiently retrieves information for multiple tasks in a
        single request, useful for monitoring batch operations.

        Parameters:
        - task_ids: Task UUID(s) to retrieve

        Returns:
        GlobusHTTPResponse with status and results for all requested tasks
        """

    def get_task_group(self, task_group_id: str | UUID) -> GlobusHTTPResponse:
        """
        Get all task IDs associated with a task group.

        Retrieves the list of tasks that belong to a specific task group,
        useful for managing related batch operations.

        Parameters:
        - task_group_id: UUID of the task group

        Returns:
        GlobusHTTPResponse with list of task UUIDs in the group
        """


# Data containers for function registration and task submission with proper
# serialization and metadata handling.
class ComputeFunctionDocument(PayloadWrapper):
    """
    Function registration document for submitting Python functions.

    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.

    Contains serialized function code, metadata, and access control
    information required for function registration.
    """

    def __init__(
        self,
        *,
        function_name: str,
        function_code: str,
        description: str | MissingType = MISSING,
        metadata: ComputeFunctionMetadata | MissingType = MISSING,
        group: str | UUID | MissingType = MISSING,
        public: bool = False
    ) -> None: ...
class ComputeFunctionMetadata(PayloadWrapper):
    """
    Metadata container for function registration.

    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.

    Contains version and environment information for function execution.
    """

    def __init__(
        self,
        *,
        python_version: str | MissingType = MISSING,
        sdk_version: str | MissingType = MISSING
    ) -> None: ...


# Compute-specific error handling for function execution and endpoint
# management operations.
class ComputeAPIError(GlobusAPIError):
    """
    Error class for Compute service API errors.

    Provides enhanced error handling for compute-specific error
    conditions including function execution failures and endpoint issues.
    """


# Example: basic function registration and task execution
from globus_sdk import ComputeClientV3
import time

# Initialize client
compute_client = ComputeClientV3(authorizer=authorizer)

# Register a simple function
def hello_world(name):
    """Return a greeting for *name* (executed remotely on an endpoint)."""
    return f"Hello, {name}!"

function_data = {
    "function_name": "hello_world",
    "function_code": serialize_function(hello_world),  # Use appropriate serialization
    "description": "Simple greeting function",
    "public": True
}

# Register function
func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit task
task_response = compute_client.submit_task(
    endpoint_uuid="endpoint-uuid-here",
    function_uuid=function_uuid,
    function_args=["World"]
)
task_uuid = task_response["task_uuid"]

# Monitor task: poll until it reaches a terminal state (SUCCESS or FAILED)
while True:
    task_info = compute_client.get_task(task_uuid)
    status = task_info["status"]
    if status == "SUCCESS":
        result = task_info["result"]
        print(f"Task result: {result}")
        break
    elif status == "FAILED":
        print(f"Task failed: {task_info.get('error')}")
        break
    time.sleep(1)

# Submit multiple tasks at once
import time

batch_data = {
    "tasks": [
        {
            "endpoint": "endpoint-1",
            "function": function_uuid,
            "args": [f"User {i}"],
            "kwargs": {}
        }
        for i in range(10)
    ]
}
batch_response = compute_client.submit(batch_data)
task_ids = batch_response["task_uuids"]

# Monitor batch progress: poll until every task reaches a terminal state
while True:
    batch_status = compute_client.get_task_batch(task_ids)
    completed = sum(1 for task in batch_status["results"]
                    if task["status"] in ["SUCCESS", "FAILED"])
    print(f"Progress: {completed}/{len(task_ids)} tasks completed")
    if completed == len(task_ids):
        break
    time.sleep(5)

# Process results
for task_info in batch_status["results"]:
    if task_info["status"] == "SUCCESS":
        print(f"Task {task_info['task_uuid']}: {task_info['result']}")

# Register a new endpoint
endpoint_config = {
    "endpoint_name": "my-compute-endpoint",
    "description": "Personal compute endpoint",
    "public": False,
    "allowed_functions": [function_uuid]
}
endpoint_response = compute_client.register_endpoint(endpoint_config)
endpoint_uuid = endpoint_response["endpoint_uuid"]

# Check endpoint status
status = compute_client.get_endpoint_status(endpoint_uuid)
print(f"Endpoint status: {status['status']}")
print(f"Active workers: {status.get('outstanding_tasks', 0)}")

# Update endpoint configuration
update_data = {
    "description": "Updated description",
    "public": True
}
compute_client.update_endpoint(endpoint_uuid, update_data)

# Register function with complex dependencies
# NOTE(review): the embedded source was unindented and would not parse when
# executed on an endpoint; indentation restored below.
function_code = """
def process_data(data_list):
    import numpy as np
    import pandas as pd

    # Process data using scientific libraries
    arr = np.array(data_list)
    df = pd.DataFrame({'values': arr})
    return df.describe().to_dict()
"""
function_data = {
    "function_name": "process_data",
    "function_code": function_code,
    "description": "Data processing with scientific libraries",
    "container_uuid": "container-with-scipy",  # Pre-configured container
    "resource_requirements": {
        "num_cores": 2,
        "memory_per_core": "2GB"
    }
}
func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit data processing task
task_response = compute_client.submit_task(
    endpoint_uuid=endpoint_uuid,
    function_uuid=function_uuid,
    function_args=[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
)

# Get AMQP connection for real-time results
import json

amqp_response = compute_client.get_result_amqp_url()
amqp_url = amqp_response["amqp_url"]

# Connect to AMQP for real-time task updates
import pika

connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
channel = connection.channel()

def on_result(ch, method, properties, body):
    """Handle one task-result message delivered from the AMQP queue."""
    result_data = json.loads(body)
    task_id = result_data["task_id"]
    status = result_data["status"]
    if status == "SUCCESS":
        print(f"Task {task_id} completed: {result_data['result']}")
    elif status == "FAILED":
        print(f"Task {task_id} failed: {result_data['error']}")

# Set up consumer for results
channel.basic_consume(
    queue="task_results",
    on_message_callback=on_result,
    auto_ack=True
)

# Start consuming results (blocks until the connection is closed/interrupted)
channel.start_consuming()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-globus-sdk