# Endpoint Management

Management of serverless endpoints for deploying AI/ML models as scalable APIs, along with high-level client interfaces for interacting with deployed endpoints. Supports both synchronous and asynchronous job submission, real-time monitoring, and comprehensive job lifecycle management.

## Core Imports
```python
from runpod import Endpoint, AsyncioEndpoint
from runpod.http_client import ClientSession
from typing import Iterator, AsyncIterator
```
## Capabilities

### Endpoint Administration

Create and manage serverless endpoints that can scale automatically based on demand and run custom AI/ML workloads.
```python { .api }
def create_endpoint(
    name: str,
    template_id: str,
    gpu_ids: str,
    network_volume_id: str = None,
    locations: str = None,
    idle_timeout: int = 5,
    scaler_type: str = "QUEUE_DELAY",
    scaler_value: int = 4,
    workers_min: int = 0,
    workers_max: int = 3,
    flashboot: bool = False
) -> dict:
    """
    Create a new serverless endpoint.

    Parameters:
    - name: Endpoint display name
    - template_id: Pod template ID to use for workers
    - gpu_ids: Comma-separated GPU type IDs (e.g., "NVIDIA GeForce RTX 3070")
    - network_volume_id: Network volume ID for shared storage
    - locations: Comma-separated location preferences
    - idle_timeout: Minutes before idle workers are terminated
    - scaler_type: Scaling algorithm ("QUEUE_DELAY", "REQUEST_COUNT")
    - scaler_value: Scaling threshold value
    - workers_min: Minimum number of workers
    - workers_max: Maximum number of workers
    - flashboot: Enable fast cold start optimization

    Returns:
    dict: Created endpoint information with endpoint ID
    """

def get_endpoints() -> list:
    """
    Get list of all user's endpoints.

    Returns:
    list: Endpoint information including status and configuration
    """

def update_endpoint_template(endpoint_id: str, template_id: str) -> dict:
    """
    Update an endpoint's template configuration.

    Parameters:
    - endpoint_id: Endpoint ID to update
    - template_id: New template ID to use

    Returns:
    dict: Update confirmation with new configuration
    """
```
### Synchronous Endpoint Client

High-level client for making synchronous requests to deployed endpoints with comprehensive job management capabilities.
```python { .api }
class Endpoint:
    """Synchronous endpoint client for making requests to RunPod endpoints."""

    def __init__(self, endpoint_id: str):
        """
        Initialize endpoint client.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        """

    def run(self, request_input: dict) -> 'Job':
        """
        Submit a job to the endpoint.

        Parameters:
        - request_input: Input data to send to the endpoint

        Returns:
        Job: Job instance for monitoring and retrieving results
        """

    def run_sync(self, request_input: dict, timeout: int = 86400) -> dict:
        """
        Submit job and wait for completion synchronously.

        Parameters:
        - request_input: Input data to send to the endpoint
        - timeout: Maximum wait time in seconds (default: 86400)

        Returns:
        dict: Job output when completed
        """

    def health(self, timeout: int = 3) -> dict:
        """
        Check the health of the endpoint (number/state of workers, requests).

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
        dict: Endpoint health information including worker and request status
        """

    def purge_queue(self, timeout: int = 3) -> dict:
        """
        Purge the endpoint's job queue.

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
        dict: Purge operation result
        """

class Job:
    """Represents a job submitted to an endpoint."""

    def __init__(self, endpoint_id: str, job_id: str):
        """
        Initialize job instance.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        """

    def status(self) -> dict:
        """
        Get current job status.

        Returns:
        dict: Job status information including state and progress
        """

    def output(self, timeout: int = 0) -> dict:
        """
        Get job output, optionally waiting for completion.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
        dict: Job output data when available
        """

    def stream(self) -> Iterator[dict]:
        """
        Stream job output as it becomes available.

        Returns:
        Iterator[dict]: Generator yielding output chunks
        """

    def cancel(self, timeout: int = 3) -> dict:
        """
        Cancel the running job.

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
        dict: Cancellation confirmation
        """
```
### Asynchronous Endpoint Client

High-performance asynchronous client for concurrent job processing and improved throughput.
```python { .api }
class AsyncioEndpoint:
    """Asynchronous endpoint client for concurrent job processing."""

    def __init__(self, endpoint_id: str, session: ClientSession):
        """
        Initialize async endpoint client.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        - session: HTTP client session for async requests
        """

    async def run(self, endpoint_input: dict) -> 'AsyncioJob':
        """
        Submit a job asynchronously.

        Parameters:
        - endpoint_input: Input data to send to the endpoint

        Returns:
        AsyncioJob: Async job instance for monitoring
        """

    async def health(self) -> dict:
        """
        Check the health of the endpoint asynchronously.

        Returns:
        dict: Endpoint health information
        """

    async def purge_queue(self) -> dict:
        """
        Purge the endpoint's job queue asynchronously.

        Returns:
        dict: Purge operation result
        """

class AsyncioJob:
    """Represents an asynchronous job submitted to an endpoint."""

    def __init__(self, endpoint_id: str, job_id: str, session: ClientSession):
        """
        Initialize async job instance.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        - session: HTTP client session for async requests
        """

    async def status(self) -> dict:
        """
        Get current job status asynchronously.

        Returns:
        dict: Job status information including state and progress
        """

    async def output(self, timeout: int = 0) -> dict:
        """
        Get job output asynchronously.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
        dict: Job output data when available
        """

    async def stream(self) -> AsyncIterator[dict]:
        """
        Stream job output asynchronously.

        Returns:
        AsyncIterator[dict]: Async generator yielding output chunks
        """

    async def cancel(self) -> dict:
        """
        Cancel the running job asynchronously.

        Returns:
        dict: Cancellation confirmation
        """
```
## Usage Examples

### Creating and Managing Endpoints
```python
import runpod

# Set credentials
runpod.set_credentials("your-api-key")

# Create a new endpoint
endpoint_config = runpod.create_endpoint(
    name="image-generation-endpoint",
    template_id="your-template-id",
    gpu_ids="NVIDIA GeForce RTX 3070,NVIDIA GeForce RTX 4080",
    idle_timeout=3,
    workers_min=0,
    workers_max=5,
    scaler_type="QUEUE_DELAY",
    scaler_value=2,
    flashboot=True
)

print(f"Created endpoint: {endpoint_config['id']}")

# List all endpoints
endpoints = runpod.get_endpoints()
for ep in endpoints:
    print(f"Endpoint {ep['id']}: {ep['name']} - {ep['status']}")
```
### Synchronous Endpoint Usage

```python
import time

import runpod

# Create endpoint client
endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a job and get results synchronously
try:
    result = endpoint.run_sync({
        "prompt": "A beautiful sunset over mountains",
        "steps": 50,
        "width": 512,
        "height": 512
    }, timeout=300)

    print("Generated image URL:", result["image_url"])
except Exception as e:
    print(f"Job failed: {e}")

# Submit job for async processing
job = endpoint.run({
    "prompt": "A futuristic cityscape",
    "steps": 30
})

# Monitor job status
while True:
    status = job.status()
    print(f"Job status: {status['status']}")

    if status["status"] in ["COMPLETED", "FAILED"]:
        break

    time.sleep(5)

# Get final results
if status["status"] == "COMPLETED":
    output = job.output()
    print("Results:", output)
```
### Asynchronous Endpoint Usage

```python
import asyncio

import runpod
from runpod.http_client import ClientSession

async def process_multiple_jobs():
    # Use the session as a context manager so it is closed when done.
    async with ClientSession() as session:
        endpoint = runpod.AsyncioEndpoint("your-endpoint-id", session)

        # Submit multiple jobs concurrently
        jobs = []
        prompts = [
            "A cat in a hat",
            "A dog in space",
            "A robot playing piano"
        ]

        for prompt in prompts:
            job = await endpoint.run({"prompt": prompt, "steps": 20})
            jobs.append(job)

        # Wait for all jobs to complete
        results = []
        for job in jobs:
            try:
                output = await job.output(timeout=180)
                results.append(output)
            except Exception as e:
                print(f"Job failed: {e}")
                results.append(None)

        return results

# Run async job processing
results = asyncio.run(process_multiple_jobs())
for i, result in enumerate(results):
    if result:
        print(f"Job {i+1} completed: {result}")
```
### Streaming Job Output

```python
import runpod

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit job that produces streaming output
job = endpoint.run({
    "prompt": "Generate a long story",
    "stream": True
})

# Stream results as they arrive
print("Streaming output:")
for chunk in job.stream():
    if "text" in chunk:
        print(chunk["text"], end="", flush=True)
    elif "status" in chunk:
        print(f"\nStatus: {chunk['status']}")

print("\nStream completed")
```
### Job Management and Error Handling

```python
import time

import runpod

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a long-running job. (Note: `run` takes only the input payload;
# to get a webhook notification, configure it on the endpoint or include
# it in the request per the RunPod API documentation.)
job = endpoint.run({"prompt": "A complex 3D render", "steps": 100})

# Monitor with timeout and cancellation
start_time = time.time()
max_runtime = 600  # 10 minutes

try:
    while True:
        status = job.status()
        elapsed = time.time() - start_time

        print(f"Job {job.job_id}: {status['status']} (elapsed: {elapsed:.1f}s)")

        if status["status"] in ["COMPLETED", "FAILED"]:
            break

        # Cancel if taking too long
        if elapsed > max_runtime:
            print("Job taking too long, cancelling...")
            cancel_result = job.cancel()
            print(f"Cancelled: {cancel_result}")
            break

        time.sleep(10)

    # Get results if completed
    if status["status"] == "COMPLETED":
        output = job.output()
        print("Job completed successfully:", output)

except Exception as e:
    print(f"Error monitoring job: {e}")
    # Best-effort cancel on error
    try:
        job.cancel()
    except Exception:
        pass
```