CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-runpod

Python library for Runpod API and serverless worker SDK.

Pending
Overview
Eval results
Files

endpoint-management.mddocs/

Endpoint Management

Management of serverless endpoints for deploying AI/ML models as scalable APIs, along with high-level client interfaces for interacting with deployed endpoints. Supports both synchronous and asynchronous job submission, real-time monitoring, and comprehensive job lifecycle management.

Core Imports

from runpod import Endpoint, AsyncioEndpoint
from runpod.http_client import ClientSession
from typing import Iterator, AsyncIterator

Capabilities

Endpoint Administration

Create and manage serverless endpoints that can scale automatically based on demand and run custom AI/ML workloads.

def create_endpoint(
    name: str,
    template_id: str,
    gpu_ids: str,
    network_volume_id: str = None,
    locations: str = None,
    idle_timeout: int = 5,
    scaler_type: str = "QUEUE_DELAY",
    scaler_value: int = 4,
    workers_min: int = 0,
    workers_max: int = 3,
    flashboot: bool = False
) -> dict:
    """
    Create a new serverless endpoint.

    Parameters:
    - name: Endpoint display name
    - template_id: Pod template ID to use for workers
    - gpu_ids: Comma-separated GPU type IDs (e.g., "NVIDIA GeForce RTX 3070")
    - network_volume_id: Network volume ID for shared storage; None (default) attaches no volume
    - locations: Comma-separated location preferences; None (default) lets the platform choose
    - idle_timeout: Minutes before idle workers are terminated (default: 5)
    - scaler_type: Scaling algorithm, "QUEUE_DELAY" or "REQUEST_COUNT" (default: "QUEUE_DELAY")
    - scaler_value: Scaling threshold value interpreted per scaler_type (default: 4)
    - workers_min: Minimum number of workers kept alive (default: 0)
    - workers_max: Maximum number of workers to scale up to (default: 3)
    - flashboot: Enable fast cold start optimization (default: False)

    Returns:
    dict: Created endpoint information with endpoint ID
    """

def get_endpoints() -> list:
    """
    Get the list of all serverless endpoints owned by the current user.

    Returns:
    list: One entry per endpoint, each including status and configuration
    """

def update_endpoint_template(endpoint_id: str, template_id: str) -> dict:
    """
    Update an existing endpoint to use a different pod template.

    Parameters:
    - endpoint_id: Endpoint ID to update
    - template_id: New template ID the endpoint's workers should use

    Returns:
    dict: Update confirmation with the new configuration
    """

Synchronous Endpoint Client

High-level client for making synchronous requests to deployed endpoints with comprehensive job management capabilities.

class Endpoint:
    """Synchronous endpoint client for making requests to RunPod endpoints."""
    
    def __init__(self, endpoint_id: str):
        """
        Initialize endpoint client.
        
        Parameters:
        - endpoint_id: The endpoint ID to connect to
        """
    
    def run(self, request_input: dict) -> 'Job':
        """
        Submit a job to the endpoint without waiting for it to finish.
        
        Parameters:
        - request_input: Input data to send to the endpoint
        
        Returns:
        Job: Job handle for monitoring status and retrieving results later
        """
    
    def run_sync(self, request_input: dict, timeout: int = 86400) -> dict:
        """
        Submit a job and block until it completes (or the timeout elapses).
        
        Parameters:
        - request_input: Input data to send to the endpoint
        - timeout: Maximum wait time in seconds (default: 86400, i.e. 24 hours)
        
        Returns:
        dict: Job output when completed
        """
    
    def health(self, timeout: int = 3) -> dict:
        """
        Check the health of the endpoint (number/state of workers, requests).
        
        Parameters:
        - timeout: Seconds to wait for server response (default: 3)
        
        Returns:
        dict: Endpoint health information including worker and request status
        """
    
    def purge_queue(self, timeout: int = 3) -> dict:
        """
        Purge the endpoint's job queue, removing jobs that have not started.
        
        Parameters:
        - timeout: Seconds to wait for server response (default: 3)
        
        Returns:
        dict: Purge operation result
        """

class Job:
    """Represents a job submitted to an endpoint via the synchronous client."""
    
    def __init__(self, endpoint_id: str, job_id: str):
        """
        Initialize job instance.
        
        Parameters:
        - endpoint_id: Endpoint ID where the job is running
        - job_id: Unique job identifier
        """
    
    def status(self) -> dict:
        """
        Get the current job status.
        
        Returns:
        dict: Job status information including state and progress
        """
    
    def output(self, timeout: int = 0) -> dict:
        """
        Get the job output, optionally waiting for completion.
        
        Parameters:
        - timeout: Maximum wait time in seconds (0, the default, means no timeout)
        
        Returns:
        dict: Job output data when available
        """
    
    def stream(self) -> Iterator[dict]:
        """
        Stream the job's output chunks as they become available.
        
        Returns:
        Iterator[dict]: Generator yielding output chunks until the job finishes
        """
    
    def cancel(self, timeout: int = 3) -> dict:
        """
        Cancel the running job.
        
        Parameters:
        - timeout: Seconds to wait for server response (default: 3)
        
        Returns:
        dict: Cancellation confirmation
        """

Asynchronous Endpoint Client

High-performance asynchronous client for concurrent job processing and improved throughput.

class AsyncioEndpoint:
    """Asynchronous endpoint client for concurrent job processing."""
    
    def __init__(self, endpoint_id: str, session: ClientSession):
        """
        Initialize async endpoint client.
        
        Parameters:
        - endpoint_id: The endpoint ID to connect to
        - session: HTTP client session for async requests; the caller owns
          the session and is responsible for closing it
        """
    
    async def run(self, endpoint_input: dict) -> 'AsyncioJob':
        """
        Submit a job asynchronously without waiting for it to finish.
        
        Parameters:
        - endpoint_input: Input data to send to the endpoint
        
        Returns:
        AsyncioJob: Async job handle for monitoring and retrieving results
        """
    
    async def health(self) -> dict:
        """
        Check the health of the endpoint asynchronously.
        
        Returns:
        dict: Endpoint health information
        """
    
    async def purge_queue(self) -> dict:
        """
        Purge the endpoint's job queue asynchronously.
        
        Returns:
        dict: Purge operation result
        """

class AsyncioJob:
    """Represents an asynchronous job submitted to an endpoint."""
    
    def __init__(self, endpoint_id: str, job_id: str, session: ClientSession):
        """
        Initialize async job instance.
        
        Parameters:
        - endpoint_id: Endpoint ID where the job is running
        - job_id: Unique job identifier
        - session: HTTP client session for async requests (owned by the caller)
        """
    
    async def status(self) -> dict:
        """
        Get the current job status asynchronously.
        
        Returns:
        dict: Job status information including state and progress
        """
    
    async def output(self, timeout: int = 0) -> dict:
        """
        Get the job output asynchronously, optionally waiting for completion.
        
        Parameters:
        - timeout: Maximum wait time in seconds (0, the default, means no timeout)
        
        Returns:
        dict: Job output data when available
        """
    
    async def stream(self) -> AsyncIterator[dict]:
        """
        Stream the job's output chunks asynchronously.
        
        Returns:
        AsyncIterator[dict]: Async generator yielding output chunks
        """
    
    async def cancel(self) -> dict:
        """
        Cancel the running job asynchronously.
        
        Returns:
        dict: Cancellation confirmation
        """

Usage Examples

Creating and Managing Endpoints

import runpod

# Authenticate against the RunPod API
runpod.set_credentials("your-api-key")

# Provision a new serverless endpoint from an existing pod template
created = runpod.create_endpoint(
    name="image-generation-endpoint",
    template_id="your-template-id",
    gpu_ids="NVIDIA GeForce RTX 3070,NVIDIA GeForce RTX 4080",
    idle_timeout=3,
    workers_min=0,
    workers_max=5,
    scaler_type="QUEUE_DELAY",
    scaler_value=2,
    flashboot=True
)

print(f"Created endpoint: {created['id']}")

# Enumerate every endpoint on the account and report its state
for entry in runpod.get_endpoints():
    print(f"Endpoint {entry['id']}: {entry['name']} - {entry['status']}")

Synchronous Endpoint Usage

import runpod
import time  # required for time.sleep() in the polling loop below

# Create endpoint client
endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a job and block until the result is ready
try:
    result = endpoint.run_sync({
        "prompt": "A beautiful sunset over mountains",
        "steps": 50,
        "width": 512,
        "height": 512
    }, timeout=300)

    print("Generated image URL:", result["image_url"])
except Exception as e:
    print(f"Job failed: {e}")

# Submit a job for asynchronous processing; run() returns immediately
job = endpoint.run({
    "prompt": "A futuristic cityscape",
    "steps": 30
})

# Poll until the job reaches a terminal state
while True:
    status = job.status()
    print(f"Job status: {status['status']}")

    if status["status"] in ["COMPLETED", "FAILED"]:
        break

    time.sleep(5)  # back off between polls to avoid hammering the API

# Get final results
if status["status"] == "COMPLETED":
    output = job.output()
    print("Results:", output)

Asynchronous Endpoint Usage

import asyncio
import runpod
from runpod.http_client import ClientSession

async def process_multiple_jobs():
    # The session owns the HTTP connection pool. Close it when finished,
    # otherwise connections leak (and aiohttp-style sessions emit
    # "Unclosed client session" warnings — presumably aiohttp-backed, confirm).
    session = ClientSession()
    try:
        endpoint = runpod.AsyncioEndpoint("your-endpoint-id", session)

        # Submit multiple jobs concurrently
        jobs = []
        prompts = [
            "A cat in a hat",
            "A dog in space",
            "A robot playing piano"
        ]

        for prompt in prompts:
            job = await endpoint.run({"prompt": prompt, "steps": 20})
            jobs.append(job)

        # Wait for all jobs to complete
        results = []
        for job in jobs:
            try:
                output = await job.output(timeout=180)
                results.append(output)
            except Exception as e:
                print(f"Job failed: {e}")
                results.append(None)

        return results
    finally:
        # Always release the session, even if a submission/await failed
        await session.close()

# Run async job processing
results = asyncio.run(process_multiple_jobs())
for i, result in enumerate(results):
    if result:
        print(f"Job {i+1} completed: {result}")

Streaming Job Output

import runpod

endpoint = runpod.Endpoint("your-endpoint-id")

# Kick off a job whose worker emits incremental (streaming) output
job = endpoint.run({
    "prompt": "Generate a long story",
    "stream": True
})

# Consume chunks from the generator as the worker produces them
print("Streaming output:")
for piece in job.stream():
    if "text" in piece:
        print(piece["text"], end="", flush=True)
    elif "status" in piece:
        print(f"\nStatus: {piece['status']}")

print("\nStream completed")

Job Management and Error Handling

import runpod
import time

endpoint = runpod.Endpoint("your-endpoint-id")

# Submit a long-running job.
# NOTE: the documented Endpoint.run() signature accepts only the input
# payload dict; the previous version of this example passed an unsupported
# `webhook=` keyword argument, which would raise TypeError.
job = endpoint.run(
    {"prompt": "A complex 3D render", "steps": 100}
)

# Monitor with timeout and cancellation
start_time = time.time()
max_runtime = 600  # 10 minutes

try:
    while True:
        status = job.status()
        elapsed = time.time() - start_time

        print(f"Job {job.job_id}: {status['status']} (elapsed: {elapsed:.1f}s)")

        if status["status"] in ["COMPLETED", "FAILED"]:
            break

        # Cancel if taking too long
        if elapsed > max_runtime:
            print("Job taking too long, cancelling...")
            cancel_result = job.cancel()
            print(f"Cancelled: {cancel_result}")
            break

        time.sleep(10)

    # Get results if completed
    if status["status"] == "COMPLETED":
        output = job.output()
        print("Job completed successfully:", output)

except Exception as e:
    print(f"Error monitoring job: {e}")
    # Best-effort cancel on error. Catch Exception (not bare except) so
    # KeyboardInterrupt/SystemExit still propagate.
    try:
        job.cancel()
    except Exception:
        pass

Install with Tessl CLI

npx tessl i tessl/pypi-runpod

docs

configuration.md

endpoint-management.md

index.md

pod-management.md

serverless-worker.md

tile.json