# Batch Processing

Utilities for processing AWS SQS, DynamoDB Streams, and Kinesis records with built-in error handling, partial failure support, and automatic retries. They enable reliable processing of batch events with granular control over success and failure handling.

## Capabilities

### Batch Processor Decorators

Decorators that automatically apply batch processing logic and error handling to Lambda functions.

```python { .api }
def batch_processor(
    record_handler: Callable[[Dict], Any],
    processor: BatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for synchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Function to process individual records
    - processor: BatchProcessor instance with configuration
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated Lambda function that processes batches
    """

def async_batch_processor(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: AsyncBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for asynchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Async function to process individual records
    - processor: AsyncBatchProcessor instance
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated async Lambda function that processes batches
    """

def process_partial_response(
    record_handler: Callable[[Dict], Any],
    processor: BasePartialBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for batch processing with partial failure handling.

    Parameters:
    - record_handler: Function to process individual records
    - processor: Partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated function that returns partial batch failure response
    """

def async_process_partial_response(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: BasePartialBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for async batch processing with partial failure handling.

    Parameters:
    - record_handler: Async function to process individual records
    - processor: Async partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated async function with partial failure response
    """
```

### Batch Processors

Core processor classes that handle the batch processing logic and error management.

```python { .api }
class BatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Synchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Pydantic model for record validation
        - batch_length_quota_mb: Maximum batch size in MB
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> List[SuccessResponse]:
        """
        Process batch of records.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """
        Handle successful record processing.

        Parameters:
        - record: Successfully processed record
        - result: Processing result

        Returns:
        SuccessResponse object
        """

    def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """
        Handle failed record processing.

        Parameters:
        - record: Failed record
        - exception: Exception that occurred

        Returns:
        FailureResponse object
        """

class AsyncBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Asynchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event
        - model: Pydantic model for validation
        - batch_length_quota_mb: Maximum batch size in MB
        """

    async def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Awaitable[Any]],
        context: LambdaContext = None,
    ) -> List[SuccessResponse]:
        """
        Asynchronously process batch of records.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Async function to process each record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    async def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """Handle successful async record processing"""

    async def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """Handle failed async record processing"""
```
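The decorators shown earlier are convenience wrappers; a processor can also be driven directly inside a handler. A minimal sketch, assuming the `process` signature documented above and an SQS-shaped event:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    return {"message_id": record["messageId"]}

def lambda_handler(event: dict, context: LambdaContext):
    # Drive the processor directly instead of using @batch_processor
    responses = processor.process(event, record_handler, context)

    # Per the signature above, one SuccessResponse per processed record
    return {"statusCode": 200, "processed": len(responses)}
```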

### Partial Batch Processors

Processors that support partial batch failure responses for improved error handling.

```python { .api }
class BasePartialBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
    ):
        """
        Base class for partial batch processing.

        Parameters:
        - event_type: Type of AWS event
        - model: Pydantic model for validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """
        Process batch with partial failure support.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        Partial batch failure response dictionary
        """

class BasePartialProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
    ):
        """
        Base partial processor implementation.

        Parameters:
        - event_type: Type of AWS event
        - model: Pydantic model for validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """Process records with partial failure handling"""

class SqsFifoPartialProcessor:
    def __init__(
        self,
        model: BaseModel = None,
    ):
        """
        SQS FIFO queue partial processor with message group handling.

        Parameters:
        - model: Pydantic model for SQS message validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """
        Process SQS FIFO messages with message group ordering.

        Stops processing a message group on first failure to maintain
        FIFO ordering guarantees within the group.
        """
```
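The partial batch failure dictionary these processors return follows Lambda's `ReportBatchItemFailures` contract: only failed records are reported back, so the event source redelivers just those. A sketch of the expected shape for SQS (identifiers are illustrative):

```python
# Each itemIdentifier is the messageId of a failed record; SQS
# re-delivers only these messages and deletes the rest of the batch.
partial_response = {
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},
        {"itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"},
    ]
}
```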

### Response Classes

Response objects for indicating processing success or failure.

```python { .api }
class SuccessResponse:
    def __init__(self, **kwargs):
        """
        Represents successful record processing.

        Parameters:
        - **kwargs: Additional success metadata
        """

class FailureResponse:
    def __init__(self, **kwargs):
        """
        Represents failed record processing.

        Parameters:
        - **kwargs: Additional failure metadata
        """

class ExceptionInfo:
    def __init__(self, exception: Exception, record: Dict[str, Any]):
        """
        Exception information for failed record processing.

        Parameters:
        - exception: The exception that occurred
        - record: The record that failed processing
        """
        self.exception = exception
        self.record = record
```

### Event Types and Models

Constants and type definitions for supported AWS event sources.

```python { .api }
class EventType:
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

class BatchTypeModels:
    """Type models for different batch event sources"""

    @classmethod
    def get_sqs_model(cls) -> BaseModel:
        """Get Pydantic model for SQS records"""

    @classmethod
    def get_kinesis_model(cls) -> BaseModel:
        """Get Pydantic model for Kinesis records"""

    @classmethod
    def get_dynamodb_model(cls) -> BaseModel:
        """Get Pydantic model for DynamoDB Stream records"""
```
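Both processor families accept an optional Pydantic `model` that each record is validated against before it reaches the handler. A hedged sketch; the `Order` model is hypothetical, and records failing validation are assumed to go through the normal failure path:

```python
from pydantic import BaseModel
from aws_lambda_powertools.utilities.batch import BatchProcessor

class Order(BaseModel):
    # Hypothetical payload model, for illustration only
    order_id: str
    amount: float

# Records that do not validate against Order are treated as failures
processor = BatchProcessor(event_type="SQS", model=Order)
```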

## Usage Examples

### Basic SQS Batch Processing

```python
from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize processor for SQS events
processor = BatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """Process individual SQS message"""
    # Extract message body
    body = record["body"]

    # Process message (could raise exception)
    if "error" in body:
        raise ValueError("Invalid message content")

    # Return processing result
    return {"processed": True, "message_id": record["messageId"]}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # Batch processor handles the event automatically
    # Returns list of SuccessResponse objects
    return {"statusCode": 200}
```
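To exercise the handler locally, feed it a minimal SQS-shaped event. The record below uses the standard SQS fields with illustrative values; the Lambda context is stubbed with `None`, which the decorator accepts per the signatures above:

```python
# Minimal SQS-shaped test event (illustrative values)
test_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "body": "hello world",
            "attributes": {"ApproximateReceiveCount": "1"},
            "eventSource": "aws:sqs",
        }
    ]
}

print(lambda_handler(test_event, None))
```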

### Partial Failure Handling for SQS

```python
from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    BasePartialBatchProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# Processor that supports partial failures
processor = BasePartialBatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """Process SQS message with potential failure"""
    try:
        # Parse message body
        message = json.loads(record["body"])

        # Validate required fields
        if not message.get("user_id"):
            raise ValueError("Missing user_id")

        # Process message
        process_user_action(message)

        return {"status": "success", "user_id": message["user_id"]}

    except json.JSONDecodeError:
        # This will cause the record to be marked as failed
        raise ValueError("Invalid JSON in message body")
    except Exception:
        # Any unhandled exception marks record as failed
        raise

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # Returns partial batch failure response
    # Failed messages will be retried by SQS
    pass

def process_user_action(message: dict):
    """Business logic that might fail"""
    # Simulate processing that might fail
    if message.get("action") == "delete_account":
        # This is a critical operation - fail if conditions not met
        if not message.get("confirmation_token"):
            raise ValueError("Confirmation token required for account deletion")
```
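Partial batch responses only take effect when the event source mapping reports item failures back to the service; otherwise Lambda treats any raised error as a whole-batch failure. A sketch of enabling this with boto3 (queue ARN and function name are placeholders):

```python
import boto3

lambda_client = boto3.client("lambda")

# Without ReportBatchItemFailures, the partial response body is ignored
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:my-queue",  # placeholder
    FunctionName="my-batch-function",  # placeholder
    BatchSize=10,
    FunctionResponseTypes=["ReportBatchItemFailures"],
)
```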

### Async Batch Processing for Kinesis

```python
from aws_lambda_powertools.utilities.batch import (
    async_batch_processor,
    AsyncBatchProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import aiohttp
import base64
import json

# Async processor for Kinesis streams
processor = AsyncBatchProcessor(event_type="KinesisDataStreams")

async def record_handler(record: dict) -> dict:
    """Async processing of Kinesis record"""
    # Decode Kinesis data
    data = json.loads(base64.b64decode(record["kinesis"]["data"]))

    # Make async HTTP call to external service
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/process",
            json=data,
            timeout=aiohttp.ClientTimeout(total=10)
        ) as response:
            if response.status != 200:
                raise ValueError(f"API call failed: {response.status}")

            result = await response.json()
            return result

@async_batch_processor(record_handler=record_handler, processor=processor)
async def lambda_handler(event: dict, context: LambdaContext):
    # Async batch processing with concurrent record handling
    return {"statusCode": 200}
```
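If records are fanned out concurrently, an unbounded fan-out can overwhelm the downstream API. One option is to cap in-flight calls inside the record handler with an `asyncio.Semaphore`; a sketch with an arbitrary limit of 5 and a stubbed downstream call:

```python
import asyncio

# Cap concurrent downstream calls regardless of batch size
_semaphore = asyncio.Semaphore(5)

async def call_downstream(record: dict) -> dict:
    """Stand-in for the aiohttp call shown above."""
    await asyncio.sleep(0.1)
    return {"ok": True}

async def record_handler(record: dict) -> dict:
    # At most 5 records are inside this block at any moment
    async with _semaphore:
        return await call_downstream(record)
```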

### DynamoDB Streams Processing

```python
from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3

# Processor for DynamoDB Stream events
processor = BatchProcessor(event_type="DynamoDBStreams")
dynamodb = boto3.resource("dynamodb")

def record_handler(record: dict) -> dict:
    """Process DynamoDB Stream record"""
    # Extract change information
    event_name = record["eventName"]  # INSERT, MODIFY, REMOVE

    if event_name == "INSERT":
        return handle_insert(record)
    elif event_name == "MODIFY":
        return handle_modify(record)
    elif event_name == "REMOVE":
        return handle_remove(record)

    return {"status": "ignored", "event": event_name}

def handle_insert(record: dict) -> dict:
    """Handle new item insertion"""
    new_image = record["dynamodb"].get("NewImage", {})

    # Extract item data (DynamoDB format)
    item_id = new_image.get("id", {}).get("S")
    item_type = new_image.get("type", {}).get("S")

    # Trigger downstream processing
    if item_type == "user":
        send_welcome_email(item_id)
    elif item_type == "order":
        update_inventory(new_image)

    return {"processed": "insert", "item_id": item_id}

def handle_modify(record: dict) -> dict:
    """Handle item modification"""
    old_image = record["dynamodb"].get("OldImage", {})
    new_image = record["dynamodb"].get("NewImage", {})

    # Compare old and new values to determine what changed
    changes = detect_changes(old_image, new_image)

    # Handle specific field changes
    if "status" in changes:
        handle_status_change(changes["status"], new_image)

    return {"processed": "modify", "changes": list(changes.keys())}

def handle_remove(record: dict) -> dict:
    """Handle item removal"""
    old_image = record["dynamodb"].get("OldImage", {})

    # Cleanup related resources
    item_id = old_image.get("id", {}).get("S")
    cleanup_item_resources(item_id)

    return {"processed": "remove", "item_id": item_id}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    return {"statusCode": 200}
```
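`detect_changes` is referenced but not defined above; a minimal sketch that diffs the two attribute-value maps and returns the new value for every key that was added or modified (deleted keys are not reported):

```python
def detect_changes(old_image: dict, new_image: dict) -> dict:
    """Return new values for keys that were added or modified."""
    changes = {}
    for key, new_value in new_image.items():
        if old_image.get(key) != new_value:
            # Values stay in DynamoDB attribute-value format, e.g. {"S": "active"}
            changes[key] = new_value
    return changes
```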

### SQS FIFO with Message Group Handling

```python
from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    SqsFifoPartialProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# FIFO processor maintains message group ordering
processor = SqsFifoPartialProcessor()

def record_handler(record: dict) -> dict:
    """Process FIFO SQS message maintaining group order"""
    message = json.loads(record["body"])
    group_id = record["attributes"]["MessageGroupId"]

    # Process message in order within group
    result = process_ordered_message(message, group_id)

    # If this fails, subsequent messages in same group won't be processed
    # This maintains FIFO ordering guarantees
    if not result["success"]:
        raise ValueError(f"Processing failed for group {group_id}")

    return result

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # FIFO processor will stop processing a message group on first failure
    # This preserves ordering within each message group
    pass

def process_ordered_message(message: dict, group_id: str) -> dict:
    """Process message maintaining order within group"""
    # Example: Sequential account transactions
    if message["type"] == "account_transaction":
        account_id = message["account_id"]

        # Ensure transactions are processed in order
        current_balance = get_account_balance(account_id)
        new_balance = current_balance + message["amount"]

        if new_balance < 0 and message["amount"] < 0:
            # Insufficient funds - fail this and subsequent transactions
            return {"success": False, "reason": "insufficient_funds"}

        update_account_balance(account_id, new_balance)
        return {"success": True, "new_balance": new_balance}

    return {"success": True, "processed": message["type"]}
```
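For reference, the `MessageGroupId` read by the handler comes from the record's `attributes` map. A minimal FIFO-shaped record with illustrative values:

```python
# Illustrative FIFO SQS record; MessageGroupId scopes the ordering guarantee
fifo_record = {
    "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
    "body": '{"type": "account_transaction", "account_id": "a-1", "amount": -25}',
    "attributes": {
        "MessageGroupId": "account-a-1",
        "MessageDeduplicationId": "txn-0001",
        "ApproximateReceiveCount": "1",
    },
}
```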

### Custom Error Handling

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext
import requests

class CustomBatchProcessor(BatchProcessor):
    """Custom processor with specialized error handling"""

    def __init__(self, event_type: str, **kwargs):
        super().__init__(event_type, **kwargs)
        self.retry_count = 0
        self.max_retries = 3

    def failure_handler(self, record: dict, exception: Exception):
        """Custom failure handling with retry logic"""
        # Log detailed error information
        error_details = {
            "record_id": record.get("messageId", "unknown"),
            "error_type": type(exception).__name__,
            "error_message": str(exception),
            "retry_count": self.retry_count
        }

        # Determine if error is retryable
        if isinstance(exception, (ConnectionError, TimeoutError)):
            if self.retry_count < self.max_retries:
                self.retry_count += 1
                # Return success to prevent DLQ, handle retry externally
                return super().success_handler(record, {"retried": True})

        # Log to monitoring system
        log_processing_failure(error_details)

        # Return failure response
        return super().failure_handler(record, exception)

# Use custom processor
processor = CustomBatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """Record handler with retry-aware processing"""
    # Check if this is a retry attempt
    attributes = record.get("attributes", {})
    approximate_receive_count = int(attributes.get("ApproximateReceiveCount", "1"))

    if approximate_receive_count > 1:
        # This is a retry - handle accordingly
        print(f"Processing retry attempt #{approximate_receive_count}")

    # Process record (might raise retryable exception)
    return process_with_external_service(record)

def process_with_external_service(record: dict) -> dict:
    """Simulate processing that might need retries"""
    try:
        response = requests.post(
            "https://api.example.com/process",
            json={"data": record["body"]},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    except requests.ConnectionError:
        # Retryable error
        raise ConnectionError("Failed to connect to external service")
    except requests.Timeout:
        # Retryable error
        raise TimeoutError("Request to external service timed out")
    except requests.HTTPError as e:
        if e.response.status_code >= 500:
            # Server error - retryable
            raise ConnectionError(f"Server error: {e.response.status_code}")
        else:
            # Client error - not retryable
            raise ValueError(f"Invalid request: {e.response.status_code}")
```
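The example leaves `log_processing_failure` and the handler wiring implicit. A sketch of both, with the logging hook stubbed out (a real implementation would ship `error_details` to your monitoring system):

```python
from aws_lambda_powertools.utilities.batch import batch_processor

def log_processing_failure(error_details: dict) -> None:
    """Stub for the monitoring hook used by CustomBatchProcessor."""
    print(f"batch record failed: {error_details}")

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context):
    return {"statusCode": 200}
```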

## Types

```python { .api }
from typing import Dict, Any, List, Union, Callable, Awaitable, Optional, Literal

from aws_lambda_powertools.utilities.typing import LambdaContext

# Event type constants
EventType = Literal["SQS", "KinesisDataStreams", "DynamoDBStreams"]

# Handler function signatures
RecordHandler = Callable[[Dict[str, Any]], Any]
AsyncRecordHandler = Callable[[Dict[str, Any]], Awaitable[Any]]

# Processor response types
ProcessorResponse = Union[List[SuccessResponse], Dict[str, Any]]

# Batch processing configuration
class BatchConfig:
    def __init__(
        self,
        max_records: Optional[int] = None,
        batch_size_mb: int = 6,
        parallel_processing: bool = False,
        max_concurrency: int = 10,
    ):
        """
        Configuration for batch processing behavior.

        Parameters:
        - max_records: Maximum number of records to process
        - batch_size_mb: Maximum batch size in megabytes
        - parallel_processing: Whether to process records in parallel
        - max_concurrency: Maximum concurrent processing for async
        """
```