CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aws-lambda-powertools

Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python

89

1.21x
Overview
Eval results
Files

batch-processing.mddocs/

Batch Processing

Utilities for processing AWS SQS, DynamoDB Streams, and Kinesis records with built-in error handling, partial failure support, and automatic retries. Enables reliable processing of batch events with granular control over success and failure handling.

Capabilities

Batch Processor Decorators

Decorators that automatically handle batch processing logic and error handling for Lambda functions.

def batch_processor(
    record_handler: Callable[[Dict], Any],
    processor: BatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for synchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Function called once per record; raising an exception
      marks that record as failed
    - processor: BatchProcessor instance with event-type configuration
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated Lambda function that processes batches

    NOTE(review): context is annotated LambdaContext but defaults to None;
    presumably Optional[LambdaContext] — confirm against the implementation.
    """

def async_batch_processor(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: AsyncBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for asynchronous batch processing of Lambda events.

    Parameters:
    - record_handler: Async (awaitable-returning) function called once per
      record; raising an exception marks that record as failed
    - processor: AsyncBatchProcessor instance
    - context: Lambda context (automatically injected if not provided)

    Returns:
    Decorated async Lambda function that processes batches
    """

def process_partial_response(
    record_handler: Callable[[Dict], Any],
    processor: BasePartialBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for batch processing with partial failure handling.

    Parameters:
    - record_handler: Function called once per record; raising an exception
      marks that record as failed
    - processor: Partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated function that returns a partial batch failure response, so
    only failed records (not the whole batch) are retried by the source
    """

def async_process_partial_response(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: BasePartialBatchProcessor,
    context: LambdaContext = None,
) -> Callable:
    """
    Decorator for async batch processing with partial failure handling.

    Parameters:
    - record_handler: Async function called once per record; raising an
      exception marks that record as failed
    - processor: Async partial batch processor instance
    - context: Lambda context

    Returns:
    Decorated async function that returns a partial batch failure response
    """

Batch Processors

Core processor classes that handle the batch processing logic and error management.

class BatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Synchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Optional Pydantic model used to validate each record
        - batch_length_quota_mb: Maximum batch size in MB (default 6)
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> List[SuccessResponse]:
        """
        Process a batch of records by invoking record_handler on each one.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """
        Handle successful record processing.

        Parameters:
        - record: Successfully processed record
        - result: Value returned by the record handler

        Returns:
        SuccessResponse object
        """

    def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """
        Handle failed record processing (the record handler raised).

        Parameters:
        - record: Failed record
        - exception: Exception raised while processing the record

        Returns:
        FailureResponse object
        """

class AsyncBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ):
        """
        Asynchronous batch processor for AWS event sources.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Optional Pydantic model used to validate each record
        - batch_length_quota_mb: Maximum batch size in MB (default 6)
        """

    async def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Awaitable[Any]],
        context: LambdaContext = None,
    ) -> List[SuccessResponse]:
        """
        Asynchronously process a batch of records.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Async function awaited once per record
        - context: Lambda runtime context

        Returns:
        List of successful processing responses
        """

    async def success_handler(
        self,
        record: Dict[str, Any],
        result: Any,
    ) -> SuccessResponse:
        """Handle successful async record processing; returns a SuccessResponse."""

    async def failure_handler(
        self,
        record: Dict[str, Any],
        exception: Exception,
    ) -> FailureResponse:
        """Handle failed async record processing; returns a FailureResponse."""

Partial Batch Processors

Processors that support partial batch failure responses for improved error handling.

class BasePartialBatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
    ):
        """
        Base class for partial batch processing.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Optional Pydantic model used to validate each record
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """
        Process a batch with partial failure support: failed records are
        collected instead of failing the whole invocation.

        Parameters:
        - event: Lambda event containing records
        - record_handler: Function to process each record
        - context: Lambda runtime context

        Returns:
        Partial batch failure response dictionary (only failed records
        are reported back to the event source for retry)
        """

class BasePartialProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
    ):
        """
        Base partial processor implementation.

        Parameters:
        - event_type: Type of AWS event (SQS, KinesisDataStreams, DynamoDBStreams)
        - model: Optional Pydantic model used to validate each record

        NOTE(review): interface duplicates BasePartialBatchProcessor above;
        confirm which of the two the library actually exports before relying
        on one of them.
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """Process records, returning a partial batch failure response dict."""

class SqsFifoPartialProcessor:
    def __init__(
        self,
        model: BaseModel = None,
    ):
        """
        SQS FIFO queue partial processor with message group handling.

        Parameters:
        - model: Optional Pydantic model for SQS message validation
        """

    def process(
        self,
        event: Dict[str, Any],
        record_handler: Callable[[Dict], Any],
        context: LambdaContext = None,
    ) -> Dict[str, Any]:
        """
        Process SQS FIFO messages while respecting message group ordering.

        Stops processing a message group on first failure to maintain
        FIFO ordering guarantees within the group; remaining messages in
        that group are reported as failures so the queue redelivers them
        in order.
        """

Response Classes

Response objects for indicating processing success or failure.

class SuccessResponse:
    def __init__(self, **kwargs):
        """
        Represents successful processing of a single record.

        Parameters:
        - **kwargs: Additional success metadata (e.g. record and handler result)
        """

class FailureResponse:
    def __init__(self, **kwargs):
        """
        Represents failed processing of a single record.

        Parameters:
        - **kwargs: Additional failure metadata (e.g. record and exception info)
        """

class ExceptionInfo:
    def __init__(self, exception: Exception, record: Dict[str, Any]):
        """
        Pair an exception with the record whose processing raised it.

        Parameters:
        - exception: The exception raised by the record handler
        - record: The raw record that was being processed
        """
        # Keep both references so failure handlers can report full context.
        self.exception = exception
        self.record = record

Event Types and Models

Constants and type definitions for supported AWS event sources.

class EventType:
    """Constants naming the AWS event sources supported for batch processing."""
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

class BatchTypeModels:
    """Factory for the Pydantic type models matching each batch event source."""
    
    @classmethod
    def get_sqs_model(cls) -> BaseModel:
        """Get the Pydantic model for SQS records."""
        
    @classmethod
    def get_kinesis_model(cls) -> BaseModel:
        """Get the Pydantic model for Kinesis records."""
        
    @classmethod
    def get_dynamodb_model(cls) -> BaseModel:
        """Get the Pydantic model for DynamoDB Stream records."""

Usage Examples

Basic SQS Batch Processing

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize processor for SQS events
processor = BatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """Process a single SQS message; raise to mark it as failed."""
    body = record["body"]

    # Messages whose body contains the sentinel "error" string are rejected,
    # which the batch processor records as a per-record failure.
    if "error" in body:
        raise ValueError("Invalid message content")

    return {"processed": True, "message_id": record["messageId"]}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The decorator consumes the event before this body runs: it calls
    # record_handler on every record and collects SuccessResponse objects,
    # so the handler body itself only needs to return a status.
    return {"statusCode": 200}

Partial Failure Handling for SQS

from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    BasePartialBatchProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# Processor that supports partial failures
processor = BasePartialBatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """
    Process one SQS message, raising ValueError on malformed input.

    Parameters:
    - record: Raw SQS record; its "body" must be a JSON object

    Returns:
    Status dict containing the processed user_id

    Raises:
    ValueError: if the body is not valid JSON or lacks a user_id
    (any other exception from processing also propagates, marking the
    record as failed for the partial-batch response)
    """
    try:
        # Parse message body
        message = json.loads(record["body"])

        # Validate required fields
        if not message.get("user_id"):
            raise ValueError("Missing user_id")

        # Process message
        process_user_action(message)

        return {"status": "success", "user_id": message["user_id"]}

    except json.JSONDecodeError as exc:
        # Chain the original decode error so tracebacks show the cause.
        # The previous blanket `except Exception: raise` was a no-op that
        # bound an unused variable, so it has been removed — unhandled
        # exceptions still propagate unchanged.
        raise ValueError("Invalid JSON in message body") from exc

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The decorator returns the partial batch failure response on the
    # handler's behalf: only the messages whose record_handler raised are
    # reported, so SQS retries just those instead of the whole batch.
    pass

def process_user_action(message: dict):
    """Business logic for a user action; raises when preconditions fail."""
    # Account deletion is destructive, so it requires an explicit
    # confirmation token; all other actions pass through unchecked.
    wants_deletion = message.get("action") == "delete_account"
    if wants_deletion and not message.get("confirmation_token"):
        raise ValueError("Confirmation token required for account deletion")

Async Batch Processing for Kinesis

from aws_lambda_powertools.utilities.batch import (
    async_batch_processor,
    AsyncBatchProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import asyncio
import aiohttp
import json

# Async processor for Kinesis streams
processor = AsyncBatchProcessor(event_type="KinesisDataStreams")

async def record_handler(record: dict) -> dict:
    """
    Decode one Kinesis record and forward it to an external HTTP API.

    Parameters:
    - record: Raw Kinesis record whose data payload is base64-encoded JSON

    Returns:
    Parsed JSON response from the downstream API

    Raises:
    ValueError: when the API responds with a non-200 status
    """
    # Hoisted to the top of the function: the import previously sat in the
    # middle of the body, which obscured the function's dependencies.
    import base64

    # Kinesis delivers payloads base64-encoded.
    data = json.loads(base64.b64decode(record["kinesis"]["data"]))

    # Make async HTTP call to external service with a 10s overall timeout.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/process",
            json=data,
            timeout=aiohttp.ClientTimeout(total=10)
        ) as response:
            if response.status != 200:
                raise ValueError(f"API call failed: {response.status}")
            
            result = await response.json()
            return result

@async_batch_processor(record_handler=record_handler, processor=processor)
async def lambda_handler(event: dict, context: LambdaContext):
    # The decorator awaits record_handler for every Kinesis record before
    # this body runs; the handler body only returns the invocation status.
    return {"statusCode": 200}

DynamoDB Streams Processing

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3

# Processor for DynamoDB Stream events
processor = BatchProcessor(event_type="DynamoDBStreams")
dynamodb = boto3.resource("dynamodb")

def record_handler(record: dict) -> dict:
    """Route a DynamoDB Stream record to the handler for its change type."""
    event_name = record["eventName"]  # INSERT, MODIFY, REMOVE

    # Guard-clause dispatch: each known change type returns immediately.
    if event_name == "INSERT":
        return handle_insert(record)
    if event_name == "MODIFY":
        return handle_modify(record)
    if event_name == "REMOVE":
        return handle_remove(record)

    # Unknown change types are acknowledged but not processed.
    return {"status": "ignored", "event": event_name}

def handle_insert(record: dict) -> dict:
    """React to a newly inserted item by triggering downstream processing."""
    new_image = record["dynamodb"].get("NewImage", {})

    # Attributes arrive in DynamoDB's typed wire format, e.g. {"S": "value"}.
    item_id = new_image.get("id", {}).get("S")
    item_type = new_image.get("type", {}).get("S")

    # Fan out by item type: users get a welcome email, orders adjust stock.
    if item_type == "user":
        send_welcome_email(item_id)
    elif item_type == "order":
        update_inventory(new_image)

    return {"processed": "insert", "item_id": item_id}

def handle_modify(record: dict) -> dict:
    """React to an item update by diffing its old and new images."""
    stream_data = record["dynamodb"]
    old_image = stream_data.get("OldImage", {})
    new_image = stream_data.get("NewImage", {})

    # Diff the before/after images to see which attributes changed.
    changes = detect_changes(old_image, new_image)

    # Status transitions get dedicated handling beyond the generic diff.
    if "status" in changes:
        handle_status_change(changes["status"], new_image)

    return {"processed": "modify", "changes": list(changes.keys())}

def handle_remove(record: dict) -> dict:
    """React to an item deletion by cleaning up its related resources."""
    old_image = record["dynamodb"].get("OldImage", {})

    # The deleted item's id (DynamoDB typed format) keys the cleanup.
    item_id = old_image.get("id", {}).get("S")
    cleanup_item_resources(item_id)

    return {"processed": "remove", "item_id": item_id}

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The decorator dispatches every stream record through record_handler.
    return {"statusCode": 200}

SQS FIFO with Message Group Handling

from aws_lambda_powertools.utilities.batch import (
    process_partial_response,
    SqsFifoPartialProcessor
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# FIFO processor maintains message group ordering
processor = SqsFifoPartialProcessor()

def record_handler(record: dict) -> dict:
    """Process one FIFO message, failing fast to preserve group ordering."""
    message = json.loads(record["body"])
    group_id = record["attributes"]["MessageGroupId"]

    outcome = process_ordered_message(message, group_id)

    # Raising here makes the FIFO processor stop the rest of this message
    # group, which is how in-group ordering guarantees are preserved.
    if not outcome["success"]:
        raise ValueError(f"Processing failed for group {group_id}")

    return outcome

@process_partial_response(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext):
    # The decorator returns the partial batch response itself. The FIFO
    # processor stops a message group on its first failure, preserving
    # ordering within each group.
    pass

def process_ordered_message(message: dict, group_id: str) -> dict:
    """Apply one ordered message; account transactions are order-sensitive."""
    # Non-transaction messages need no balance bookkeeping.
    if message["type"] != "account_transaction":
        return {"success": True, "processed": message["type"]}

    # Example: Sequential account transactions applied strictly in order.
    account_id = message["account_id"]
    current_balance = get_account_balance(account_id)
    new_balance = current_balance + message["amount"]

    # A withdrawal that would overdraw fails this transaction (and, via the
    # caller raising, the rest of the message group).
    if new_balance < 0 and message["amount"] < 0:
        return {"success": False, "reason": "insufficient_funds"}

    update_account_balance(account_id, new_balance)
    return {"success": True, "new_balance": new_balance}

Custom Error Handling

from aws_lambda_powertools.utilities.batch import BatchProcessor
from aws_lambda_powertools.utilities.typing import LambdaContext

class CustomBatchProcessor(BatchProcessor):
    """Custom processor with specialized error handling"""
    
    def __init__(self, event_type: str, **kwargs):
        super().__init__(event_type, **kwargs)
        # NOTE(review): retry_count lives on the processor instance, so it
        # counts failures across ALL records in a batch (and across warm
        # invocations) rather than per record — confirm this is intended;
        # a per-message-id counter is the likelier intent.
        self.retry_count = 0
        self.max_retries = 3
    
    def failure_handler(self, record: dict, exception: Exception):
        """Custom failure handling with retry logic"""
        # Log detailed error information
        error_details = {
            "record_id": record.get("messageId", "unknown"),
            "error_type": type(exception).__name__,
            "error_message": str(exception),
            "retry_count": self.retry_count
        }
        
        # Determine if error is retryable
        if isinstance(exception, (ConnectionError, TimeoutError)):
            if self.retry_count < self.max_retries:
                self.retry_count += 1
                # NOTE(review): reporting success here means SQS will NOT
                # redeliver the message; unless "handle retry externally"
                # is actually implemented somewhere, the message is lost.
                # Return success to prevent DLQ, handle retry externally
                return super().success_handler(record, {"retried": True})
        
        # Log to monitoring system
        log_processing_failure(error_details)
        
        # Return failure response
        return super().failure_handler(record, exception)

# Use custom processor
processor = CustomBatchProcessor(event_type="SQS")

def record_handler(record: dict) -> dict:
    """Record handler that is aware of SQS redelivery attempts."""
    # SQS increments ApproximateReceiveCount on each redelivery, so any
    # value above 1 means this message has been seen before.
    receive_count = int(record.get("attributes", {}).get("ApproximateReceiveCount", "1"))

    if receive_count > 1:
        print(f"Processing retry attempt #{receive_count}")

    # Delegate to the external-service call, which may raise a retryable
    # ConnectionError/TimeoutError handled by the custom processor.
    return process_with_external_service(record)

def process_with_external_service(record: dict) -> dict:
    """
    Forward one record's body to an external HTTP API, translating
    transport errors into retryable/non-retryable exception types.

    Parameters:
    - record: SQS record whose "body" is sent to the API

    Returns:
    Parsed JSON response from the API

    Raises:
    ConnectionError: connection failures and 5xx responses (retryable)
    TimeoutError: request timeout (retryable)
    ValueError: 4xx responses (not retryable)
    """
    import requests
    
    try:
        response = requests.post(
            "https://api.example.com/process",
            json={"data": record["body"]},
            timeout=10
        )
        response.raise_for_status()
        return response.json()
        
    except requests.ConnectionError as exc:
        # Chain the cause (`from exc`) so logs keep the underlying error;
        # the original re-raises discarded it.
        raise ConnectionError("Failed to connect to external service") from exc
    except requests.Timeout as exc:
        raise TimeoutError("Request to external service timed out") from exc
    except requests.HTTPError as exc:
        status_code = exc.response.status_code
        if status_code >= 500:
            # Server error - retryable
            raise ConnectionError(f"Server error: {status_code}") from exc
        # Client error - not retryable
        raise ValueError(f"Invalid request: {status_code}") from exc

Types

# Literal added to the import: it was used below but never imported,
# which would raise NameError at import time.
from typing import Dict, Any, List, Union, Callable, Awaitable, Optional, Literal
from aws_lambda_powertools.utilities.typing import LambdaContext

# Event type constants
EventType = Literal["SQS", "KinesisDataStreams", "DynamoDBStreams"]

# Handler function signatures
RecordHandler = Callable[[Dict[str, Any]], Any]
AsyncRecordHandler = Callable[[Dict[str, Any]], Awaitable[Any]]

# Processor response types
ProcessorResponse = Union[List[SuccessResponse], Dict[str, Any]]

# Batch processing configuration
class BatchConfig:
    def __init__(
        self,
        max_records: int = None,
        batch_size_mb: int = 6,
        parallel_processing: bool = False,
        max_concurrency: int = 10,
    ):
        """
        Configuration for batch processing behavior.

        Parameters:
        - max_records: Maximum number of records to process (None = no limit)
        - batch_size_mb: Maximum batch size in megabytes (default 6)
        - parallel_processing: Whether to process records in parallel
        - max_concurrency: Maximum concurrent record handlers for async mode

        NOTE(review): max_records is annotated int but defaults to None;
        presumably Optional[int] — confirm against the implementation.
        """

Install with Tessl CLI

npx tessl i tessl/pypi-aws-lambda-powertools

docs

batch-processing.md

core-observability.md

data-classes.md

event-handlers.md

feature-flags.md

index.md

parameters.md

parser.md

utilities.md

tile.json