# Serverless Worker SDK

A comprehensive framework for building and deploying serverless workers that process jobs from RunPod endpoints. It provides job handling, progress reporting, error handling, and file transfer utilities for building production-ready serverless applications.

## Core Imports

```python
import runpod
import runpod.serverless
from runpod.serverless import start, progress_update
from runpod.serverless.utils import download_files_from_urls, upload_file_to_bucket, upload_in_memory_object
from runpod import RunPodLogger
```

## Capabilities

### Worker Framework

Core serverless worker framework that handles job processing, automatic scaling, and communication with the RunPod platform.

```python { .api }
def start(config: dict) -> None:
    """
    Start the serverless worker with the provided configuration.

    Parameters:
    - config: Worker configuration dictionary containing:
        - handler: Function to process jobs (required)
        - return_aggregate_stream: Whether to aggregate streaming responses
        - rp_log_level: Logging level ("DEBUG", "INFO", "WARNING", "ERROR")
        - rp_debugger: Enable debug mode
        - refresh_worker: Auto-refresh worker on errors

    The handler function should accept a job dictionary and return results.
    Handler signature: handler(job: dict) -> dict

    Example config:
    {
        "handler": my_job_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    }
    """
```

### Progress Reporting

Real-time progress reporting system for long-running jobs to provide status updates to clients.

```python { .api }
def progress_update(
    job_id: str,
    progress: int,
    status: str = None,
    **kwargs
) -> None:
    """
    Send progress updates during job execution.

    Parameters:
    - job_id: Job identifier from the job input
    - progress: Progress percentage (0-100)
    - status: Optional status message
    - **kwargs: Additional progress data (e.g., current_step, total_steps)

    Example:
    progress_update(job["id"], 25, "Processing input data")
    progress_update(job["id"], 75, current_step=3, total_steps=4)
    """
```
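
For multi-stage jobs, a common pattern is to reserve a band of the 0-100 range for each stage and map stage-local progress into that band before calling `progress_update`. The helper below is an illustrative sketch, not part of the SDK:

```python
def banded_progress(stage_start: int, stage_end: int, done: int, total: int) -> int:
    """Map progress within one stage onto the overall 0-100 scale.

    stage_start/stage_end bound the band reserved for this stage;
    done/total describe progress inside the stage.
    """
    fraction = done / total if total else 1.0
    return stage_start + int((stage_end - stage_start) * fraction)

# If downloading occupies 0-20% and processing 20-80%, reporting the
# third of four processed items would look like:
# progress_update(job["id"], banded_progress(20, 80, 3, 4), "Processing")
# banded_progress(20, 80, 3, 4) -> 65
```

This keeps the percentage monotonically increasing across stages regardless of how many items each stage handles.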

### File Transfer Utilities

Utilities for downloading input files and uploading output files to cloud storage, enabling seamless file-based workflows.

```python { .api }
def download_files_from_urls(
    job_id: str,
    urls: list
) -> list:
    """
    Download files from URLs to local storage in the job directory.

    Parameters:
    - job_id: Job identifier for organizing downloaded files
    - urls: URLs to download (a single URL string or a list of URLs)

    Returns:
    list: Local file paths of the downloaded files
    """

def upload_file_to_bucket(
    file_name: str,
    file_location: str,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None,
    extra_args: dict = None
) -> str:
    """
    Upload a file to a cloud storage bucket.

    Parameters:
    - file_name: Name for the uploaded file
    - file_location: Local path of the file to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key
    - extra_args: Additional upload arguments

    Returns:
    str: Public URL of the uploaded file
    """

def upload_in_memory_object(
    file_name: str,
    file_data: bytes,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None
) -> str:
    """
    Upload in-memory data directly to cloud storage.

    Parameters:
    - file_name: Name for the uploaded file
    - file_data: Binary data to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key

    Returns:
    str: Public URL of the uploaded file
    """
```
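
`upload_in_memory_object` is handy when a result exists only in memory, such as a JSON summary, and writing a temporary file first would be wasted work. The sketch below shows the serialization step; the upload call itself is commented out because it assumes S3-compatible credentials are configured (via environment or `bucket_creds`), and the `prefix` value is purely illustrative:

```python
import json

# A result dict produced by some handler.
results = {"status": "completed", "scores": [0.92, 0.87]}

# Serialize to bytes, since upload_in_memory_object expects binary data.
payload = json.dumps(results).encode("utf-8")

# With credentials configured, the upload would look like:
# from runpod.serverless.utils import upload_in_memory_object
# url = upload_in_memory_object(
#     file_name="results.json",
#     file_data=payload,
#     prefix="job-outputs",  # illustrative object-key prefix
# )
```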

### Logging Utilities

Specialized logging system optimized for serverless environments with structured output and integration with RunPod monitoring.

```python { .api }
class RunPodLogger:
    """Logger class optimized for RunPod serverless environments."""

    def __init__(self):
        """Initialize RunPod logger with default configuration."""

    def debug(self, message: str, **kwargs) -> None:
        """Log debug message with optional structured data."""

    def info(self, message: str, **kwargs) -> None:
        """Log info message with optional structured data."""

    def warn(self, message: str, **kwargs) -> None:
        """Log warning message with optional structured data."""

    def error(self, message: str, **kwargs) -> None:
        """Log error message with optional structured data."""
```
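
As a rough illustration of the structured call pattern above, keyword arguments can be thought of as key=value context appended to the message. The stand-in below mimics calls like `logger.info("Starting job", job_id="abc123")`; it is not the SDK's actual implementation or output format:

```python
def format_log(level: str, message: str, **kwargs) -> str:
    """Render a log line with structured key=value context (stand-in only)."""
    context = " ".join(f"{k}={v}" for k, v in kwargs.items())
    return f"{level.upper()} | {message}" + (f" | {context}" if context else "")

line = format_log("info", "Starting job", job_id="abc123", attempt=1)
# line == "INFO | Starting job | job_id=abc123 attempt=1"
```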

## Usage Examples

### Basic Serverless Worker

```python
import time

import runpod

def my_handler(job):
    """Process a job and return results."""
    # Get job input
    job_input = job["input"]
    job_id = job["id"]

    # Extract parameters
    prompt = job_input.get("prompt", "")
    steps = job_input.get("steps", 50)

    # Send progress update
    runpod.serverless.progress_update(job_id, 10, "Starting generation")

    # Your processing logic here
    # (e.g., run AI model, process data, etc.)
    result_data = process_request(prompt, steps)

    # Send completion progress
    runpod.serverless.progress_update(job_id, 100, "Generation complete")

    # Return results
    return {
        "output": result_data,
        "status": "completed"
    }

def process_request(prompt, steps):
    """Your actual processing logic."""
    # Simulate processing
    time.sleep(2)
    return f"Generated content for: {prompt} with {steps} steps"

# Start the serverless worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": my_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    })
```

### Advanced Worker with File Processing

```python
import os

import runpod
from PIL import Image

def image_processing_handler(job):
    """Process images with progress reporting and file handling."""
    job_input = job["input"]
    job_id = job["id"]

    # Download input images
    input_urls = job_input.get("image_urls", [])
    runpod.serverless.progress_update(job_id, 5, "Downloading input images")

    # Download files
    downloaded_files = runpod.serverless.utils.download_files_from_urls(
        job_id,
        input_urls
    )

    runpod.serverless.progress_update(job_id, 20, "Processing images")

    processed_files = []
    total_files = len(downloaded_files)

    for i, file_path in enumerate(downloaded_files):
        # Process each image
        processed_path = process_image(file_path, job_input)
        processed_files.append(processed_path)

        # Update progress
        progress = 20 + (60 * (i + 1) / total_files)
        runpod.serverless.progress_update(
            job_id,
            int(progress),
            f"Processed {i + 1}/{total_files} images"
        )

    # Upload results
    runpod.serverless.progress_update(job_id, 85, "Uploading results")

    output_urls = []
    for file_path in processed_files:
        url = runpod.serverless.utils.upload_file_to_bucket(
            file_name=os.path.basename(file_path),
            file_location=file_path
        )
        output_urls.append(url)

    runpod.serverless.progress_update(job_id, 100, "Processing complete")

    return {
        "output_urls": output_urls,
        "processed_count": len(output_urls)
    }

def process_image(input_path, params):
    """Process a single image file."""
    # Load image
    image = Image.open(input_path)

    # Apply processing (example: resize)
    width = params.get("width", 512)
    height = params.get("height", 512)
    processed_image = image.resize((width, height))

    # Save processed image
    output_path = input_path.replace("/inputs/", "/outputs/").replace(".jpg", "_processed.jpg")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    processed_image.save(output_path, "JPEG", quality=95)

    return output_path

# Start worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": image_processing_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG"
    })
```

### Streaming Response Worker

```python
import time

import runpod

def streaming_handler(job):
    """Generate streaming responses for real-time output."""
    job_input = job["input"]

    prompt = job_input.get("prompt", "")
    max_tokens = job_input.get("max_tokens", 100)

    # Initialize streaming response
    generated_text = ""

    # Simulate streaming text generation, capped at max_tokens words
    words = (prompt.split() + ["generated", "response", "with", "streaming", "output"])[:max_tokens]

    for i, word in enumerate(words):
        # Add word to output
        generated_text += word + " "

        # Yield intermediate result for streaming
        yield {
            "text": word + " ",
            "partial_text": generated_text,
            "progress": int((i + 1) / len(words) * 100),
            "tokens_generated": i + 1
        }

        # Simulate processing time
        time.sleep(0.1)

    # Final result
    yield {
        "text": generated_text.strip(),
        "completed": True,
        "total_tokens": len(words)
    }

# Start streaming worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": streaming_handler,
        "return_aggregate_stream": True  # Important for streaming
    })
```
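
Conceptually, when `return_aggregate_stream` is enabled the platform collects every chunk a generator handler yields and returns them together once the job finishes, in addition to streaming them to `/stream` consumers. A rough pure-Python sketch of that aggregation (not the SDK's actual implementation):

```python
def aggregate_stream(handler, job):
    """Collect all chunks yielded by a generator handler into one list."""
    return list(handler(job))

def tiny_handler(job):
    # Minimal generator handler: one chunk per word of the prompt.
    for word in job["input"]["prompt"].split():
        yield {"text": word}

chunks = aggregate_stream(tiny_handler, {"input": {"prompt": "hello streaming world"}})
# chunks == [{"text": "hello"}, {"text": "streaming"}, {"text": "world"}]
```

Without aggregation, a client polling `/status` after completion would only see the final chunk; with it, the full sequence is preserved.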

### Error Handling and Logging

```python
import time
import traceback

import runpod

# Initialize logger
logger = runpod.RunPodLogger()

def robust_handler(job):
    """Handler with comprehensive error handling."""
    job_input = job["input"]
    job_id = job["id"]

    try:
        logger.info("Starting job processing", job_id=job_id)

        # Validate input
        if not validate_input(job_input):
            logger.error("Invalid input provided", input=job_input)
            return {
                "error": "Invalid input parameters",
                "status": "failed"
            }

        # Process with progress updates
        runpod.serverless.progress_update(job_id, 10, "Input validated")

        result = perform_complex_processing(job_input, job_id)

        logger.info("Job completed successfully", job_id=job_id)
        return {
            "output": result,
            "status": "completed"
        }

    except ValueError as e:
        logger.error("Validation error", error=str(e), job_id=job_id)
        return {
            "error": f"Validation error: {str(e)}",
            "status": "failed"
        }

    except Exception as e:
        logger.error(
            "Unexpected error during processing",
            error=str(e),
            traceback=traceback.format_exc(),
            job_id=job_id
        )
        return {
            "error": "Internal processing error",
            "status": "failed"
        }

def validate_input(job_input):
    """Validate job input parameters."""
    required_fields = ["prompt", "model"]
    for field in required_fields:
        if field not in job_input:
            return False
    return True

def perform_complex_processing(job_input, job_id):
    """Simulate complex processing with progress updates."""
    steps = ["preprocessing", "model_loading", "inference", "postprocessing"]

    for i, step in enumerate(steps):
        logger.debug(f"Executing {step}", job_id=job_id)

        # Simulate processing time
        time.sleep(1)

        # Update progress
        progress = int((i + 1) / len(steps) * 90)  # Leave 10% for final steps
        runpod.serverless.progress_update(job_id, progress, f"Completed {step}")

    return {"result": "Processing completed successfully"}

# Start worker with error handling
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": robust_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG",
        "refresh_worker": True  # Auto-restart on errors
    })
```

### Environment Configuration

```python
import os

import runpod

def configure_environment():
    """Set up environment for serverless worker."""
    # Clear GPU memory if a GPU is available
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        import torch
        torch.cuda.empty_cache()

    # Configure logging
    import logging
    logging.basicConfig(level=logging.INFO)

    return True

def production_handler(job):
    """Production-ready handler with full configuration."""
    # Environment setup
    if not configure_environment():
        return {"error": "Environment setup failed"}

    # Process job
    return process_job(job)

def process_job(job):
    """Main job processing logic."""
    # Your implementation here
    return {"status": "completed"}

# Production serverless worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": production_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO",
        "refresh_worker": True,
        "rp_debugger": False  # Disable debug mode in production
    })
```