Python library for Runpod API and serverless worker SDK.
—
Comprehensive framework for building and deploying serverless workers that process jobs from RunPod endpoints. Includes job processing, progress reporting, error handling, and file transfer utilities for building production-ready serverless applications.
import runpod
import runpod.serverless
from runpod.serverless import start, progress_update
from runpod.serverless.utils import download_files_from_urls, upload_file_to_bucket, upload_in_memory_object
from runpod import RunPodLogger

Core serverless worker framework that handles job processing, automatic scaling, and communication with the RunPod platform.
def start(config: dict) -> None:
    """
    Launch the serverless worker loop with the given configuration.

    Parameters:
    - config: Worker configuration dictionary. Recognized keys:
        - handler: Callable invoked for each job (required);
          signature handler(job: dict) -> dict
        - return_aggregate_stream: Aggregate streaming responses into the
          final result
        - rp_log_level: Logging level ("DEBUG", "INFO", "WARNING", "ERROR")
        - rp_debugger: Enable debug mode
        - refresh_worker: Automatically refresh the worker on errors

    Example config:
        {
            "handler": my_job_handler,
            "return_aggregate_stream": True,
            "rp_log_level": "INFO"
        }
    """

# Real-time progress reporting system for long-running jobs to provide
# status updates to clients.
def progress_update(
    job_id: str,
    progress: int,
    status: str = None,
    **kwargs
) -> None:
    """
    Report progress for a running job.

    Parameters:
    - job_id: Job identifier taken from the job input
    - progress: Completion percentage in the range 0-100
    - status: Optional human-readable status message
    - **kwargs: Extra progress fields (e.g., current_step, total_steps)

    Example:
        progress_update(job["id"], 25, "Processing input data")
        progress_update(job["id"], 75, current_step=3, total_steps=4)
    """

# Utilities for downloading input files and uploading output files to
# cloud storage, enabling seamless file-based workflows.
def download_files_from_urls(
    job_id: str,
    urls: list
) -> list:
    """
    Fetch remote files into local storage under the job's directory.

    Parameters:
    - job_id: Job identifier used to organize the downloaded files
    - urls: A single URL string, or a list of URLs, to download

    Returns:
        list: Local file paths of the downloaded files
    """
def upload_file_to_bucket(
    file_name: str,
    file_location: str,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None,
    extra_args: dict = None
) -> str:
    """
    Upload a local file to an S3-compatible storage bucket.

    Parameters:
    - file_name: Name to give the uploaded object
    - file_location: Local path of the file to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key
    - extra_args: Additional upload arguments

    Returns:
        str: Public URL of the uploaded file
    """
def upload_in_memory_object(
    file_name: str,
    file_data: bytes,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None
) -> str:
    """
    Upload a bytes payload straight to cloud storage without touching disk.

    Parameters:
    - file_name: Name to give the uploaded object
    - file_data: Binary payload to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key

    Returns:
        str: Public URL of the uploaded file
    """

# Specialized logging system optimized for serverless environments with
# structured output and integration with RunPod monitoring.
class RunPodLogger:
    """Structured logger tailored to RunPod serverless environments."""

    def __init__(self):
        """Create a logger with the default configuration."""

    def debug(self, message: str, **kwargs) -> None:
        """Emit a debug-level message with optional structured data."""

    def info(self, message: str, **kwargs) -> None:
        """Emit an info-level message with optional structured data."""

    def warn(self, message: str, **kwargs) -> None:
        """Emit a warning-level message with optional structured data."""

    def error(self, message: str, **kwargs) -> None:
        """Emit an error-level message with optional structured data."""

import runpod
def my_handler(job):
    """Process a job and return results."""
    # Unpack the job payload
    job_input = job["input"]
    job_id = job["id"]

    # Extract parameters, falling back to defaults
    prompt = job_input.get("prompt", "")
    steps = job_input.get("steps", 50)

    # Let the client know work has begun
    runpod.serverless.progress_update(job_id, 10, "Starting generation")

    # Your processing logic here (e.g., run AI model, process data, etc.)
    result_data = process_request(prompt, steps)

    # Report completion before returning
    runpod.serverless.progress_update(job_id, 100, "Generation complete")

    return {
        "output": result_data,
        "status": "completed"
    }
def process_request(prompt, steps):
    """Your actual processing logic."""
    import time

    # Simulate a couple of seconds of work.
    time.sleep(2)

    return f"Generated content for: {prompt} with {steps} steps"
# Start the serverless worker
if __name__ == "__main__":
    worker_config = {
        "handler": my_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO",
    }
    runpod.serverless.start(worker_config)

import runpod
import os
import json
from PIL import Image
def image_processing_handler(job):
    """Process images with progress reporting and file handling."""
    job_input = job["input"]
    job_id = job["id"]

    # Fetch the input images listed in the job payload.
    input_urls = job_input.get("image_urls", [])
    runpod.serverless.progress_update(job_id, 5, "Downloading input images")

    downloaded_files = runpod.serverless.utils.download_files_from_urls(
        job_id,
        input_urls
    )

    runpod.serverless.progress_update(job_id, 20, "Processing images")

    # Transform each image, reporting progress across the 20-80% band.
    processed_files = []
    total_files = len(downloaded_files)

    for done, file_path in enumerate(downloaded_files, start=1):
        processed_files.append(process_image(file_path, job_input))

        progress = 20 + (60 * done / total_files)
        runpod.serverless.progress_update(
            job_id,
            int(progress),
            f"Processed {done}/{total_files} images"
        )

    # Push results to the configured bucket and collect their URLs.
    runpod.serverless.progress_update(job_id, 85, "Uploading results")

    output_urls = [
        runpod.serverless.utils.upload_file_to_bucket(
            file_name=os.path.basename(path),
            file_location=path
        )
        for path in processed_files
    ]

    runpod.serverless.progress_update(job_id, 100, "Processing complete")

    return {
        "output_urls": output_urls,
        "processed_count": len(output_urls)
    }
def process_image(input_path, params):
    """Process a single image file.

    Parameters:
    - input_path: Local path of the downloaded input image
    - params: Job input dict; reads optional "width"/"height" (default 512)

    Returns:
        str: Path of the processed image (always saved as JPEG)
    """
    # Load image
    image = Image.open(input_path)

    # Apply processing (example: resize)
    width = params.get("width", 512)
    height = params.get("height", 512)
    processed_image = image.resize((width, height))

    # Build the output path. Fix: the old code only renamed ".jpg" inputs,
    # so any other extension produced an output path identical to the input
    # (risking an in-place overwrite when "/inputs/" was absent). Use
    # splitext so every extension gets the "_processed" suffix and a ".jpg"
    # extension matching the JPEG encoding below.
    base, _ext = os.path.splitext(input_path.replace("/inputs/", "/outputs/"))
    output_path = base + "_processed.jpg"

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    processed_image.save(output_path, "JPEG", quality=95)
    return output_path
# Start worker
if __name__ == "__main__":
    runpod.serverless.start(
        {
            "handler": image_processing_handler,
            "return_aggregate_stream": True,
            "rp_log_level": "DEBUG",
        }
    )

import runpod
import time
def streaming_handler(job):
    """Generate streaming responses for real-time output.

    Yields one dict per generated token carrying the partial text and a
    progress percentage, then a final dict with the full text,
    completed=True, and the total token count.
    """
    job_input = job["input"]
    job_id = job["id"]

    prompt = job_input.get("prompt", "")
    max_tokens = job_input.get("max_tokens", 100)

    # Simulate streaming text generation.
    words = prompt.split() + ["generated", "response", "with", "streaming", "output"]

    # Fix: progress and total_tokens were computed from len(words) even when
    # max_tokens truncated the output, so progress never reached 100 and the
    # final count over-reported. Compute against the tokens actually emitted.
    emit_count = min(max_tokens, len(words))

    generated_text = ""
    for i, word in enumerate(words[:max_tokens]):
        # Accumulate the output so far.
        generated_text += word + " "

        # Yield intermediate result for streaming.
        yield {
            "text": word + " ",
            "partial_text": generated_text,
            "progress": int((i + 1) / emit_count * 100),
            "tokens_generated": i + 1
        }

        # Simulate per-token processing time.
        time.sleep(0.1)

    # Final aggregated result.
    yield {
        "text": generated_text.strip(),
        "completed": True,
        "total_tokens": emit_count
    }
# Start streaming worker
if __name__ == "__main__":
    config = {
        "handler": streaming_handler,
        "return_aggregate_stream": True  # Important for streaming
    }
    runpod.serverless.start(config)

import runpod
import traceback
# Initialize a module-level logger shared by the handler and helpers below.
logger = runpod.RunPodLogger()
def robust_handler(job):
    """Handler with comprehensive error handling.

    Validates input, reports progress, and maps exceptions to structured
    failure responses instead of letting them propagate.
    """
    job_input = job["input"]
    job_id = job["id"]

    try:
        logger.info("Starting job processing", job_id=job_id)

        # Reject malformed input early with a structured failure payload.
        if not validate_input(job_input):
            logger.error("Invalid input provided", input=job_input)
            return {
                "error": "Invalid input parameters",
                "status": "failed"
            }

        # Process with progress updates
        runpod.serverless.progress_update(job_id, 10, "Input validated")
        outcome = perform_complex_processing(job_input, job_id)

        logger.info("Job completed successfully", job_id=job_id)
        return {
            "output": outcome,
            "status": "completed"
        }

    except ValueError as err:
        logger.error("Validation error", error=str(err), job_id=job_id)
        return {
            "error": f"Validation error: {str(err)}",
            "status": "failed"
        }

    except Exception as err:
        # Catch-all boundary: log the traceback and return a generic error
        # so the worker survives unexpected failures.
        logger.error(
            "Unexpected error during processing",
            error=str(err),
            traceback=traceback.format_exc(),
            job_id=job_id
        )
        return {
            "error": "Internal processing error",
            "status": "failed"
        }
def validate_input(job_input):
    """Validate job input parameters.

    Returns True only when every required field is present.
    """
    required_fields = ["prompt", "model"]
    return all(field in job_input for field in required_fields)
def perform_complex_processing(job_input, job_id):
    """Simulate complex processing with progress updates.

    Parameters:
    - job_input: Validated job input dict (unused by the simulation)
    - job_id: Job identifier used for logging and progress updates

    Returns:
        dict: Result payload for the handler to return
    """
    # Fix: this example never imported `time`, so time.sleep below raised
    # NameError at runtime. Import locally to keep the example self-contained.
    import time

    steps = ["preprocessing", "model_loading", "inference", "postprocessing"]

    for i, step in enumerate(steps):
        logger.debug(f"Executing {step}", job_id=job_id)

        # Simulate processing time
        time.sleep(1)

        # Update progress, leaving 10% headroom for the handler's final steps.
        progress = int((i + 1) / len(steps) * 90)
        runpod.serverless.progress_update(job_id, progress, f"Completed {step}")

    return {"result": "Processing completed successfully"}
# Start worker with error handling
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": robust_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG",
        # Auto-restart the worker process when an error occurs.
        "refresh_worker": True,
    })

import runpod
import os
def configure_environment():
    """Set up environment for serverless worker.

    Clears cached GPU memory when CUDA devices are visible and installs a
    basic INFO-level logging configuration.

    Returns:
        bool: Always True once setup has completed.
    """
    # Release cached GPU memory if a CUDA device is exposed to the worker.
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        import torch
        torch.cuda.empty_cache()

    # Install a basic logging configuration.
    import logging
    logging.basicConfig(level=logging.INFO)

    return True
def production_handler(job):
    """Production-ready handler with full configuration.

    Runs environment setup before delegating to process_job; returns an
    error payload if setup fails.
    """
    # Bail out early when the environment cannot be prepared.
    if not configure_environment():
        return {"error": "Environment setup failed"}

    return process_job(job)
def process_job(job):
    """Main job processing logic.

    Placeholder: replace with your implementation; must return a dict.
    """
    # Your implementation here
    return {"status": "completed"}
# Production serverless worker
if __name__ == "__main__":
    production_config = {
        "handler": production_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO",
        "refresh_worker": True,
        "rp_debugger": False,  # Disable debug mode in production
    }
    runpod.serverless.start(production_config)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-runpod