
# Endpoint Management

Management of serverless endpoints for deploying AI/ML models as scalable APIs, along with high-level client interfaces for interacting with deployed endpoints. Supports both synchronous and asynchronous job submission, real-time monitoring, and comprehensive job lifecycle management.

## Core Imports

```python
from runpod import Endpoint, AsyncioEndpoint
from runpod.http_client import ClientSession
from typing import Iterator, AsyncIterator
```

## Capabilities

### Endpoint Administration

Create and manage serverless endpoints that can scale automatically based on demand and run custom AI/ML workloads.

```python { .api }
def create_endpoint(
    name: str,
    template_id: str,
    gpu_ids: str,
    network_volume_id: str = None,
    locations: str = None,
    idle_timeout: int = 5,
    scaler_type: str = "QUEUE_DELAY",
    scaler_value: int = 4,
    workers_min: int = 0,
    workers_max: int = 3,
    flashboot: bool = False
) -> dict:
    """
    Create a new serverless endpoint.

    Parameters:
    - name: Endpoint display name
    - template_id: Pod template ID to use for workers
    - gpu_ids: Comma-separated GPU type IDs (e.g., "NVIDIA GeForce RTX 3070")
    - network_volume_id: Network volume ID for shared storage
    - locations: Comma-separated location preferences
    - idle_timeout: Minutes before idle workers are terminated
    - scaler_type: Scaling algorithm ("QUEUE_DELAY", "REQUEST_COUNT")
    - scaler_value: Scaling threshold value
    - workers_min: Minimum number of workers
    - workers_max: Maximum number of workers
    - flashboot: Enable fast cold start optimization

    Returns:
        dict: Created endpoint information with endpoint ID
    """

def get_endpoints() -> list:
    """
    Get list of all user's endpoints.

    Returns:
        list: Endpoint information including status and configuration
    """

def update_endpoint_template(endpoint_id: str, template_id: str) -> dict:
    """
    Update an endpoint's template configuration.

    Parameters:
    - endpoint_id: Endpoint ID to update
    - template_id: New template ID to use

    Returns:
        dict: Update confirmation with new configuration
    """
```

### Synchronous Endpoint Client

High-level client for making synchronous requests to deployed endpoints with comprehensive job management capabilities.

```python { .api }
class Endpoint:
    """Synchronous endpoint client for making requests to RunPod endpoints."""

    def __init__(self, endpoint_id: str):
        """
        Initialize endpoint client.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        """

    def run(self, request_input: dict) -> 'Job':
        """
        Submit a job to the endpoint.

        Parameters:
        - request_input: Input data to send to the endpoint

        Returns:
            Job: Job instance for monitoring and retrieving results
        """

    def run_sync(self, request_input: dict, timeout: int = 86400) -> dict:
        """
        Submit job and wait for completion synchronously.

        Parameters:
        - request_input: Input data to send to the endpoint
        - timeout: Maximum wait time in seconds (default: 86400)

        Returns:
            dict: Job output when completed
        """

    def health(self, timeout: int = 3) -> dict:
        """
        Check the health of the endpoint (number/state of workers, requests).

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
            dict: Endpoint health information including worker and request status
        """

    def purge_queue(self, timeout: int = 3) -> dict:
        """
        Purge the endpoint's job queue.

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
            dict: Purge operation result
        """

class Job:
    """Represents a job submitted to an endpoint."""

    def __init__(self, endpoint_id: str, job_id: str):
        """
        Initialize job instance.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        """

    def status(self) -> dict:
        """
        Get current job status.

        Returns:
            dict: Job status information including state and progress
        """

    def output(self, timeout: int = 0) -> dict:
        """
        Get job output, optionally waiting for completion.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
            dict: Job output data when available
        """

    def stream(self) -> Iterator[dict]:
        """
        Stream job output as it becomes available.

        Returns:
            Iterator[dict]: Generator yielding output chunks
        """

    def cancel(self, timeout: int = 3) -> dict:
        """
        Cancel the running job.

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
            dict: Cancellation confirmation
        """
```

### Asynchronous Endpoint Client

High-performance asynchronous client for concurrent job processing and improved throughput.

```python { .api }
class AsyncioEndpoint:
    """Asynchronous endpoint client for concurrent job processing."""

    def __init__(self, endpoint_id: str, session: ClientSession):
        """
        Initialize async endpoint client.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        - session: HTTP client session for async requests
        """

    async def run(self, endpoint_input: dict) -> 'AsyncioJob':
        """
        Submit a job asynchronously.

        Parameters:
        - endpoint_input: Input data to send to the endpoint

        Returns:
            AsyncioJob: Async job instance for monitoring
        """

    async def health(self) -> dict:
        """
        Check the health of the endpoint asynchronously.

        Returns:
            dict: Endpoint health information
        """

    async def purge_queue(self) -> dict:
        """
        Purge the endpoint's job queue asynchronously.

        Returns:
            dict: Purge operation result
        """

class AsyncioJob:
    """Represents an asynchronous job submitted to an endpoint."""

    def __init__(self, endpoint_id: str, job_id: str, session: ClientSession):
        """
        Initialize async job instance.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        - session: HTTP client session for async requests
        """

    async def status(self) -> dict:
        """
        Get current job status asynchronously.

        Returns:
            dict: Job status information including state and progress
        """

    async def output(self, timeout: int = 0) -> dict:
        """
        Get job output asynchronously.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
            dict: Job output data when available
        """

    async def stream(self) -> AsyncIterator[dict]:
        """
        Stream job output asynchronously.

        Returns:
            AsyncIterator[dict]: Async generator yielding output chunks
        """

    async def cancel(self) -> dict:
        """
        Cancel the running job asynchronously.

        Returns:
            dict: Cancellation confirmation
        """
```

## Usage Examples

### Creating and Managing Endpoints

```python
import runpod

# Set credentials
runpod.set_credentials("your-api-key")

# Create a new endpoint
endpoint_config = runpod.create_endpoint(
    name="image-generation-endpoint",
    template_id="your-template-id",
    gpu_ids="NVIDIA GeForce RTX 3070,NVIDIA GeForce RTX 4080",
    idle_timeout=3,
    workers_min=0,
    workers_max=5,
    scaler_type="QUEUE_DELAY",
    scaler_value=2,
    flashboot=True
)

print(f"Created endpoint: {endpoint_config['id']}")

# List all endpoints
endpoints = runpod.get_endpoints()
for ep in endpoints:
    print(f"Endpoint {ep['id']}: {ep['name']} - {ep['status']}")
```

### Synchronous Endpoint Usage

```python
import time

import runpod

# Create endpoint client
endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a job and get results synchronously
try:
    result = endpoint.run_sync({
        "prompt": "A beautiful sunset over mountains",
        "steps": 50,
        "width": 512,
        "height": 512
    }, timeout=300)

    print("Generated image URL:", result["image_url"])
except Exception as e:
    print(f"Job failed: {e}")

# Submit job for async processing
job = endpoint.run({
    "prompt": "A futuristic cityscape",
    "steps": 30
})

# Monitor job status
while True:
    status = job.status()
    print(f"Job status: {status['status']}")

    if status["status"] in ["COMPLETED", "FAILED"]:
        break

    time.sleep(5)

# Get final results
if status["status"] == "COMPLETED":
    output = job.output()
    print("Results:", output)
```

### Asynchronous Endpoint Usage

```python
import asyncio

import runpod
from runpod.http_client import ClientSession

async def process_multiple_jobs():
    session = ClientSession()
    endpoint = runpod.AsyncioEndpoint("your-endpoint-id", session)

    # Submit multiple jobs concurrently
    jobs = []
    prompts = [
        "A cat in a hat",
        "A dog in space",
        "A robot playing piano"
    ]

    for prompt in prompts:
        job = await endpoint.run({"prompt": prompt, "steps": 20})
        jobs.append(job)

    # Wait for all jobs to complete
    results = []
    for job in jobs:
        try:
            output = await job.output(timeout=180)
            results.append(output)
        except Exception as e:
            print(f"Job failed: {e}")
            results.append(None)

    return results

# Run async job processing
results = asyncio.run(process_multiple_jobs())
for i, result in enumerate(results):
    if result:
        print(f"Job {i+1} completed: {result}")
```

### Streaming Job Output

```python
import runpod

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit job that produces streaming output
job = endpoint.run({
    "prompt": "Generate a long story",
    "stream": True
})

# Stream results as they arrive
print("Streaming output:")
for chunk in job.stream():
    if "text" in chunk:
        print(chunk["text"], end="", flush=True)
    elif "status" in chunk:
        print(f"\nStatus: {chunk['status']}")

print("\nStream completed")
```

### Job Management and Error Handling

```python
import runpod
import time

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a long-running job
job = endpoint.run({"prompt": "A complex 3D render", "steps": 100})

# Monitor with timeout and cancellation
start_time = time.time()
max_runtime = 600  # 10 minutes

try:
    while True:
        status = job.status()
        elapsed = time.time() - start_time

        print(f"Job {job.job_id}: {status['status']} (elapsed: {elapsed:.1f}s)")

        if status["status"] in ["COMPLETED", "FAILED"]:
            break

        # Cancel if taking too long
        if elapsed > max_runtime:
            print("Job taking too long, cancelling...")
            cancel_result = job.cancel()
            print(f"Cancelled: {cancel_result}")
            break

        time.sleep(10)

    # Get results if completed
    if status["status"] == "COMPLETED":
        output = job.output()
        print("Job completed successfully:", output)

except Exception as e:
    print(f"Error monitoring job: {e}")
    # Try to cancel on error
    try:
        job.cancel()
    except Exception:
        pass
```