# Compute Service

Function execution and management on Globus Compute endpoints, with support for Python functions, containers, and distributed computing patterns. The Compute service runs registered functions on remote endpoints across federated computing resources and handles function registration, task submission, and result retrieval.

## Capabilities

### Compute Clients

Clients for versions 2 and 3 of the Globus Compute API, covering function and endpoint management.

```python { .api }
class ComputeClientV2(BaseClient):
    """
    Client for Globus Compute API version 2.

    Provides legacy compute operations including function registration,
    endpoint management, and task execution with v2 API compatibility.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...

    def get_version(self, service: str | None = None) -> GlobusHTTPResponse:
        """
        Get current API version and service information.

        Parameters:
        - service: Specific service to get version info for

        Returns:
            GlobusHTTPResponse with version details
        """

    def get_result_amqp_url(self) -> GlobusHTTPResponse:
        """
        Generate AMQP connection credentials for real-time result streaming.

        Creates new credentials for connecting to the AMQP service
        to receive task results and status updates in real time.

        Returns:
            GlobusHTTPResponse with AMQP connection URL and credentials
        """

class ComputeClientV3(BaseClient):
    """
    Client for Globus Compute API version 3.

    Provides modern compute operations with enhanced endpoint management,
    improved function registration, and advanced task execution capabilities.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs
    ) -> None: ...
```

### Endpoint Management

Register, manage, and monitor compute endpoints for distributed function execution across federated computing resources.

```python { .api }
def register_endpoint(self, data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Register a new compute endpoint.

    Registers an endpoint that can execute functions submitted to the
    Globus Compute service. Endpoints can be configured with specific
    execution environments, resource limits, and access policies.

    Parameters:
    - data: Endpoint registration document with configuration

    Returns:
        GlobusHTTPResponse with endpoint registration details and UUID
    """

def get_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get detailed information about a registered endpoint.

    Parameters:
    - endpoint_id: UUID of the endpoint

    Returns:
        GlobusHTTPResponse with endpoint configuration and status
    """

def update_endpoint(
    self,
    endpoint_id: str | UUID,
    data: dict[str, Any]
) -> GlobusHTTPResponse:
    """
    Update endpoint configuration.

    Parameters:
    - endpoint_id: UUID of endpoint to update
    - data: Endpoint update document

    Returns:
        GlobusHTTPResponse confirming update
    """

def get_endpoint_status(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get current status of a compute endpoint.

    Returns information about endpoint availability, worker processes,
    queue status, and resource utilization.

    Parameters:
    - endpoint_id: UUID of the endpoint

    Returns:
        GlobusHTTPResponse with endpoint status information
    """

def delete_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:
    """
    Delete a registered endpoint.

    Permanently removes an endpoint registration. Running tasks
    will continue but no new tasks can be submitted.

    Parameters:
    - endpoint_id: UUID of endpoint to delete

    Returns:
        GlobusHTTPResponse confirming deletion
    """
```
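
The endpoint methods above accept either a `str` or a `uuid.UUID` for `endpoint_id`. A minimal sketch of validating such an ID client-side before making a request, so that a placeholder or typo fails fast with a `ValueError` instead of surfacing as an API error; `normalize_id` is a hypothetical helper, not part of the SDK:

```python
from __future__ import annotations

import uuid


def normalize_id(value: str | uuid.UUID) -> str:
    """Return the canonical lowercase string form of an endpoint/function/task ID.

    Hypothetical helper: raises ValueError if `value` is not a valid UUID.
    """
    if isinstance(value, uuid.UUID):
        return str(value)
    # uuid.UUID() accepts hex with or without hyphens and normalizes case
    return str(uuid.UUID(value))
```

For example, `compute_client.get_endpoint(normalize_id(endpoint_id))` rejects a leftover placeholder such as `"endpoint-uuid-here"` before any network call is made.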

### Function Management

Register, inspect, and delete Python functions for distributed execution, with support for dependencies and access control.

```python { .api }
def register_function(self, function_data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Register a Python function for distributed execution.

    Registers a serialized Python function that can be executed on
    compute endpoints. Functions can include dependencies, environment
    requirements, and access control policies.

    Parameters:
    - function_data: Function registration document containing serialized code

    Returns:
        GlobusHTTPResponse with function UUID and registration details
    """

def get_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get information about a registered function.

    Parameters:
    - function_id: UUID of the function

    Returns:
        GlobusHTTPResponse with function metadata and access policies
    """

def delete_function(self, function_id: str | UUID) -> GlobusHTTPResponse:
    """
    Delete a registered function.

    Removes function registration. Running tasks using this function
    will continue but new tasks cannot be submitted.

    Parameters:
    - function_id: UUID of function to delete

    Returns:
        GlobusHTTPResponse confirming deletion
    """

def submit_function(
    self,
    function_document: ComputeFunctionDocument
) -> GlobusHTTPResponse:
    """
    Submit and register a function in a single operation.

    Parameters:
    - function_document: Complete function document with code and metadata

    Returns:
        GlobusHTTPResponse with function UUID
    """
```
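
If a script registers the same function on every run, one option is to cache the returned function UUID keyed on a hash of the function's name and code, and register only on a cache miss. A minimal in-memory sketch, assuming `register` is any callable that takes the `function_data` dict and returns a mapping with a `function_uuid` key, as in the usage examples later in this document; `FunctionRegistry` is hypothetical, and a real cache would likely need to persist across runs:

```python
from __future__ import annotations

import hashlib
from typing import Any, Callable, Mapping


class FunctionRegistry:
    """Hypothetical in-memory cache of function UUIDs keyed by code hash."""

    def __init__(self, register: Callable[[dict[str, Any]], Mapping[str, Any]]) -> None:
        # `register` stands in for e.g. compute_client.register_function
        self._register = register
        self._cache: dict[str, str] = {}

    @staticmethod
    def _key(function_name: str, function_code: str) -> str:
        digest = hashlib.sha256()
        digest.update(function_name.encode("utf-8"))
        digest.update(b"\0")  # separator so ("ab", "c") and ("a", "bc") differ
        digest.update(function_code.encode("utf-8"))
        return digest.hexdigest()

    def get_or_register(self, function_name: str, function_code: str, **extra: Any) -> str:
        """Return a cached UUID, registering the function only on a cache miss."""
        key = self._key(function_name, function_code)
        if key not in self._cache:
            function_data = {
                "function_name": function_name,
                "function_code": function_code,
                **extra,  # e.g. description, public
            }
            response = self._register(function_data)
            self._cache[key] = response["function_uuid"]
        return self._cache[key]
```

The key deliberately ignores extra fields such as `description`, so changing only metadata does not trigger a new registration; include those fields in the hash if that matters for your use.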

### Task Execution and Management

Submit function execution tasks and monitor their progress with support for batch operations and result retrieval.

```python { .api }
def submit_task(
    self,
    endpoint_uuid: str | UUID,
    function_uuid: str | UUID,
    function_args: list | None = None,
    function_kwargs: dict | None = None,
    **kwargs
) -> GlobusHTTPResponse:
    """
    Submit a task for execution on a compute endpoint.

    Executes a registered function on the specified endpoint with
    the provided arguments. Tasks are queued and executed asynchronously.

    Parameters:
    - endpoint_uuid: UUID of endpoint to execute on
    - function_uuid: UUID of function to execute
    - function_args: Positional arguments for function
    - function_kwargs: Keyword arguments for function
    - **kwargs: Additional task parameters

    Returns:
        GlobusHTTPResponse with task UUID for monitoring
    """

def submit(self, data: dict[str, Any]) -> GlobusHTTPResponse:
    """
    Submit a batch of tasks for execution.

    Submits multiple tasks in a single request for efficient
    processing. Tasks can target different endpoints and functions.

    Parameters:
    - data: Task batch document containing task specifications

    Returns:
        GlobusHTTPResponse with task UUIDs and batch information
    """

def get_task(self, task_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get task status and results.

    Retrieves current status, execution results, and any error
    information for a submitted task.

    Parameters:
    - task_id: UUID of the task

    Returns:
        GlobusHTTPResponse with task status, results, and metadata
    """

def get_task_batch(
    self,
    task_ids: str | UUID | Iterable[str | UUID]
) -> GlobusHTTPResponse:
    """
    Get status and results for multiple tasks.

    Efficiently retrieves information for multiple tasks in a
    single request, useful for monitoring batch operations.

    Parameters:
    - task_ids: Task UUID(s) to retrieve

    Returns:
        GlobusHTTPResponse with status and results for all requested tasks
    """

def get_task_group(self, task_group_id: str | UUID) -> GlobusHTTPResponse:
    """
    Get all task IDs associated with a task group.

    Retrieves the list of tasks that belong to a specific task group,
    useful for managing related batch operations.

    Parameters:
    - task_group_id: UUID of the task group

    Returns:
        GlobusHTTPResponse with list of task UUIDs in the group
    """
```
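
Because tasks run asynchronously, callers typically poll `get_task` until the task finishes. A minimal sketch of a polling helper with a deadline and exponential backoff, assuming `get_task` is any callable returning a mapping with a `status` key and that `"SUCCESS"` and `"FAILED"` are the terminal values (both taken from this document's examples, not from a documented schema); `wait_for_task` is hypothetical:

```python
from __future__ import annotations

import time
from typing import Any, Callable, Mapping

# Terminal status values as used in this document's examples (assumption)
TERMINAL_STATUSES = frozenset({"SUCCESS", "FAILED"})


def wait_for_task(
    get_task: Callable[[str], Mapping[str, Any]],
    task_id: str,
    *,
    timeout: float = 300.0,
    initial_interval: float = 1.0,
    max_interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Mapping[str, Any]:
    """Poll `get_task(task_id)` until a terminal status, backing off exponentially.

    Raises TimeoutError if the task is still pending after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    interval = initial_interval
    while True:
        task_info = get_task(task_id)
        if task_info["status"] in TERMINAL_STATUSES:
            return task_info
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"task {task_id} not finished after {timeout}s")
        # Never sleep past the deadline; double the interval up to max_interval
        sleep(min(interval, remaining))
        interval = min(interval * 2, max_interval)
```

Usage would look like `wait_for_task(compute_client.get_task, task_uuid)`. The injectable `sleep` parameter exists only so the backoff schedule can be tested without waiting.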

### Function Data Classes

Data containers for building function registration documents, with payload serialization and metadata handling.

```python { .api }
class ComputeFunctionDocument(PayloadWrapper):
    """
    Function registration document for submitting Python functions.

    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.

    Contains serialized function code, metadata, and access control
    information required for function registration.
    """

    def __init__(
        self,
        *,
        function_name: str,
        function_code: str,
        description: str | MissingType = MISSING,
        metadata: ComputeFunctionMetadata | MissingType = MISSING,
        group: str | UUID | MissingType = MISSING,
        public: bool = False
    ) -> None: ...

class ComputeFunctionMetadata(PayloadWrapper):
    """
    Metadata container for function registration.

    Note: This class is deprecated in favor of direct dictionary usage
    but remains available for backward compatibility.

    Contains version and environment information for function execution.
    """

    def __init__(
        self,
        *,
        python_version: str | MissingType = MISSING,
        sdk_version: str | MissingType = MISSING
    ) -> None: ...
```
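
The `MISSING` defaults mark fields that should be left out of the payload entirely, which is different from sending `None`. Since both classes are deprecated in favor of plain dictionaries, here is a minimal sketch of building an equivalent dictionary by hand while applying the same omit-if-missing rule. The `_Missing` sentinel and `build_function_document` are hypothetical stand-ins for the SDK's own `MISSING`, and the key names simply mirror the constructor parameters above (assumption, not a documented wire format):

```python
from __future__ import annotations

import platform
from typing import Any


class _Missing:
    """Hypothetical stand-in for the SDK's MISSING sentinel."""

    def __repr__(self) -> str:
        return "MISSING"


MISSING: Any = _Missing()


def _drop_missing(payload: dict[str, Any]) -> dict[str, Any]:
    # Remove keys whose value is the sentinel; None and False are kept
    return {k: v for k, v in payload.items() if v is not MISSING}


def build_function_document(
    *,
    function_name: str,
    function_code: str,
    description: Any = MISSING,
    group: Any = MISSING,
    public: bool = False,
    sdk_version: Any = MISSING,
) -> dict[str, Any]:
    metadata = _drop_missing(
        {
            # Record the local interpreter version, e.g. "3.11.4"
            "python_version": platform.python_version(),
            "sdk_version": sdk_version,
        }
    )
    return _drop_missing(
        {
            "function_name": function_name,
            "function_code": function_code,
            "description": description,
            "metadata": metadata,
            # A UUID object is converted to its string form
            "group": group if group is MISSING else str(group),
            "public": public,
        }
    )
```

Using an identity check (`is not MISSING`) rather than truthiness is what lets `public=False` and empty strings survive while unset fields disappear.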

### Error Handling

Compute-specific error handling for function execution and endpoint management operations.

```python { .api }
class ComputeAPIError(GlobusAPIError):
    """
    Error class for Compute service API errors.

    Provides enhanced error handling for compute-specific error
    conditions including function execution failures and endpoint issues.
    """
```
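
A common pattern is to retry requests that fail with transient HTTP statuses. `GlobusAPIError`, and therefore `ComputeAPIError`, exposes the response status as `http_status`. To keep the sketch runnable without the SDK installed, it uses a stand-in exception class with the same attribute; `call_with_retries`, the set of statuses treated as transient, and the backoff schedule are all assumptions. Check whether the SDK's transport layer already retries a given failure before layering your own retries on top:

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")

# HTTP statuses treated as transient in this sketch (assumption)
TRANSIENT_STATUSES = frozenset({429, 502, 503, 504})


class FakeComputeAPIError(Exception):
    """Stand-in for ComputeAPIError; only the http_status attribute is modeled."""

    def __init__(self, http_status: int) -> None:
        super().__init__(f"HTTP {http_status}")
        self.http_status = http_status


def call_with_retries(
    func: Callable[[], T],
    *,
    error_type: type[Exception] = FakeComputeAPIError,
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying transient API errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except error_type as err:
            status = getattr(err, "http_status", None)
            # Re-raise non-transient errors, or the last transient one
            if status not in TRANSIENT_STATUSES or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

With the real SDK, a call might look like `call_with_retries(lambda: compute_client.get_endpoint_status(endpoint_uuid), error_type=globus_sdk.ComputeAPIError)`.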

## Common Usage Patterns

### Basic Function Execution

```python
import time

from globus_sdk import ComputeClientV3

# Initialize client; `authorizer` is assumed to be a GlobusAuthorizer
# (for example an AccessTokenAuthorizer) created elsewhere
compute_client = ComputeClientV3(authorizer=authorizer)

# Register a simple function
def hello_world(name):
    return f"Hello, {name}!"

function_data = {
    "function_name": "hello_world",
    # serialize_function is a placeholder: use the serializer that your
    # Globus Compute setup expects
    "function_code": serialize_function(hello_world),
    "description": "Simple greeting function",
    "public": True,
}

# Register function
func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit task
task_response = compute_client.submit_task(
    endpoint_uuid="endpoint-uuid-here",
    function_uuid=function_uuid,
    function_args=["World"],
)
task_uuid = task_response["task_uuid"]

# Poll until the task reaches a terminal state, giving up after 5 minutes
deadline = time.monotonic() + 300
while time.monotonic() < deadline:
    task_info = compute_client.get_task(task_uuid)
    status = task_info["status"]

    if status == "SUCCESS":
        print(f"Task result: {task_info['result']}")
        break
    elif status == "FAILED":
        print(f"Task failed: {task_info.get('error')}")
        break

    time.sleep(1)
else:
    print(f"Timed out waiting for task {task_uuid}")
```

### Batch Task Processing

```python
import time

# Submit multiple tasks at once (reuses compute_client and function_uuid from
# the previous example; "endpoint-1" stands in for a real endpoint UUID)
batch_data = {
    "tasks": [
        {
            "endpoint": "endpoint-1",
            "function": function_uuid,
            "args": [f"User {i}"],
            "kwargs": {},
        }
        for i in range(10)
    ]
}

batch_response = compute_client.submit(batch_data)
task_ids = batch_response["task_uuids"]

# Monitor batch progress until every task is in a terminal state
while True:
    batch_status = compute_client.get_task_batch(task_ids)

    completed = sum(
        1 for task in batch_status["results"]
        if task["status"] in ("SUCCESS", "FAILED")
    )

    print(f"Progress: {completed}/{len(task_ids)} tasks completed")

    if completed == len(task_ids):
        break

    time.sleep(5)

# Process results from the final status check
for task_info in batch_status["results"]:
    if task_info["status"] == "SUCCESS":
        print(f"Task {task_info['task_uuid']}: {task_info['result']}")
```
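
For large batches you may prefer to poll in fixed-size chunks rather than passing every task ID in one request. This document does not state a per-request limit, so the chunk size below is an arbitrary assumption. A sketch that also tallies statuses with `collections.Counter`, assuming the `get_task_batch` response shape used in the example above (a `results` list of dicts with a `status` key); `chunked` and `tally_statuses` are hypothetical helpers:

```python
from __future__ import annotations

from collections import Counter
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Mapping


def chunked(items: Iterable[str], size: int) -> Iterator[list[str]]:
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


def tally_statuses(
    get_task_batch: Callable[[list[str]], Mapping[str, Any]],
    task_ids: Iterable[str],
    chunk_size: int = 100,  # arbitrary; not a documented service limit
) -> Counter[str]:
    """Count task statuses across all chunks, one request per chunk."""
    counts: Counter[str] = Counter()
    for chunk in chunked(task_ids, chunk_size):
        response = get_task_batch(chunk)
        counts.update(task["status"] for task in response["results"])
    return counts
```

The loop's completion check then becomes `counts["SUCCESS"] + counts["FAILED"] == len(task_ids)` with `counts = tally_statuses(compute_client.get_task_batch, task_ids)`.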

### Endpoint Management

```python
# Register a new endpoint
endpoint_config = {
    "endpoint_name": "my-compute-endpoint",
    "description": "Personal compute endpoint",
    "public": False,
    "allowed_functions": [function_uuid],
}

endpoint_response = compute_client.register_endpoint(endpoint_config)
endpoint_uuid = endpoint_response["endpoint_uuid"]

# Check endpoint status
status = compute_client.get_endpoint_status(endpoint_uuid)
print(f"Endpoint status: {status['status']}")
print(f"Outstanding tasks: {status.get('outstanding_tasks', 0)}")

# Update endpoint configuration
update_data = {
    "description": "Updated description",
    "public": True,
}
compute_client.update_endpoint(endpoint_uuid, update_data)
```

### Function with Dependencies

```python
# Register a function that depends on third-party libraries. The imports live
# inside the function body so they run on the endpoint, where the libraries
# must be installed (here, via a pre-configured container).
function_code = """
def process_data(data_list):
    import numpy as np
    import pandas as pd

    # Process data using scientific libraries
    arr = np.array(data_list)
    df = pd.DataFrame({'values': arr})
    return df.describe().to_dict()
"""

function_data = {
    "function_name": "process_data",
    "function_code": function_code,
    "description": "Data processing with scientific libraries",
    "container_uuid": "container-with-scipy",  # Placeholder for a pre-configured container
    "resource_requirements": {
        "num_cores": 2,
        "memory_per_core": "2GB",
    },
}

func_response = compute_client.register_function(function_data)
function_uuid = func_response["function_uuid"]

# Submit data processing task; the single positional argument is a list
task_response = compute_client.submit_task(
    endpoint_uuid=endpoint_uuid,
    function_uuid=function_uuid,
    function_args=[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]],
)
```

### Real-time Results with AMQP

```python
import json

import pika

# Get AMQP connection details for real-time results
amqp_response = compute_client.get_result_amqp_url()
amqp_url = amqp_response["amqp_url"]

# Connect to AMQP for real-time task updates
connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
channel = connection.channel()

def on_result(ch, method, properties, body):
    result_data = json.loads(body)
    task_id = result_data["task_id"]
    status = result_data["status"]

    if status == "SUCCESS":
        print(f"Task {task_id} completed: {result_data['result']}")
    elif status == "FAILED":
        print(f"Task {task_id} failed: {result_data['error']}")

# Set up consumer for results ("task_results" is a placeholder queue name)
channel.basic_consume(
    queue="task_results",
    on_message_callback=on_result,
    auto_ack=True,
)

# Start consuming results; blocks until interrupted, then closes the connection
try:
    channel.start_consuming()
finally:
    connection.close()
```
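
Keeping message parsing separate from the pika callback makes it testable without a broker, and it also surfaces statuses that the `on_result` callback above silently ignores. A sketch of a pure parsing function the callback could delegate to, for example `print(summarize_result_message(body))`; the field names are the ones assumed by `on_result`, not a documented message schema, and `summarize_result_message` is hypothetical:

```python
from __future__ import annotations

import json
from typing import Any


def summarize_result_message(body: bytes) -> str:
    """Turn one result message body into a log line.

    Assumes the JSON fields used by on_result: task_id, status, and
    result or error. Unknown statuses are reported rather than dropped.
    """
    message: dict[str, Any] = json.loads(body)
    task_id = message.get("task_id", "<unknown>")
    status = message.get("status")
    if status == "SUCCESS":
        return f"Task {task_id} completed: {message.get('result')}"
    if status == "FAILED":
        return f"Task {task_id} failed: {message.get('error')}"
    return f"Task {task_id} reported status {status!r}"
```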