# Batch Processing

Utilities for processing AWS SQS, DynamoDB Streams, and Kinesis Data Streams records with built-in error handling and partial failure support, so that failed records can be retried by the event source without reprocessing successful ones. Enables reliable processing of batch events with granular control over success and failure handling.

## Capabilities

### Batch Processor Decorators

Decorators that automatically handle batch processing logic and error handling for Lambda functions.

```python { .api }
def batch_processor(
    record_handler: Callable[[Dict], Any],
    processor: BatchProcessor,
    context: Optional[LambdaContext] = None,
) -> Callable:
    """
    Decorator for synchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Function to process individual records
    - processor: BatchProcessor instance with configuration
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated Lambda function that processes batches
    """

def async_batch_processor(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: AsyncBatchProcessor,
    context: Optional[LambdaContext] = None,
) -> Callable:
    """
    Decorator for asynchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Async function to process individual records
    - processor: AsyncBatchProcessor instance
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated async Lambda function that processes batches
    """

def process_partial_response(
    record_handler: Callable[[Dict], Any],
    processor: BasePartialBatchProcessor,
    context: Optional[LambdaContext] = None,
) -> Callable:
    """
    Decorator for batch processing with partial failure handling.

    Parameters:
    - record_handler: Function to process individual records
    - processor: Partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated function that returns a partial batch failure response
    """

def async_process_partial_response(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: BasePartialBatchProcessor,
    context: Optional[LambdaContext] = None,
) -> Callable:
    """
    Decorator for async batch processing with partial failure handling.

    Parameters:
    - record_handler: Async function to process individual records
    - processor: Async partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated async function that returns a partial batch failure response
    """
```
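
Taken together, a decorator pairs a per-record handler with a processor and takes over iterating the batch and collecting results. A minimal wiring sketch (complete, runnable handlers appear under Usage Examples below):

```python
from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record: dict) -> dict:
    # Raising here marks this record as failed; returning marks it successful.
    return {"message_id": record["messageId"]}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context) -> dict:
    # By the time this body runs, every record has been routed through record_handler.
    return {"statusCode": 200}
```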

### Batch Processors

Core processor classes that handle the batch processing logic and error management.

```python { .api }
class BatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: Optional[BaseModel] = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Synchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Optional Pydantic model for record validation
        - batch_length_quota_mb: Maximum batch size in MB
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: Optional[LambdaContext] = None,
    ) -> List[SuccessResponse]:
        """
        Process a batch of records.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """
        Handle successful record processing.

        Parameters:
        - record: Successfully processed record
        - result: Processing result

        Returns:
        SuccessResponse object
        """

    def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """
        Handle failed record processing.

        Parameters:
        - record: Failed record
        - exception: Exception that occurred

        Returns:
        FailureResponse object
        """

class AsyncBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: Optional[BaseModel] = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Asynchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event
        - model: Optional Pydantic model for validation
        - batch_length_quota_mb: Maximum batch size in MB
        """

    async def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Awaitable[Any]],
        context: Optional[LambdaContext] = None,
    ) -> List[SuccessResponse]:
        """
        Asynchronously process a batch of records.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Async function to process each record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    async def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """Handle successful async record processing"""

    async def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """Handle failed async record processing"""
```
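
The decorators are thin wrappers over `process`; per the signature above, a processor can also be driven directly inside a handler. A minimal sketch, assuming the documented `BatchProcessor.process` API:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record: dict) -> dict:
    return {"message_id": record["messageId"]}

def lambda_handler(event: dict, context) -> dict:
    # Equivalent to the @batch_processor decorator, but explicit: each record
    # is passed to record_handler and the successes are collected.
    successes = processor.process(event, record_handler, context)
    return {"statusCode": 200, "processed": len(successes)}
```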

### Partial Batch Processors

Processors that support partial batch failure responses for improved error handling.

```python { .api }
class BasePartialBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: Optional[BaseModel] = None,
    ):
        """
        Base class for partial batch processing.

        Parameters:
        - event_type: Type of AWS event
        - model: Optional Pydantic model for validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: Optional[LambdaContext] = None,
    ) -> Dict[str, Any]:
        """
        Process a batch with partial failure support.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        Partial batch failure response dictionary
        """

class BasePartialProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: Optional[BaseModel] = None,
    ):
        """
        Base partial processor implementation.

        Parameters:
        - event_type: Type of AWS event
        - model: Optional Pydantic model for validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: Optional[LambdaContext] = None,
    ) -> Dict[str, Any]:
        """Process records with partial failure handling"""

class SqsFifoPartialProcessor:
    def __init__(
        self,
        model: Optional[BaseModel] = None,
    ):
        """
        SQS FIFO queue partial processor with message group handling.

        Parameters:
        - model: Optional Pydantic model for SQS message validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: Optional[LambdaContext] = None,
    ) -> Dict[str, Any]:
        """
        Process SQS FIFO messages with message group ordering.

        Stops processing a message group on first failure to maintain
        FIFO ordering guarantees within the group.
        """
```
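
The dictionary these `process` methods return follows Lambda's partial batch response contract: failed records are listed under `batchItemFailures`, and the event source mapping must have `ReportBatchItemFailures` enabled for only those records to be retried. Illustratively:

```python
# For SQS, itemIdentifier is the failed record's messageId; for Kinesis and
# DynamoDB Streams it is the failed record's sequence number.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},
    ]
}
# An empty batchItemFailures list signals that the whole batch succeeded.
```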

### Response Classes

Response objects for indicating processing success or failure.

```python { .api }
class SuccessResponse:
    def __init__(self, **kwargs):
        """
        Represents successful record processing.

        Parameters:
        - **kwargs: Additional success metadata
        """

class FailureResponse:
    def __init__(self, **kwargs):
        """
        Represents failed record processing.

        Parameters:
        - **kwargs: Additional failure metadata
        """

class ExceptionInfo:
    def __init__(self, exception: Exception, record: Dict[str, Any]):
        """
        Exception information for failed record processing.

        Parameters:
        - exception: The exception that occurred
        - record: The record that failed processing
        """
        self.exception = exception
        self.record = record
```
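
`ExceptionInfo` is the only class above with a fully specified constructor; a sketch of a custom `failure_handler` that uses it to keep an audit trail, assuming it is importable alongside `BatchProcessor` (attribute names follow the definition above):

```python
from typing import List

# Import path assumed from the API above; ExceptionInfo pairs a failed
# record with the exception that caused the failure.
from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo

class AuditingBatchProcessor(BatchProcessor):  # hypothetical subclass
    """Keeps an in-memory audit trail of failures alongside normal handling."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.audit_trail: List[ExceptionInfo] = []

    def failure_handler(self, record, exception):
        # Record the failure, then delegate to the default behavior so the
        # record is still reported as failed.
        self.audit_trail.append(ExceptionInfo(exception=exception, record=record))
        return super().failure_handler(record, exception)
```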

### Event Types and Models

Constants and type definitions for supported AWS event sources.

```python { .api }
class EventType:
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

class BatchTypeModels:
    """Type models for different batch event sources"""

    @classmethod
    def get_sqs_model(cls) -> BaseModel:
        """Get Pydantic model for SQS records"""

    @classmethod
    def get_kinesis_model(cls) -> BaseModel:
        """Get Pydantic model for Kinesis records"""

    @classmethod
    def get_dynamodb_model(cls) -> BaseModel:
        """Get Pydantic model for DynamoDB Stream records"""
```
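
A sketch of wiring an `EventType` constant and a validation model into a processor, per the constructors above. `OrderMessage` is a hypothetical Pydantic model; in this sketch, records that fail validation are assumed to be reported as failures before the record handler runs:

```python
from pydantic import BaseModel

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

class OrderMessage(BaseModel):  # hypothetical message model
    order_id: str
    amount: int

processor = BatchProcessor(event_type=EventType.SQS, model=OrderMessage)
```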

## Usage Examples

### Basic SQS Batch Processing

```python
from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize processor for SQS events
processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record: dict) -> dict:
    """Process an individual SQS message"""
    # Extract message body
    body = record["body"]

    # Process message (may raise an exception, which marks the record as failed)
    if "error" in body:
        raise ValueError("Invalid message content")

    # Return processing result
    return {"processed": True, "message_id": record["messageId"]}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The batch processor handles the event automatically and
    # collects a list of SuccessResponse objects.
    return {"statusCode": 200}
```
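
For a quick local test, the handler can be invoked with a hand-built event; the sketch below is trimmed to the fields the handler actually reads (a real SQS event also carries receiptHandle, attributes, eventSourceARN, and more):

```python
test_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "body": "hello",
            "eventSource": "aws:sqs",
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "body": "this one triggers an error",  # contains "error", so it fails
            "eventSource": "aws:sqs",
        },
    ]
}

lambda_handler(test_event, None)  # first record succeeds, second is marked failed
```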

### Partial Failure Handling for SQS

```python
import json

from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    BasePartialBatchProcessor,
    EventType,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# Processor that supports partial failures
processor = BasePartialBatchProcessor(event_type=EventType.SQS)

def record_handler(record: dict) -> dict:
    """Process an SQS message that may fail"""
    try:
        # Parse message body
        message = json.loads(record["body"])
    except json.JSONDecodeError:
        # Raising marks this record as failed
        raise ValueError("Invalid JSON in message body")

    # Validate required fields
    if not message.get("user_id"):
        raise ValueError("Missing user_id")

    # Process message; any unhandled exception also marks the record as failed
    process_user_action(message)

    return {"status": "success", "user_id": message["user_id"]}

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The decorator returns the partial batch failure response;
    # failed messages will be retried by SQS.
    pass

def process_user_action(message: dict):
    """Business logic that might fail"""
    if message.get("action") == "delete_account":
        # Critical operation: fail if preconditions are not met
        if not message.get("confirmation_token"):
            raise ValueError("Confirmation token required for account deletion")
```

### Async Batch Processing for Kinesis

```python
import base64
import json

import aiohttp

from aws_lambda_powertools.utilities.batch import (
    async_batch_processor,
    AsyncBatchProcessor,
    EventType,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# Async processor for Kinesis streams
processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams)

async def record_handler(record: dict) -> dict:
    """Asynchronously process a Kinesis record"""
    # Kinesis record data is base64-encoded
    data = json.loads(base64.b64decode(record["kinesis"]["data"]))

    # Make an async HTTP call to an external service
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/process",
            json=data,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            if response.status != 200:
                raise ValueError(f"API call failed: {response.status}")

            return await response.json()

@async_batch_processor(record_handler=record_handler, processor=processor)
async def lambda_handler(event: dict, context: LambdaContext):
    # Async batch processing with concurrent record handling
    return {"statusCode": 200}
```

### DynamoDB Streams Processing

```python
from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
from aws_lambda_powertools.utilities.typing import LambdaContext

# Processor for DynamoDB Stream events
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)

def record_handler(record: dict) -> dict:
    """Process a DynamoDB Stream record"""
    # Extract change information
    event_name = record["eventName"]  # INSERT, MODIFY, or REMOVE

    if event_name == "INSERT":
        return handle_insert(record)
    elif event_name == "MODIFY":
        return handle_modify(record)
    elif event_name == "REMOVE":
        return handle_remove(record)

    return {"status": "ignored", "event": event_name}

def handle_insert(record: dict) -> dict:
    """Handle new item insertion"""
    new_image = record["dynamodb"].get("NewImage", {})

    # Extract item data (DynamoDB attribute-value format)
    item_id = new_image.get("id", {}).get("S")
    item_type = new_image.get("type", {}).get("S")

    # Trigger downstream processing
    if item_type == "user":
        send_welcome_email(item_id)
    elif item_type == "order":
        update_inventory(new_image)

    return {"processed": "insert", "item_id": item_id}

def handle_modify(record: dict) -> dict:
    """Handle item modification"""
    old_image = record["dynamodb"].get("OldImage", {})
    new_image = record["dynamodb"].get("NewImage", {})

    # Compare old and new values to determine what changed
    # (a sketch of detect_changes follows this example)
    changes = detect_changes(old_image, new_image)

    # Handle specific field changes
    if "status" in changes:
        handle_status_change(changes["status"], new_image)

    return {"processed": "modify", "changes": list(changes.keys())}

def handle_remove(record: dict) -> dict:
    """Handle item removal"""
    old_image = record["dynamodb"].get("OldImage", {})

    # Clean up related resources
    item_id = old_image.get("id", {}).get("S")
    cleanup_item_resources(item_id)

    return {"processed": "remove", "item_id": item_id}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    return {"statusCode": 200}
```
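
The example above leaves `detect_changes` undefined; a minimal sketch, assuming both images are plain dictionaries in DynamoDB attribute-value format:

```python
def detect_changes(old_image: dict, new_image: dict) -> dict:
    """Return the attributes whose values differ between the two images."""
    changes = {}
    for key in set(old_image) | set(new_image):
        old_value = old_image.get(key)
        new_value = new_image.get(key)
        if old_value != new_value:
            # Keep the new attribute value (still in DynamoDB format)
            changes[key] = new_value
    return changes
```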

### SQS FIFO with Message Group Handling

```python
import json

from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    SqsFifoPartialProcessor,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# FIFO processor maintains message group ordering
processor = SqsFifoPartialProcessor()

def record_handler(record: dict) -> dict:
    """Process a FIFO SQS message, maintaining group order"""
    message = json.loads(record["body"])
    group_id = record["attributes"]["MessageGroupId"]

    # Process the message in order within its group
    result = process_ordered_message(message, group_id)

    # If this raises, subsequent messages in the same group are not processed,
    # which preserves FIFO ordering guarantees.
    if not result["success"]:
        raise ValueError(f"Processing failed for group {group_id}")

    return result

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The FIFO processor stops processing a message group on first failure,
    # preserving ordering within each message group.
    pass

def process_ordered_message(message: dict, group_id: str) -> dict:
    """Process a message, maintaining order within its group"""
    # Example: sequential account transactions
    if message["type"] == "account_transaction":
        account_id = message["account_id"]

        # Ensure transactions are applied in order
        current_balance = get_account_balance(account_id)
        new_balance = current_balance + message["amount"]

        if new_balance < 0 and message["amount"] < 0:
            # A withdrawal would overdraw the account: fail this and
            # subsequent transactions in the group
            return {"success": False, "reason": "insufficient_funds"}

        update_account_balance(account_id, new_balance)
        return {"success": True, "new_balance": new_balance}

    return {"success": True, "processed": message["type"]}
```
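
To see why the group-level short-circuit matters, consider a batch spanning two message groups. Per the behavior described above, if `a-1` fails, `a-2` is skipped (and reported for retry), while group `B` continues independently. A trimmed sketch of such an event:

```python
fifo_event = {
    "Records": [
        {"messageId": "a-1", "body": "{...}", "attributes": {"MessageGroupId": "A"}},
        {"messageId": "a-2", "body": "{...}", "attributes": {"MessageGroupId": "A"}},  # skipped if a-1 fails
        {"messageId": "b-1", "body": "{...}", "attributes": {"MessageGroupId": "B"}},  # processed independently
    ]
}
```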

### Custom Error Handling

```python
import requests

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.typing import LambdaContext

class CustomBatchProcessor(BatchProcessor):
    """Custom processor with specialized error handling"""

    def __init__(self, event_type: str, **kwargs):
        super().__init__(event_type, **kwargs)
        self.retry_count = 0
        self.max_retries = 3

    def failure_handler(self, record: dict, exception: Exception):
        """Custom failure handling with retry logic"""
        # Collect detailed error information
        error_details = {
            "record_id": record.get("messageId", "unknown"),
            "error_type": type(exception).__name__,
            "error_message": str(exception),
            "retry_count": self.retry_count,
        }

        # Determine whether the error is retryable
        if isinstance(exception, (ConnectionError, TimeoutError)):
            if self.retry_count < self.max_retries:
                self.retry_count += 1
                # Report success to prevent a DLQ move; handle the retry externally
                return super().success_handler(record, {"retried": True})

        # Log to monitoring system
        log_processing_failure(error_details)

        # Return failure response
        return super().failure_handler(record, exception)

# Use the custom processor
processor = CustomBatchProcessor(event_type=EventType.SQS)

def record_handler(record: dict) -> dict:
    """Record handler with retry-aware processing"""
    # Check whether this is a retry attempt
    attributes = record.get("attributes", {})
    approximate_receive_count = int(attributes.get("ApproximateReceiveCount", "1"))

    if approximate_receive_count > 1:
        # This is a retry; handle accordingly
        print(f"Processing retry attempt #{approximate_receive_count}")

    # Process record (might raise a retryable exception)
    return process_with_external_service(record)

def process_with_external_service(record: dict) -> dict:
    """Simulate processing that might need retries"""
    try:
        response = requests.post(
            "https://api.example.com/process",
            json={"data": record["body"]},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()
    except requests.ConnectionError:
        # Retryable error
        raise ConnectionError("Failed to connect to external service")
    except requests.Timeout:
        # Retryable error
        raise TimeoutError("Request to external service timed out")
    except requests.HTTPError as e:
        if e.response.status_code >= 500:
            # Server error: retryable
            raise ConnectionError(f"Server error: {e.response.status_code}")
        # Client error: not retryable
        raise ValueError(f"Invalid request: {e.response.status_code}")
```

## Types

```python { .api }
from typing import Dict, Any, List, Literal, Union, Callable, Awaitable, Optional
from aws_lambda_powertools.utilities.typing import LambdaContext

# Event type constants
EventType = Literal["SQS", "KinesisDataStreams", "DynamoDBStreams"]

# Handler function signatures
RecordHandler = Callable[[Dict[str, Any]], Any]
AsyncRecordHandler = Callable[[Dict[str, Any]], Awaitable[Any]]

# Processor response types
ProcessorResponse = Union[List[SuccessResponse], Dict[str, Any]]

# Batch processing configuration
class BatchConfig:
    def __init__(
        self,
        max_records: Optional[int] = None,
        batch_size_mb: int = 6,
        parallel_processing: bool = False,
        max_concurrency: int = 10,
    ):
        """
        Configuration for batch processing behavior.

        Parameters:
        - max_records: Maximum number of records to process
        - batch_size_mb: Maximum batch size in megabytes
        - parallel_processing: Whether to process records in parallel
        - max_concurrency: Maximum concurrent processing for async
        """
```