CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-runpod

Python library for Runpod API and serverless worker SDK.

Pending
Overview
Eval results
Files

docs/serverless-worker.md

Serverless Worker SDK

Comprehensive framework for building and deploying serverless workers that process jobs from RunPod endpoints. Includes job processing, progress reporting, error handling, and file transfer utilities for building production-ready serverless applications.

Core Imports

import runpod
import runpod.serverless
from runpod.serverless import start, progress_update
from runpod.serverless.utils import download_files_from_urls, upload_file_to_bucket, upload_in_memory_object
from runpod import RunPodLogger

Capabilities

Worker Framework

Core serverless worker framework that handles job processing, automatic scaling, and communication with the RunPod platform.

def start(config: dict) -> None:
    """
    Start the serverless worker with the provided configuration.

    Parameters:
    - config: Worker configuration dictionary containing:
        - handler: Function to process jobs (required)
        - return_aggregate_stream: Whether to aggregate streaming responses
          (used with generator handlers that yield partial results)
        - rp_log_level: Logging level ("DEBUG", "INFO", "WARNING", "ERROR")
        - rp_debugger: Enable debug mode
        - refresh_worker: Auto-refresh worker on errors

    The handler function should accept a job dictionary and return results.
    Handler signature: handler(job: dict) -> dict
    Generator handlers that yield dicts are also supported for streaming
    endpoints (see the streaming example below).

    NOTE(review): start() presumably blocks, polling RunPod for jobs until
    the worker shuts down -- confirm against the SDK implementation.

    Example config:
    {
        "handler": my_job_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    }
    """

Progress Reporting

Real-time progress reporting system for long-running jobs to provide status updates to clients.

def progress_update(
    job_id: str,
    progress: int,
    status: "str | None" = None,
    **kwargs
) -> None:
    """
    Send progress updates during job execution.

    Parameters:
    - job_id: Job identifier from the job input (job["id"])
    - progress: Progress percentage (0-100)
    - status: Optional human-readable status message
    - **kwargs: Additional progress data (e.g., current_step, total_steps)

    Example:
    progress_update(job["id"], 25, "Processing input data")
    progress_update(job["id"], 75, current_step=3, total_steps=4)
    """

File Transfer Utilities

Utilities for downloading input files and uploading output files to cloud storage, enabling seamless file-based workflows.

def download_files_from_urls(
    job_id: str,
    urls: "str | list"
) -> list:
    """
    Download files from URLs to local storage in the job directory.

    Parameters:
    - job_id: Job identifier for organizing downloaded files
    - urls: Single URL string or list of URLs to download

    Returns:
    list: List of local file paths for downloaded files
    """

def upload_file_to_bucket(
    file_name: str,
    file_location: str,
    bucket_creds: "dict | None" = None,
    bucket_name: "str | None" = None,
    prefix: "str | None" = None,
    extra_args: "dict | None" = None
) -> str:
    """
    Upload file to cloud storage bucket.

    Parameters:
    - file_name: Name (object key) for the uploaded file
    - file_location: Local path to file to upload
    - bucket_creds: S3-compatible bucket credentials
      (presumably falls back to environment configuration when omitted --
      TODO confirm against the SDK)
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key
    - extra_args: Additional upload arguments

    Returns:
    str: Public URL of uploaded file
    """

def upload_in_memory_object(
    file_name: str,
    file_data: bytes,
    bucket_creds: "dict | None" = None,
    bucket_name: "str | None" = None,
    prefix: "str | None" = None
) -> str:
    """
    Upload in-memory data directly to cloud storage (no temp file needed).

    Parameters:
    - file_name: Name (object key) for the uploaded file
    - file_data: Binary data to upload
    - bucket_creds: S3-compatible bucket credentials
      (presumably falls back to environment configuration when omitted --
      TODO confirm against the SDK)
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key

    Returns:
    str: Public URL of uploaded file
    """

Logging Utilities

Specialized logging system optimized for serverless environments with structured output and integration with RunPod monitoring.

class RunPodLogger:
    """Logger class optimized for RunPod serverless environments.

    Note: the warning-level method is named ``warn``, not ``warning`` as in
    the stdlib ``logging`` module. All methods accept optional keyword
    arguments carrying structured data (e.g. ``job_id=...``).
    """
    
    def __init__(self):
        """Initialize RunPod logger with default configuration."""
    
    def debug(self, message: str, **kwargs) -> None:
        """Log a debug-level message with optional structured data."""
    
    def info(self, message: str, **kwargs) -> None:
        """Log an info-level message with optional structured data."""
    
    def warn(self, message: str, **kwargs) -> None:
        """Log a warning-level message with optional structured data."""
    
    def error(self, message: str, **kwargs) -> None:
        """Log an error-level message with optional structured data."""

Usage Examples

Basic Serverless Worker

import runpod

def my_handler(job):
    """Handle one RunPod job: read its input, run generation, return output."""
    payload = job["input"]
    current_job_id = job["id"]

    # Pull generation parameters, falling back to defaults.
    text_prompt = payload.get("prompt", "")
    num_steps = payload.get("steps", 50)

    runpod.serverless.progress_update(current_job_id, 10, "Starting generation")

    # Delegate the actual work (model inference, data processing, ...).
    generated = process_request(text_prompt, num_steps)

    runpod.serverless.progress_update(current_job_id, 100, "Generation complete")

    return {"output": generated, "status": "completed"}

def process_request(prompt, steps):
    """Your actual processing logic (placeholder).

    Parameters:
    - prompt: Text prompt taken from the job input.
    - steps: Number of generation steps taken from the job input.

    Returns:
    str: A stub result string describing the request.
    """
    # Simulate processing
    import time
    time.sleep(2)
    return f"Generated content for: {prompt} with {steps} steps"

# Start the serverless worker when this file is run as a script.
# NOTE(review): start() presumably blocks and polls RunPod for jobs -- confirm.
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": my_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    })

Advanced Worker with File Processing

import runpod
import os
import json
from PIL import Image

def image_processing_handler(job):
    """Download the job's images, transform each one, and upload the results."""
    params = job["input"]
    jid = job["id"]

    # Fetch all input images to local storage first.
    urls = params.get("image_urls", [])
    runpod.serverless.progress_update(jid, 5, "Downloading input images")

    local_paths = runpod.serverless.utils.download_files_from_urls(jid, urls)

    runpod.serverless.progress_update(jid, 20, "Processing images")

    total = len(local_paths)
    processed_paths = []
    for done, source_path in enumerate(local_paths, start=1):
        processed_paths.append(process_image(source_path, params))

        # Map per-image progress onto the 20-80% band reserved for processing.
        pct = 20 + (60 * done / total)
        runpod.serverless.progress_update(
            jid,
            int(pct),
            f"Processed {done}/{total} images"
        )

    # Push the processed files to the bucket and collect their URLs.
    runpod.serverless.progress_update(jid, 85, "Uploading results")

    uploaded_urls = []
    for result_path in processed_paths:
        uploaded_urls.append(
            runpod.serverless.utils.upload_file_to_bucket(
                file_name=os.path.basename(result_path),
                file_location=result_path
            )
        )

    runpod.serverless.progress_update(jid, 100, "Processing complete")

    return {
        "output_urls": uploaded_urls,
        "processed_count": len(uploaded_urls)
    }

def process_image(input_path, params):
    """Resize one image and save it to the matching outputs path."""
    img = Image.open(input_path)

    # Target dimensions, defaulting to 512x512.
    target_size = (params.get("width", 512), params.get("height", 512))
    resized = img.resize(target_size)

    # Mirror the input path into the outputs tree with a "_processed" suffix.
    output_path = input_path.replace("/inputs/", "/outputs/").replace(".jpg", "_processed.jpg")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    resized.save(output_path, "JPEG", quality=95)

    return output_path

# Start worker (DEBUG logging enabled for this file-processing example).
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": image_processing_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG"
    })

Streaming Response Worker

import runpod
import time

def streaming_handler(job):
    """Generate streaming responses for real-time output, one word at a time."""
    params = job["input"]
    job_id = job["id"]

    prompt = params.get("prompt", "")
    limit = params.get("max_tokens", 100)

    # Words to "generate": the prompt itself followed by canned filler.
    vocab = prompt.split() + ["generated", "response", "with", "streaming", "output"]
    so_far = ""

    for count, token in enumerate(vocab[:limit], start=1):
        so_far = so_far + token + " "

        # Yield an intermediate chunk so the client can stream it.
        yield {
            "text": token + " ",
            "partial_text": so_far,
            "progress": int(count / len(vocab) * 100),
            "tokens_generated": count
        }

        # Simulate per-token generation latency.
        time.sleep(0.1)

    # Final aggregated result.
    yield {
        "text": so_far.strip(),
        "completed": True,
        "total_tokens": len(vocab)
    }

# Start streaming worker; the handler is a generator, so aggregation of the
# streamed chunks must be enabled.
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": streaming_handler,
        "return_aggregate_stream": True  # Important for streaming
    })

Error Handling and Logging

import runpod
import traceback

# Module-level logger shared by every function in this example.
logger = runpod.RunPodLogger()

def robust_handler(job):
    """Handler with comprehensive error handling and structured logging."""
    payload = job["input"]
    jid = job["id"]

    try:
        logger.info("Starting job processing", job_id=jid)

        # Reject malformed requests before doing any work.
        if not validate_input(payload):
            logger.error("Invalid input provided", input=payload)
            return {"error": "Invalid input parameters", "status": "failed"}

        runpod.serverless.progress_update(jid, 10, "Input validated")

        output = perform_complex_processing(payload, jid)

        logger.info("Job completed successfully", job_id=jid)
        return {"output": output, "status": "completed"}

    except ValueError as exc:
        # Expected, user-correctable failures get a descriptive message.
        logger.error("Validation error", error=str(exc), job_id=jid)
        return {"error": f"Validation error: {str(exc)}", "status": "failed"}

    except Exception as exc:
        # Anything else is an internal fault: log full details, but do not
        # leak them to the caller.
        logger.error(
            "Unexpected error during processing",
            error=str(exc),
            traceback=traceback.format_exc(),
            job_id=jid
        )
        return {"error": "Internal processing error", "status": "failed"}

def validate_input(job_input):
    """Return True only when every required field is present in the input."""
    return all(field in job_input for field in ("prompt", "model"))

def perform_complex_processing(job_input, job_id):
    """Simulate complex processing with per-step progress updates.

    Parameters:
    - job_input: Validated job input dictionary (unused by the simulation).
    - job_id: Job identifier used for logging and progress reporting.

    Returns:
    dict: Result payload for the handler to return.
    """
    # Fix: this example never imports `time` at the top of the file, so the
    # original body raised NameError on time.sleep(); import it locally.
    import time

    steps = ["preprocessing", "model_loading", "inference", "postprocessing"]

    for i, step in enumerate(steps):
        logger.debug(f"Executing {step}", job_id=job_id)

        # Simulate processing time
        time.sleep(1)

        # Update progress
        progress = int((i + 1) / len(steps) * 90)  # Leave 10% for final steps
        runpod.serverless.progress_update(job_id, progress, f"Completed {step}")

    return {"result": "Processing completed successfully"}

# Start worker with error handling enabled.
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": robust_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG",
        "refresh_worker": True  # Auto-restart on errors
    })

Environment Configuration

import runpod
import os

def configure_environment():
    """Prepare the process (GPU cache, logging) for the serverless worker."""
    # Free any cached GPU memory when CUDA is configured for this process.
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        import torch
        torch.cuda.empty_cache()

    # Basic stdlib logging setup.
    import logging
    logging.basicConfig(level=logging.INFO)

    return True

def production_handler(job):
    """Production-ready handler: configure the environment, then process."""
    # Bail out early if the environment could not be prepared.
    if not configure_environment():
        return {"error": "Environment setup failed"}

    return process_job(job)

def process_job(job):
    """Main job processing logic (placeholder implementation)."""
    # Replace with real processing; currently reports success unconditionally.
    return {"status": "completed"}

# Production serverless worker: verbose debugger off, auto-refresh on.
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": production_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO",
        "refresh_worker": True,
        "rp_debugger": False  # Disable debug mode in production
    })

Install with Tessl CLI

npx tessl i tessl/pypi-runpod

docs

configuration.md

endpoint-management.md

index.md

pod-management.md

serverless-worker.md

tile.json