Python library for the RunPod API and serverless worker SDK.
```bash
npx @tessl/cli install tessl/pypi-runpod@1.7.0
```

A comprehensive Python library for interacting with the RunPod cloud computing platform, offering both API wrapper functionality for managing GPU cloud resources (pods) and a serverless worker SDK for deploying custom endpoints. It enables developers to programmatically create, manage, and interact with GPU-accelerated cloud instances, deploy serverless AI/ML models as custom APIs, and handle job processing in distributed cloud environments.
```bash
pip install runpod
```

```python
import runpod
```

Common usage patterns:

```python
# For pod management
from runpod import create_pod, get_pods, terminate_pod

# For endpoint interactions
from runpod import Endpoint, AsyncioEndpoint
from typing import Iterator

# For serverless worker development
import runpod.serverless as serverless

# For configuration
from runpod import set_credentials, get_credentials
```

Managing pods:

```python
import runpod

# Set API credentials
runpod.set_credentials("your-api-key")
# Create a GPU pod
pod_config = {
"name": "my-gpu-pod",
"image_name": "runpod/pytorch:1.13.1-py3.10-cuda11.8.0-devel-ubuntu22.04",
"gpu_type_id": "NVIDIA GeForce RTX 3070",
"cloud_type": "SECURE",
"container_disk_in_gb": 10,
"volume_in_gb": 20
}
pod = runpod.create_pod(**pod_config)
print(f"Created pod: {pod['id']}")
# List all pods
pods = runpod.get_pods()
for p in pods:
    print(f"Pod {p['id']}: {p['name']} - {p['desiredStatus']}")
# Terminate a pod
runpod.terminate_pod(pod['id'])
```

Calling a deployed endpoint:

```python
import runpod

# Create synchronous endpoint client
endpoint = runpod.Endpoint("your-endpoint-id")
# Run a job
job = endpoint.run({
"prompt": "A beautiful landscape painting",
"steps": 50
})
# Get results
result = job.output(timeout=300)
print(result)
```
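The import block above also exposes `AsyncioEndpoint` for asyncio applications. The sketch below assumes it mirrors the synchronous `Endpoint` client and is constructed with an `aiohttp` client session, exposing awaitable `run` and `output` methods; that constructor shape and those method names are assumptions rather than behavior documented here.

```python
import asyncio

import aiohttp
import runpod
from runpod import AsyncioEndpoint

runpod.set_credentials("your-api-key")

async def main():
    # Assumption: the async client is bound to an aiohttp session and mirrors Endpoint.
    async with aiohttp.ClientSession() as session:
        endpoint = AsyncioEndpoint("your-endpoint-id", session)
        job = await endpoint.run({"prompt": "A beautiful landscape painting"})
        result = await job.output()
        print(result)

asyncio.run(main())
```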
Building a serverless worker:

```python
import runpod

def my_handler(job):
    """Process a job and return results."""
    job_input = job["input"]
    # Your processing logic here
    result = {"output": f"Processed: {job_input}"}
    return result

# Start the serverless worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": my_handler,
        "return_aggregate_stream": True
    })
```

The RunPod SDK is organized into several key modules:
Comprehensive pod lifecycle management including creation, monitoring, control, and cleanup of GPU and CPU cloud instances. Supports various GPU types, custom container images, persistent storage volumes, and network configuration.

```python
def create_pod(name: str, image_name: str, gpu_type_id: str, **kwargs) -> dict: ...
def get_pod(pod_id: str) -> dict: ...
def get_pods() -> list: ...
def stop_pod(pod_id: str) -> dict: ...
def resume_pod(pod_id: str) -> dict: ...
def terminate_pod(pod_id: str) -> dict: ...
```
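A minimal sketch of pausing and resuming a pod without destroying it, following the signatures listed above; the pod ID string is a placeholder.

```python
import runpod

runpod.set_credentials("your-api-key")

# Stop a running pod without terminating it, so it can be brought back later.
stopped = runpod.stop_pod("existing-pod-id")
print(stopped)

# Resume the same pod once it is needed again.
resumed = runpod.resume_pod("existing-pod-id")
print(resumed)
```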
Discovery and management of available GPU types, pricing, and availability across different cloud regions. Provides detailed hardware specifications and real-time availability data.

```python
def get_gpu(gpu_id: str) -> dict: ...
def get_gpus() -> list: ...
```
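A minimal sketch of browsing GPU availability with these functions; the GPU type ID reuses the one from the pod example above, and the contents of each returned dictionary are simply printed rather than assumed.

```python
import runpod

runpod.set_credentials("your-api-key")

# List every GPU type the platform reports.
for gpu in runpod.get_gpus():
    print(gpu)

# Fetch the full record for one GPU type by its ID.
print(runpod.get_gpu("NVIDIA GeForce RTX 3070"))
```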
Management of serverless endpoints for deploying AI/ML models as scalable APIs. Handles endpoint creation, template management, scaling configuration, and deployment lifecycle.

```python
def create_endpoint(**kwargs) -> dict: ...
def get_endpoints() -> list: ...
def update_endpoint_template(endpoint_id: str, template_id: str) -> dict: ...
```
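A hedged sketch of creating an endpoint and later swapping its template. `create_endpoint` is documented above as accepting keyword arguments, so the `name` and `template_id` keywords, and the `id` key read from the returned dictionary, are assumptions for illustration.

```python
import runpod

runpod.set_credentials("your-api-key")

# Create a serverless endpoint; keyword names here are assumptions (see above).
endpoint = runpod.create_endpoint(
    name="my-model-endpoint",
    template_id="existing-template-id",
)

# List every endpoint on the account.
for ep in runpod.get_endpoints():
    print(ep)

# Point the new endpoint at a different template revision.
runpod.update_endpoint_template(endpoint["id"], "new-template-id")
```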
High-level client interfaces for interacting with deployed serverless endpoints. Supports both synchronous and asynchronous job submission, real-time status monitoring, output streaming, and job cancellation.

```python
class Endpoint:
    def __init__(self, endpoint_id: str): ...
    def run(self, request_input: dict) -> 'Job': ...
    def run_sync(self, request_input: dict, timeout: int = 86400) -> dict: ...
    def health(self, timeout: int = 3) -> dict: ...
    def purge_queue(self, timeout: int = 3) -> dict: ...

class Job:
    def status(self) -> dict: ...
    def output(self, timeout: int = 0) -> dict: ...
    def stream(self) -> Iterator[dict]: ...
    def cancel(self, timeout: int = 3) -> dict: ...
```
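A sketch of the two submission paths the Endpoint client exposes: a blocking `run_sync` call, and an asynchronous `run` followed by status polling and output streaming. The endpoint ID and input payloads are placeholders, and the shape of each streamed chunk depends on what the deployed worker returns.

```python
import runpod

runpod.set_credentials("your-api-key")

endpoint = runpod.Endpoint("your-endpoint-id")

# Blocking call: submit a job and wait (up to 60 seconds) for its result.
result = endpoint.run_sync({"prompt": "A quick test"}, timeout=60)
print(result)

# Asynchronous submission: check status and stream output as it is produced.
job = endpoint.run({"prompt": "A longer generation", "steps": 50})
print(job.status())

for chunk in job.stream():
    print(chunk)

# A queued or in-progress job can also be cancelled instead of awaited:
# job.cancel()
```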
Comprehensive framework for building and deploying serverless workers that process jobs from RunPod endpoints. Includes job processing, progress reporting, error handling, and file transfer utilities.

```python
def start(config: dict) -> None: ...
def progress_update(job_id: str, progress: int, **kwargs) -> None: ...
```
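A sketch of a worker that reports progress partway through a job, following the `start` and `progress_update` signatures above; reading the job ID from `job["id"]` and the specific percentage value are assumptions for illustration.

```python
import runpod.serverless as serverless

def handler(job):
    """Process a job, reporting progress partway through."""
    job_input = job["input"]

    # Assumption: the incoming job dict carries its ID under "id".
    serverless.progress_update(job["id"], 50)

    result = {"output": f"Processed: {job_input}"}
    return result

if __name__ == "__main__":
    serverless.start({"handler": handler})
```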
Credential management, API authentication, and configuration utilities for managing multiple profiles and environments.

```python
def set_credentials(api_key: str, profile: str = "default") -> None: ...
def get_credentials(profile: str = "default") -> dict: ...
def check_credentials(profile: str = "default") -> bool: ...
```
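A short sketch of keeping separate credentials per environment with named profiles; the profile names and keys are placeholders.

```python
import runpod

# Store keys under named profiles so different environments stay separate.
runpod.set_credentials("dev-api-key", profile="dev")
runpod.set_credentials("prod-api-key", profile="prod")

# Verify a profile has usable credentials before relying on it.
if runpod.check_credentials(profile="prod"):
    creds = runpod.get_credentials(profile="prod")
    print(creds)
else:
    print("No credentials stored for the 'prod' profile.")
```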
Management of pod templates for consistent deployments and container registry authentication for private images.

```python
def create_template(**kwargs) -> dict: ...
def create_container_registry_auth(**kwargs) -> dict: ...
def update_container_registry_auth(auth_id: str, **kwargs) -> dict: ...
def delete_container_registry_auth(auth_id: str) -> dict: ...
```
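A hedged sketch of wiring up a private container image: both functions are documented above as accepting keyword arguments, so the specific keywords used here (`name`, `username`, `password`, `image_name`) are assumptions for illustration.

```python
import runpod

runpod.set_credentials("your-api-key")

# Register credentials for a private container registry (keyword names assumed).
auth = runpod.create_container_registry_auth(
    name="my-registry",
    username="registry-user",
    password="registry-token",
)

# Create a reusable pod template that pulls a private image (keyword names assumed).
template = runpod.create_template(
    name="my-private-template",
    image_name="myorg/private-image:latest",
)
print(auth, template)
```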
User profile and account settings management including SSH key configuration and account information retrieval.

```python
def get_user() -> dict: ...
def update_user_settings(**kwargs) -> dict: ...
```
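A minimal sketch of reading account information and updating a setting; `update_user_settings` is documented above as taking keyword arguments, so the `pubKey` keyword used to attach an SSH public key is an assumption for illustration.

```python
import runpod

runpod.set_credentials("your-api-key")

# Inspect the account the current API key belongs to.
user = runpod.get_user()
print(user)

# Update a setting; the keyword name below is an assumption (see lead-in).
runpod.update_user_settings(pubKey="ssh-ed25519 AAAA... user@host")
```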
Module-level attributes:

```python
__version__: str # Package version
SSH_KEY_PATH: str # Path to SSH key directory (~/.runpod/ssh)
profile: str # Current configuration profile name
api_key: str # API key for authentication (None if not set)
endpoint_url_base: str # Base URL for API endpoints
```
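A short sketch of inspecting these attributes at runtime; assigning `api_key` directly, as an alternative to `set_credentials`, is an assumption based on the attribute list above.

```python
import runpod

# Report which package version and API endpoint the client will talk to.
print(runpod.__version__)
print(runpod.endpoint_url_base)
print(runpod.SSH_KEY_PATH)

# Authentication state: api_key is None until credentials are configured.
print(runpod.profile, runpod.api_key)
runpod.api_key = "your-api-key"  # assumption: direct assignment instead of set_credentials
```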