# Serverless Worker SDK

Comprehensive framework for building and deploying serverless workers that process jobs from RunPod endpoints. Includes job processing, progress reporting, error handling, and file transfer utilities for building production-ready serverless applications.

## Core Imports

```python
import runpod
import runpod.serverless
from runpod.serverless import start, progress_update
from runpod.serverless.utils import download_files_from_urls, upload_file_to_bucket, upload_in_memory_object
from runpod import RunPodLogger
```

## Capabilities

### Worker Framework

Core serverless worker framework that handles job processing, automatic scaling, and communication with the RunPod platform.

```python { .api }
def start(config: dict) -> None:
    """
    Start the serverless worker with the provided configuration.

    Parameters:
    - config: Worker configuration dictionary containing:
        - handler: Function to process jobs (required)
        - return_aggregate_stream: Whether to aggregate streaming responses
        - rp_log_level: Logging level ("DEBUG", "INFO", "WARNING", "ERROR")
        - rp_debugger: Enable debug mode
        - refresh_worker: Auto-refresh worker on errors

    The handler function should accept a job dictionary and return results.
    Handler signature: handler(job: dict) -> dict

    Example config:
    {
        "handler": my_job_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    }
    """
```
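
The handler contract is plain Python: a job dictionary in, a result dictionary (or generator) out. That makes handlers easy to smoke-test locally before wiring them into `start`. A minimal sketch (the handler name and input fields are illustrative):

```python
def echo_handler(job):
    """Minimal handler: echo the input back with a status field."""
    job_input = job.get("input", {})
    return {"output": job_input, "status": "completed"}

# Handlers are ordinary functions, so they can be exercised with a
# hand-built job dict before deploying:
result = echo_handler({"id": "local-test", "input": {"prompt": "hello"}})
print(result)  # {'output': {'prompt': 'hello'}, 'status': 'completed'}
```

Once the handler behaves as expected locally, the same function is passed as `"handler"` in the `start` config.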

### Progress Reporting

Real-time progress reporting system for long-running jobs to provide status updates to clients.

```python { .api }
def progress_update(
    job_id: str,
    progress: int,
    status: str = None,
    **kwargs
) -> None:
    """
    Send progress updates during job execution.

    Parameters:
    - job_id: Job identifier from the job input
    - progress: Progress percentage (0-100)
    - status: Optional status message
    - **kwargs: Additional progress data (e.g., current_step, total_steps)

    Example:
    progress_update(job["id"], 25, "Processing input data")
    progress_update(job["id"], 75, current_step=3, total_steps=4)
    """
```
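
When a job has several phases, it helps to map each phase's internal step count into a dedicated slice of the 0-100 range so the reported percentage always moves forward. A small helper for that arithmetic (the function name and band boundaries are illustrative, not part of the SDK):

```python
def scaled_progress(step, total_steps, band_start=20, band_end=80):
    """Map step `step` of `total_steps` into the [band_start, band_end] band."""
    return band_start + int((band_end - band_start) * step / total_steps)

# If the "processing" phase owns 20-80%, four steps land at 35, 50, 65, 80:
print([scaled_progress(i, 4) for i in range(1, 5)])  # [35, 50, 65, 80]
```

Each value can then be passed straight to `progress_update(job_id, scaled_progress(i, n), ...)`.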

### File Transfer Utilities

Utilities for downloading input files and uploading output files to cloud storage, enabling seamless file-based workflows.

```python { .api }
def download_files_from_urls(
    job_id: str,
    urls: list
) -> list:
    """
    Download files from URLs to local storage in the job directory.

    Parameters:
    - job_id: Job identifier for organizing downloaded files
    - urls: List of URLs to download (a single URL string is also accepted)

    Returns:
    list: List of local file paths for downloaded files
    """

def upload_file_to_bucket(
    file_name: str,
    file_location: str,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None,
    extra_args: dict = None
) -> str:
    """
    Upload a file to a cloud storage bucket.

    Parameters:
    - file_name: Name for the uploaded file
    - file_location: Local path of the file to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key
    - extra_args: Additional upload arguments

    Returns:
    str: Public URL of the uploaded file
    """

def upload_in_memory_object(
    file_name: str,
    file_data: bytes,
    bucket_creds: dict = None,
    bucket_name: str = None,
    prefix: str = None
) -> str:
    """
    Upload in-memory data directly to cloud storage.

    Parameters:
    - file_name: Name for the uploaded file
    - file_data: Binary data to upload
    - bucket_creds: S3-compatible bucket credentials
    - bucket_name: Target bucket name
    - prefix: Optional prefix for the object key

    Returns:
    str: Public URL of the uploaded file
    """
```
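
`upload_in_memory_object` expects raw bytes, so structured results need to be serialized first. A sketch of preparing a JSON payload (the upload call itself is commented out because it requires bucket credentials; `creds` and the bucket name are placeholders):

```python
import json

result = {"scores": [0.92, 0.08], "label": "cat"}
payload = json.dumps(result).encode("utf-8")

# With credentials configured, the payload could then be uploaded:
# url = upload_in_memory_object("result.json", payload,
#                               bucket_creds=creds, bucket_name="outputs")
print(type(payload).__name__)  # bytes
```

This avoids writing a temporary file just to hand a small result to `upload_file_to_bucket`.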

### Logging Utilities

Specialized logging system optimized for serverless environments with structured output and integration with RunPod monitoring.

```python { .api }
class RunPodLogger:
    """Logger class optimized for RunPod serverless environments."""

    def __init__(self):
        """Initialize RunPod logger with default configuration."""

    def debug(self, message: str, **kwargs) -> None:
        """Log a debug message with optional structured data."""

    def info(self, message: str, **kwargs) -> None:
        """Log an info message with optional structured data."""

    def warn(self, message: str, **kwargs) -> None:
        """Log a warning message with optional structured data."""

    def error(self, message: str, **kwargs) -> None:
        """Log an error message with optional structured data."""
```

## Usage Examples

### Basic Serverless Worker

```python
import runpod

def my_handler(job):
    """Process a job and return results."""
    # Get job input
    job_input = job["input"]
    job_id = job["id"]

    # Extract parameters
    prompt = job_input.get("prompt", "")
    steps = job_input.get("steps", 50)

    # Send progress update
    runpod.serverless.progress_update(job_id, 10, "Starting generation")

    # Your processing logic here
    # (e.g., run AI model, process data, etc.)
    result_data = process_request(prompt, steps)

    # Send completion progress
    runpod.serverless.progress_update(job_id, 100, "Generation complete")

    # Return results
    return {
        "output": result_data,
        "status": "completed"
    }

def process_request(prompt, steps):
    """Your actual processing logic."""
    # Simulate processing
    import time
    time.sleep(2)
    return f"Generated content for: {prompt} with {steps} steps"

# Start the serverless worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": my_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO"
    })
```

### Advanced Worker with File Processing

```python
import runpod
import os
from PIL import Image

def image_processing_handler(job):
    """Process images with progress reporting and file handling."""
    job_input = job["input"]
    job_id = job["id"]

    # Download input images
    input_urls = job_input.get("image_urls", [])
    runpod.serverless.progress_update(job_id, 5, "Downloading input images")

    # Download files
    downloaded_files = runpod.serverless.utils.download_files_from_urls(
        job_id,
        input_urls
    )

    runpod.serverless.progress_update(job_id, 20, "Processing images")

    processed_files = []
    total_files = len(downloaded_files)

    for i, file_path in enumerate(downloaded_files):
        # Process each image
        processed_path = process_image(file_path, job_input)
        processed_files.append(processed_path)

        # Update progress
        progress = 20 + (60 * (i + 1) / total_files)
        runpod.serverless.progress_update(
            job_id,
            int(progress),
            f"Processed {i+1}/{total_files} images"
        )

    # Upload results
    runpod.serverless.progress_update(job_id, 85, "Uploading results")

    output_urls = []
    for file_path in processed_files:
        url = runpod.serverless.utils.upload_file_to_bucket(
            file_name=os.path.basename(file_path),
            file_location=file_path
        )
        output_urls.append(url)

    runpod.serverless.progress_update(job_id, 100, "Processing complete")

    return {
        "output_urls": output_urls,
        "processed_count": len(output_urls)
    }

def process_image(input_path, params):
    """Process a single image file."""
    # Load image
    image = Image.open(input_path)

    # Apply processing (example: resize)
    width = params.get("width", 512)
    height = params.get("height", 512)
    processed_image = image.resize((width, height))

    # Save processed image
    output_path = input_path.replace("/inputs/", "/outputs/").replace(".jpg", "_processed.jpg")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    processed_image.save(output_path, "JPEG", quality=95)

    return output_path

# Start worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": image_processing_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG"
    })
```

### Streaming Response Worker

```python
import runpod
import time

def streaming_handler(job):
    """Generate streaming responses for real-time output."""
    job_input = job["input"]
    job_id = job["id"]

    prompt = job_input.get("prompt", "")
    max_tokens = job_input.get("max_tokens", 100)

    # Initialize streaming response
    generated_text = ""

    # Simulate streaming text generation
    words = prompt.split() + ["generated", "response", "with", "streaming", "output"]

    for i, word in enumerate(words[:max_tokens]):
        # Add word to output
        generated_text += word + " "

        # Yield intermediate result for streaming
        yield {
            "text": word + " ",
            "partial_text": generated_text,
            "progress": int((i + 1) / len(words) * 100),
            "tokens_generated": i + 1
        }

        # Simulate processing time
        time.sleep(0.1)

    # Final result
    yield {
        "text": generated_text.strip(),
        "completed": True,
        "total_tokens": len(words)
    }

# Start streaming worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": streaming_handler,
        "return_aggregate_stream": True  # Important for streaming
    })
```
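
Because `streaming_handler` is a generator, each `yield` becomes one chunk of the stream, and with `return_aggregate_stream` enabled the yielded chunks are aggregated into the final response. The generator contract itself can be illustrated without the SDK (this is purely a sketch, not RunPod internals):

```python
def tiny_stream(words):
    """Yield one chunk dict per word, mimicking a streaming handler."""
    for i, word in enumerate(words):
        yield {"text": word + " ", "tokens_generated": i + 1}

# Consuming the generator collects the chunks, analogous to aggregation:
chunks = list(tiny_stream(["streamed", "output"]))
full_text = "".join(c["text"] for c in chunks).strip()
print(full_text)  # streamed output
```

The key point: the handler never builds the full response itself; callers (or the platform) decide whether to consume chunks incrementally or all at once.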

### Error Handling and Logging

```python
import runpod
import time
import traceback

# Initialize logger
logger = runpod.RunPodLogger()

def robust_handler(job):
    """Handler with comprehensive error handling."""
    job_input = job["input"]
    job_id = job["id"]

    try:
        logger.info("Starting job processing", job_id=job_id)

        # Validate input
        if not validate_input(job_input):
            logger.error("Invalid input provided", input=job_input)
            return {
                "error": "Invalid input parameters",
                "status": "failed"
            }

        # Process with progress updates
        runpod.serverless.progress_update(job_id, 10, "Input validated")

        result = perform_complex_processing(job_input, job_id)

        logger.info("Job completed successfully", job_id=job_id)
        return {
            "output": result,
            "status": "completed"
        }

    except ValueError as e:
        logger.error("Validation error", error=str(e), job_id=job_id)
        return {
            "error": f"Validation error: {str(e)}",
            "status": "failed"
        }

    except Exception as e:
        logger.error(
            "Unexpected error during processing",
            error=str(e),
            traceback=traceback.format_exc(),
            job_id=job_id
        )
        return {
            "error": "Internal processing error",
            "status": "failed"
        }

def validate_input(job_input):
    """Validate job input parameters."""
    required_fields = ["prompt", "model"]
    for field in required_fields:
        if field not in job_input:
            return False
    return True

def perform_complex_processing(job_input, job_id):
    """Simulate complex processing with progress updates."""
    steps = ["preprocessing", "model_loading", "inference", "postprocessing"]

    for i, step in enumerate(steps):
        logger.debug(f"Executing {step}", job_id=job_id)

        # Simulate processing time
        time.sleep(1)

        # Update progress
        progress = int((i + 1) / len(steps) * 90)  # Leave 10% for final steps
        runpod.serverless.progress_update(job_id, progress, f"Completed {step}")

    return {"result": "Processing completed successfully"}

# Start worker with error handling
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": robust_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "DEBUG",
        "refresh_worker": True  # Auto-restart on errors
    })
```

### Environment Configuration

```python
import runpod
import os

def configure_environment():
    """Set up environment for the serverless worker."""
    # Clear GPU memory if a GPU is available
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        import torch
        torch.cuda.empty_cache()

    # Configure logging
    import logging
    logging.basicConfig(level=logging.INFO)

    return True

def production_handler(job):
    """Production-ready handler with full configuration."""
    # Environment setup
    if not configure_environment():
        return {"error": "Environment setup failed"}

    # Process job
    return process_job(job)

def process_job(job):
    """Main job processing logic."""
    # Your implementation here
    return {"status": "completed"}

# Production serverless worker
if __name__ == "__main__":
    runpod.serverless.start({
        "handler": production_handler,
        "return_aggregate_stream": True,
        "rp_log_level": "INFO",
        "refresh_worker": True,
        "rp_debugger": False  # Disable debug mode in production
    })
```