or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auth-service.md compute-service.md core-framework.md flows-service.md gcs-service.md groups-service.md index.md search-service.md timers-service.md transfer-service.md

compute-service.md (docs/)

0

# Compute Service

1

2

Function execution and management on Globus Compute endpoints with support for Python functions, containers, and distributed computing patterns. The Compute service enables high-performance distributed computing across federated resources with seamless function registration and execution.

3

4

## Capabilities

5

6

### Compute Clients

7

8

Core clients providing access to different versions of the Globus Compute API with comprehensive function and endpoint management capabilities.

9

10

```python { .api }

11

class ComputeClientV2(BaseClient):

12

"""

13

Client for Globus Compute API version 2.

14

15

Provides legacy compute operations including function registration,

16

endpoint management, and task execution with v2 API compatibility.

17

"""

18

19

def __init__(

20

self,

21

*,

22

app: GlobusApp | None = None,

23

authorizer: GlobusAuthorizer | None = None,

24

environment: str | None = None,

25

base_url: str | None = None,

26

**kwargs

27

) -> None: ...

28

29

def get_version(self, service: str | None = None) -> GlobusHTTPResponse:

30

"""

31

Get current API version and service information.

32

33

Parameters:

34

- service: Specific service to get version info for

35

36

Returns:

37

GlobusHTTPResponse with version details

38

"""

39

40

def get_result_amqp_url(self) -> GlobusHTTPResponse:

41

"""

42

Generate AMQP connection credentials for real-time result streaming.

43

44

Creates new credentials for connecting to the AMQP service

45

to receive task results and status updates in real-time.

46

47

Returns:

48

GlobusHTTPResponse with AMQP connection URL and credentials

49

"""

50

51

class ComputeClientV3(BaseClient):

52

"""

53

Client for Globus Compute API version 3.

54

55

Provides modern compute operations with enhanced endpoint management,

56

improved function registration, and advanced task execution capabilities.

57

"""

58

59

def __init__(

60

self,

61

*,

62

app: GlobusApp | None = None,

63

authorizer: GlobusAuthorizer | None = None,

64

environment: str | None = None,

65

base_url: str | None = None,

66

**kwargs

67

) -> None: ...

68

```

69

70

### Endpoint Management

71

72

Register, manage, and monitor compute endpoints for distributed function execution across federated computing resources.

73

74

```python { .api }

75

def register_endpoint(self, data: dict[str, Any]) -> GlobusHTTPResponse:

76

"""

77

Register a new compute endpoint.

78

79

Registers an endpoint that can execute functions submitted to the

80

Globus Compute service. Endpoints can be configured with specific

81

execution environments, resource limits, and access policies.

82

83

Parameters:

84

- data: Endpoint registration document with configuration

85

86

Returns:

87

GlobusHTTPResponse with endpoint registration details and UUID

88

"""

89

90

def get_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:

91

"""

92

Get detailed information about a registered endpoint.

93

94

Parameters:

95

- endpoint_id: UUID of the endpoint

96

97

Returns:

98

GlobusHTTPResponse with endpoint configuration and status

99

"""

100

101

def update_endpoint(

102

self,

103

endpoint_id: str | UUID,

104

data: dict[str, Any]

105

) -> GlobusHTTPResponse:

106

"""

107

Update endpoint configuration.

108

109

Parameters:

110

- endpoint_id: UUID of endpoint to update

111

- data: Endpoint update document

112

113

Returns:

114

GlobusHTTPResponse confirming update

115

"""

116

117

def get_endpoint_status(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:

118

"""

119

Get current status of a compute endpoint.

120

121

Returns information about endpoint availability, worker processes,

122

queue status, and resource utilization.

123

124

Parameters:

125

- endpoint_id: UUID of the endpoint

126

127

Returns:

128

GlobusHTTPResponse with endpoint status information

129

"""

130

131

def delete_endpoint(self, endpoint_id: str | UUID) -> GlobusHTTPResponse:

132

"""

133

Delete a registered endpoint.

134

135

Permanently removes an endpoint registration. Running tasks

136

will continue but no new tasks can be submitted.

137

138

Parameters:

139

- endpoint_id: UUID of endpoint to delete

140

141

Returns:

142

GlobusHTTPResponse confirming deletion

143

"""

144

```

145

146

### Function Management

147

148

Register, update, and manage Python functions for distributed execution with support for dependencies and access control.

149

150

```python { .api }

151

def register_function(self, function_data: dict[str, Any]) -> GlobusHTTPResponse:

152

"""

153

Register a Python function for distributed execution.

154

155

Registers a serialized Python function that can be executed on

156

compute endpoints. Functions can include dependencies, environment

157

requirements, and access control policies.

158

159

Parameters:

160

- function_data: Function registration document containing serialized code

161

162

Returns:

163

GlobusHTTPResponse with function UUID and registration details

164

"""

165

166

def get_function(self, function_id: str | UUID) -> GlobusHTTPResponse:

167

"""

168

Get information about a registered function.

169

170

Parameters:

171

- function_id: UUID of the function

172

173

Returns:

174

GlobusHTTPResponse with function metadata and access policies

175

"""

176

177

def delete_function(self, function_id: str | UUID) -> GlobusHTTPResponse:

178

"""

179

Delete a registered function.

180

181

Removes function registration. Running tasks using this function

182

will continue but new tasks cannot be submitted.

183

184

Parameters:

185

- function_id: UUID of function to delete

186

187

Returns:

188

GlobusHTTPResponse confirming deletion

189

"""

190

191

def submit_function(

192

self,

193

function_document: ComputeFunctionDocument

194

) -> GlobusHTTPResponse:

195

"""

196

Submit and register a function in a single operation.

197

198

Parameters:

199

- function_document: Complete function document with code and metadata

200

201

Returns:

202

GlobusHTTPResponse with function UUID

203

"""

204

```

205

206

### Task Execution and Management

207

208

Submit function execution tasks and monitor their progress with support for batch operations and result retrieval.

209

210

```python { .api }

211

def submit_task(

212

self,

213

endpoint_uuid: str | UUID,

214

function_uuid: str | UUID,

215

function_args: list | None = None,

216

function_kwargs: dict | None = None,

217

**kwargs

218

) -> GlobusHTTPResponse:

219

"""

220

Submit a task for execution on a compute endpoint.

221

222

Executes a registered function on the specified endpoint with

223

the provided arguments. Tasks are queued and executed asynchronously.

224

225

Parameters:

226

- endpoint_uuid: UUID of endpoint to execute on

227

- function_uuid: UUID of function to execute

228

- function_args: Positional arguments for function

229

- function_kwargs: Keyword arguments for function

230

- **kwargs: Additional task parameters

231

232

Returns:

233

GlobusHTTPResponse with task UUID for monitoring

234

"""

235

236

def submit(self, data: dict[str, Any]) -> GlobusHTTPResponse:

237

"""

238

Submit a batch of tasks for execution.

239

240

Submits multiple tasks in a single request for efficient

241

processing. Tasks can target different endpoints and functions.

242

243

Parameters:

244

- data: Task batch document containing task specifications

245

246

Returns:

247

GlobusHTTPResponse with task UUIDs and batch information

248

"""

249

250

def get_task(self, task_id: str | UUID) -> GlobusHTTPResponse:

251

"""

252

Get task status and results.

253

254

Retrieves current status, execution results, and any error

255

information for a submitted task.

256

257

Parameters:

258

- task_id: UUID of the task

259

260

Returns:

261

GlobusHTTPResponse with task status, results, and metadata

262

"""

263

264

def get_task_batch(

265

self,

266

task_ids: str | UUID | Iterable[str | UUID]

267

) -> GlobusHTTPResponse:

268

"""

269

Get status and results for multiple tasks.

270

271

Efficiently retrieves information for multiple tasks in a

272

single request, useful for monitoring batch operations.

273

274

Parameters:

275

- task_ids: Task UUID(s) to retrieve

276

277

Returns:

278

GlobusHTTPResponse with status and results for all requested tasks

279

"""

280

281

def get_task_group(self, task_group_id: str | UUID) -> GlobusHTTPResponse:

282

"""

283

Get all task IDs associated with a task group.

284

285

Retrieves the list of tasks that belong to a specific task group,

286

useful for managing related batch operations.

287

288

Parameters:

289

- task_group_id: UUID of the task group

290

291

Returns:

292

GlobusHTTPResponse with list of task UUIDs in the group

293

"""

294

```

295

296

### Function and Task Data Classes

297

298

Data containers for function registration and task submission with proper serialization and metadata handling.

299

300

```python { .api }

301

class ComputeFunctionDocument(PayloadWrapper):

302

"""

303

Function registration document for submitting Python functions.

304

305

Note: This class is deprecated in favor of direct dictionary usage

306

but remains available for backward compatibility.

307

308

Contains serialized function code, metadata, and access control

309

information required for function registration.

310

"""

311

312

def __init__(

313

self,

314

*,

315

function_name: str,

316

function_code: str,

317

description: str | MissingType = MISSING,

318

metadata: ComputeFunctionMetadata | MissingType = MISSING,

319

group: str | UUID | MissingType = MISSING,

320

public: bool = False

321

) -> None: ...

322

323

class ComputeFunctionMetadata(PayloadWrapper):

324

"""

325

Metadata container for function registration.

326

327

Note: This class is deprecated in favor of direct dictionary usage

328

but remains available for backward compatibility.

329

330

Contains version and environment information for function execution.

331

"""

332

333

def __init__(

334

self,

335

*,

336

python_version: str | MissingType = MISSING,

337

sdk_version: str | MissingType = MISSING

338

) -> None: ...

339

```

340

341

### Error Handling

342

343

Compute-specific error handling for function execution and endpoint management operations.

344

345

```python { .api }

346

class ComputeAPIError(GlobusAPIError):

347

"""

348

Error class for Compute service API errors.

349

350

Provides enhanced error handling for compute-specific error

351

conditions including function execution failures and endpoint issues.

352

"""

353

```

354

355

## Common Usage Patterns

356

357

### Basic Function Execution

358

359

```python

360

import time

from globus_sdk import ComputeClientV3

361

362

# Initialize client

363

compute_client = ComputeClientV3(authorizer=authorizer)

364

365

# Register a simple function

366

def hello_world(name):

367

return f"Hello, {name}!"

368

369

function_data = {

370

"function_name": "hello_world",

371

"function_code": serialize_function(hello_world), # Use appropriate serialization

372

"description": "Simple greeting function",

373

"public": True

374

}

375

376

# Register function

377

func_response = compute_client.register_function(function_data)

378

function_uuid = func_response["function_uuid"]

379

380

# Submit task

381

task_response = compute_client.submit_task(

382

endpoint_uuid="endpoint-uuid-here",

383

function_uuid=function_uuid,

384

function_args=["World"]

385

)

386

task_uuid = task_response["task_uuid"]

387

388

# Monitor task

389

while True:

390

task_info = compute_client.get_task(task_uuid)

391

status = task_info["status"]

392

393

if status == "SUCCESS":

394

result = task_info["result"]

395

print(f"Task result: {result}")

396

break

397

elif status == "FAILED":

398

print(f"Task failed: {task_info.get('error')}")

399

break

400

401

time.sleep(1)

402

```

403

404

### Batch Task Processing

405

406

```python

407

# Submit multiple tasks at once

408

batch_data = {

409

"tasks": [

410

{

411

"endpoint": "endpoint-1",

412

"function": function_uuid,

413

"args": [f"User {i}"],

414

"kwargs": {}

415

}

416

for i in range(10)

417

]

418

}

419

420

batch_response = compute_client.submit(batch_data)

421

task_ids = batch_response["task_uuids"]

422

423

# Monitor batch progress

424

while True:

425

batch_status = compute_client.get_task_batch(task_ids)

426

427

completed = sum(1 for task in batch_status["results"]

428

if task["status"] in ["SUCCESS", "FAILED"])

429

430

print(f"Progress: {completed}/{len(task_ids)} tasks completed")

431

432

if completed == len(task_ids):

433

break

434

435

time.sleep(5)

436

437

# Process results

438

for task_info in batch_status["results"]:

439

if task_info["status"] == "SUCCESS":

440

print(f"Task {task_info['task_uuid']}: {task_info['result']}")

441

```

442

443

### Endpoint Management

444

445

```python

446

# Register a new endpoint

447

endpoint_config = {

448

"endpoint_name": "my-compute-endpoint",

449

"description": "Personal compute endpoint",

450

"public": False,

451

"allowed_functions": [function_uuid]

452

}

453

454

endpoint_response = compute_client.register_endpoint(endpoint_config)

455

endpoint_uuid = endpoint_response["endpoint_uuid"]

456

457

# Check endpoint status

458

status = compute_client.get_endpoint_status(endpoint_uuid)

459

print(f"Endpoint status: {status['status']}")

460

print(f"Active workers: {status.get('outstanding_tasks', 0)}")

461

462

# Update endpoint configuration

463

update_data = {

464

"description": "Updated description",

465

"public": True

466

}

467

compute_client.update_endpoint(endpoint_uuid, update_data)

468

```

469

470

### Function with Dependencies

471

472

```python

473

# Register function with complex dependencies

474

function_code = """

475

def process_data(data_list):

476

import numpy as np

477

import pandas as pd

478

479

# Process data using scientific libraries

480

arr = np.array(data_list)

481

df = pd.DataFrame({'values': arr})

482

return df.describe().to_dict()

483

"""

484

485

function_data = {

486

"function_name": "process_data",

487

"function_code": function_code,

488

"description": "Data processing with scientific libraries",

489

"container_uuid": "container-with-scipy", # Pre-configured container

490

"resource_requirements": {

491

"num_cores": 2,

492

"memory_per_core": "2GB"

493

}

494

}

495

496

func_response = compute_client.register_function(function_data)

497

function_uuid = func_response["function_uuid"]

498

499

# Submit data processing task

500

task_response = compute_client.submit_task(

501

endpoint_uuid=endpoint_uuid,

502

function_uuid=function_uuid,

503

function_args=[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

504

)

505

```

506

507

### Real-time Results with AMQP

508

509

```python

510

# Get AMQP connection for real-time results

511

amqp_response = compute_client.get_result_amqp_url()

512

amqp_url = amqp_response["amqp_url"]

513

514

# Connect to AMQP for real-time task updates

515

import json

import pika

516

517

connection = pika.BlockingConnection(pika.URLParameters(amqp_url))

518

channel = connection.channel()

519

520

def on_result(ch, method, properties, body):

521

result_data = json.loads(body)

522

task_id = result_data["task_id"]

523

status = result_data["status"]

524

525

if status == "SUCCESS":

526

print(f"Task {task_id} completed: {result_data['result']}")

527

elif status == "FAILED":

528

print(f"Task {task_id} failed: {result_data['error']}")

529

530

# Set up consumer for results

531

channel.basic_consume(

532

queue="task_results",

533

on_message_callback=on_result,

534

auto_ack=True

535

)

536

537

# Start consuming results

538

channel.start_consuming()

539

```