"""Python library for Runpod API and serverless worker SDK.

Management of serverless endpoints for deploying AI/ML models as scalable APIs,
along with high-level client interfaces for interacting with deployed endpoints.
Supports both synchronous and asynchronous job submission, real-time monitoring,
and comprehensive job lifecycle management.
"""
from runpod import Endpoint, AsyncioEndpoint
from runpod.http_client import ClientSession
from typing import Iterator, AsyncIterator

# Create and manage serverless endpoints that can scale automatically based on
# demand and run custom AI/ML workloads.
def create_endpoint(
    name: str,
    template_id: str,
    gpu_ids: str,
    network_volume_id: str | None = None,
    locations: str | None = None,
    idle_timeout: int = 5,
    scaler_type: str = "QUEUE_DELAY",
    scaler_value: int = 4,
    workers_min: int = 0,
    workers_max: int = 3,
    flashboot: bool = False
) -> dict:
    """
    Create a new serverless endpoint.

    Parameters:
    - name: Endpoint display name
    - template_id: Pod template ID to use for workers
    - gpu_ids: Comma-separated GPU type IDs (e.g., "NVIDIA GeForce RTX 3070")
    - network_volume_id: Optional network volume ID for shared storage
    - locations: Optional comma-separated location preferences
    - idle_timeout: Minutes before idle workers are terminated (default: 5)
    - scaler_type: Scaling algorithm ("QUEUE_DELAY", "REQUEST_COUNT")
    - scaler_value: Scaling threshold value (default: 4)
    - workers_min: Minimum number of workers kept provisioned (default: 0)
    - workers_max: Maximum number of workers allowed (default: 3)
    - flashboot: Enable fast cold start optimization (default: False)

    Returns:
        dict: Created endpoint information with endpoint ID
    """
def get_endpoints() -> list:
    """
    Return every serverless endpoint owned by the authenticated user.

    Returns:
        list: One entry per endpoint, including its status and configuration
    """
def update_endpoint_template(endpoint_id: str, template_id: str) -> dict:
    """
    Switch an existing endpoint over to a different pod template.

    Parameters:
    - endpoint_id: ID of the endpoint being updated
    - template_id: ID of the template the endpoint should use from now on

    Returns:
        dict: Update confirmation with new configuration
    """

# High-level client for making synchronous requests to deployed endpoints with comprehensive job management capabilities.
class Endpoint:
    """Synchronous client bound to a single RunPod serverless endpoint."""

    def __init__(self, endpoint_id: str):
        """
        Bind this client to an endpoint.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        """

    def run(self, request_input: dict) -> 'Job':
        """
        Queue a job on the endpoint without waiting for it to finish.

        Parameters:
        - request_input: Input payload forwarded to the endpoint handler

        Returns:
            Job: Handle for polling status and fetching results later
        """

    def run_sync(self, request_input: dict, timeout: int = 86400) -> dict:
        """
        Queue a job and block until it completes (or the timeout expires).

        Parameters:
        - request_input: Input payload forwarded to the endpoint handler
        - timeout: Upper bound on the wait, in seconds (default: 86400)

        Returns:
            dict: Output produced by the completed job
        """

    def health(self, timeout: int = 3) -> dict:
        """
        Report endpoint health: worker counts/states and request backlog.

        Parameters:
        - timeout: Seconds to wait for the server to answer (default: 3)

        Returns:
            dict: Endpoint health information including worker and request status
        """

    def purge_queue(self, timeout: int = 3) -> dict:
        """
        Drop every job currently waiting in the endpoint's queue.

        Parameters:
        - timeout: Seconds to wait for the server to answer (default: 3)

        Returns:
            dict: Purge operation result
        """
class Job:
    """Handle for a job previously queued on an endpoint."""

    def __init__(self, endpoint_id: str, job_id: str):
        """
        Track one job on a specific endpoint.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        """

    def status(self) -> dict:
        """
        Fetch the job's current status.

        Returns:
            dict: Job status information including state and progress
        """

    def output(self, timeout: int = 0) -> dict:
        """
        Fetch the job's output, optionally blocking until it is ready.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
            dict: Job output data when available
        """

    def stream(self) -> Iterator[dict]:
        """
        Consume the job's output incrementally as it is produced.

        Returns:
            Iterator[dict]: Generator yielding output chunks
        """

    def cancel(self, timeout: int = 3) -> dict:
        """
        Ask the endpoint to cancel this job.

        Parameters:
        - timeout: Seconds to wait for server response (default: 3)

        Returns:
            dict: Cancellation confirmation
        """

# High-performance asynchronous client for concurrent job processing and improved throughput.
class AsyncioEndpoint:
    """Asyncio-based client for submitting work to a RunPod endpoint."""

    def __init__(self, endpoint_id: str, session: ClientSession):
        """
        Bind this async client to an endpoint.

        Parameters:
        - endpoint_id: The endpoint ID to connect to
        - session: HTTP client session for async requests
        """

    async def run(self, endpoint_input: dict) -> 'AsyncioJob':
        """
        Queue a job on the endpoint without blocking the event loop.

        Parameters:
        - endpoint_input: Input payload forwarded to the endpoint handler

        Returns:
            AsyncioJob: Handle for awaiting status and output
        """

    async def health(self) -> dict:
        """
        Fetch endpoint health (workers and queued requests) asynchronously.

        Returns:
            dict: Endpoint health information
        """

    async def purge_queue(self) -> dict:
        """
        Drop all queued jobs on the endpoint asynchronously.

        Returns:
            dict: Purge operation result
        """
class AsyncioJob:
    """Awaitable handle for a job submitted through AsyncioEndpoint."""

    def __init__(self, endpoint_id: str, job_id: str, session: ClientSession):
        """
        Track one job on a specific endpoint.

        Parameters:
        - endpoint_id: Endpoint ID where job is running
        - job_id: Unique job identifier
        - session: HTTP client session for async requests
        """

    async def status(self) -> dict:
        """
        Fetch the job's current status without blocking the event loop.

        Returns:
            dict: Job status information including state and progress
        """

    async def output(self, timeout: int = 0) -> dict:
        """
        Await the job's output.

        Parameters:
        - timeout: Maximum wait time in seconds (0 for no timeout)

        Returns:
            dict: Job output data when available
        """

    async def stream(self) -> AsyncIterator[dict]:
        """
        Consume the job's output incrementally as it is produced.

        Returns:
            AsyncIterator[dict]: Async generator yielding output chunks
        """

    async def cancel(self) -> dict:
        """
        Ask the endpoint to cancel this job.

        Returns:
            dict: Cancellation confirmation
        """

# Example: creating and listing endpoints
import runpod
# Authenticate the SDK before making any management calls.
runpod.set_credentials("your-api-key")

# Spin up a new serverless endpoint from an existing pod template.
endpoint_config = runpod.create_endpoint(
    name="image-generation-endpoint",
    template_id="your-template-id",
    gpu_ids="NVIDIA GeForce RTX 3070,NVIDIA GeForce RTX 4080",
    workers_min=0,
    workers_max=5,
    idle_timeout=3,
    scaler_type="QUEUE_DELAY",
    scaler_value=2,
    flashboot=True,
)
print(f"Created endpoint: {endpoint_config['id']}")

# Enumerate every endpoint on the account.
for ep in runpod.get_endpoints():
    print(f"Endpoint {ep['id']}: {ep['name']} - {ep['status']}")

# Example: synchronous client usage
import runpod
# Needed for the polling loop below; without this the example raises
# NameError at `time.sleep(5)` (the only other `import time` comes later).
import time

# Create endpoint client
endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a job and get results synchronously
try:
    result = endpoint.run_sync({
        "prompt": "A beautiful sunset over mountains",
        "steps": 50,
        "width": 512,
        "height": 512
    }, timeout=300)
    print("Generated image URL:", result["image_url"])
except Exception as e:
    print(f"Job failed: {e}")

# Submit job for async processing
job = endpoint.run({
    "prompt": "A futuristic cityscape",
    "steps": 30
})

# Poll until the job reaches a terminal state.
while True:
    status = job.status()
    print(f"Job status: {status['status']}")
    if status["status"] in ["COMPLETED", "FAILED"]:
        break
    time.sleep(5)  # back off between polls to avoid hammering the API

# Get final results
if status["status"] == "COMPLETED":
    output = job.output()
    print("Results:", output)

# Example: concurrent async processing
import asyncio
import runpod
from runpod.http_client import ClientSession

async def process_multiple_jobs():
    """Fan several prompts out to one endpoint and gather their outputs.

    Returns:
        list: One entry per prompt; failed jobs contribute None.
    """
    session = ClientSession()
    try:
        endpoint = runpod.AsyncioEndpoint("your-endpoint-id", session)

        # Submit every job first so they can run concurrently on the endpoint.
        prompts = [
            "A cat in a hat",
            "A dog in space",
            "A robot playing piano"
        ]
        jobs = []
        for prompt in prompts:
            job = await endpoint.run({"prompt": prompt, "steps": 20})
            jobs.append(job)

        # Collect results; one failed job must not abort the rest.
        results = []
        for job in jobs:
            try:
                output = await job.output(timeout=180)
                results.append(output)
            except Exception as e:
                print(f"Job failed: {e}")
                results.append(None)
        return results
    finally:
        # Close the HTTP session so pooled connections are not leaked.
        # NOTE(review): assumes ClientSession exposes an awaitable close(),
        # as aiohttp-style sessions do — confirm against runpod.http_client.
        await session.close()

# Run async job processing
results = asyncio.run(process_multiple_jobs())
for i, result in enumerate(results):
    # Compare against None explicitly: None marks a failed job, while a
    # falsy-but-valid output (e.g. {}) should still be reported.
    if result is not None:
        print(f"Job {i+1} completed: {result}")

# Example: streaming output
import runpod
# Synchronous client for a streaming-capable endpoint.
endpoint = runpod.Endpoint("your-endpoint-id")

# Submit job that produces streaming output
job = endpoint.run({
    "prompt": "Generate a long story",
    "stream": True
})

# Stream results as they arrive
print("Streaming output:")
for chunk in job.stream():
    # NOTE(review): the chunk schema (keys "text"/"status") is assumed from
    # this example only — confirm against the endpoint handler's payloads.
    if "text" in chunk:
        print(chunk["text"], end="", flush=True)  # flush so text shows immediately
    elif "status" in chunk:
        print(f"\nStatus: {chunk['status']}")
print("\nStream completed")

# Example: webhooks, timeouts and cancellation
import runpod
import time

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit job with webhook notification.
# NOTE(review): the Endpoint.run stub above documents only `request_input`;
# the `webhook` keyword is assumed to be supported by the installed SDK — confirm.
job = endpoint.run(
    {"prompt": "A complex 3D render", "steps": 100},
    webhook="https://your-app.com/webhook/job-complete"
)

# Monitor with timeout and cancellation
start_time = time.time()
max_runtime = 600  # 10 minutes

try:
    while True:
        status = job.status()
        elapsed = time.time() - start_time
        print(f"Job {job.job_id}: {status['status']} (elapsed: {elapsed:.1f}s)")
        if status["status"] in ["COMPLETED", "FAILED"]:
            break
        # Cancel if taking too long
        if elapsed > max_runtime:
            print("Job taking too long, cancelling...")
            cancel_result = job.cancel()
            print(f"Cancelled: {cancel_result}")
            break
        time.sleep(10)

    # Get results if completed
    if status["status"] == "COMPLETED":
        output = job.output()
        print("Job completed successfully:", output)
except Exception as e:
    print(f"Error monitoring job: {e}")
    # Best-effort cancel on error. Catch Exception rather than a bare
    # `except:`, which would also swallow SystemExit/KeyboardInterrupt.
    try:
        job.cancel()
    except Exception:
        pass

# Install with Tessl CLI
#   npx tessl i tessl/pypi-runpod