tessl/pypi-aws-lambda-powertools

Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/aws-lambda-powertools@3.19.x

To install, run

npx @tessl/cli install tessl/pypi-aws-lambda-powertools@3.19.0

AWS Lambda Powertools Python

AWS Lambda Powertools Python is a developer toolkit that implements serverless best practices for AWS Lambda functions. It provides utilities for structured logging with Lambda context enrichment, distributed tracing, custom metrics, event handling, batch processing, parameter retrieval, event parsing and validation, feature flags, and idempotency.

Package Information

  • Package Name: aws-lambda-powertools
  • Language: Python 3.9+
  • Installation: pip install aws-lambda-powertools

Core Imports

from aws_lambda_powertools import Logger, Metrics, Tracer

Specific utilities:

from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
from aws_lambda_powertools.utilities.parameters import get_parameter
from aws_lambda_powertools.utilities.batch import batch_processor

Basic Usage

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize core observability utilities
logger = Logger()
# Metrics needs a namespace: pass namespace="..." here or set POWERTOOLS_METRICS_NAMESPACE
metrics = Metrics()
tracer = Tracer()

@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Add custom metrics
    metrics.add_metric(name="ProcessedEvents", unit="Count", value=1)
    
    # Structured logging with extra context keys
    logger.info("Processing event", extra={"event_type": "example"})
    
    # Add tracing annotations
    tracer.put_annotation(key="operation", value="process_event")
    
    return {"statusCode": 200, "body": "Success"}

Architecture

AWS Lambda Powertools follows a modular architecture with independent utilities:

  • Core Observability: Logger, Metrics, and Tracer work together to provide structured logging, CloudWatch metrics, and X-Ray tracing
  • Event Handlers: Framework-agnostic resolvers for API Gateway, ALB, AppSync, and other event sources
  • Utilities: Standalone components for batch processing, parameter management, data parsing, and validation
  • Data Classes: Type-safe representations of AWS event structures
  • Decorators: Python decorators for seamless integration with existing Lambda functions

Capabilities

Core Observability

Essential observability utilities including structured logging with Lambda context enrichment, CloudWatch embedded metric format (EMF) for custom metrics, and AWS X-Ray integration for distributed tracing.

class Logger:
    def __init__(
        self,
        service: str = None,
        level: str = "INFO",
        child: bool = False,
        sampling_rate: float = 0.0,
        stream: TextIO = None,
        logger_formatter: PowertoolsFormatter = None,
        logger_handler: logging.Handler = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ): ...

class Metrics:
    def __init__(
        self,
        service: str = None,
        namespace: str = None,
        metadata: Dict[str, Any] = None,
        default_dimensions: Dict[str, str] = None,
    ): ...

class Tracer:
    def __init__(
        self,
        service: str = None,
        disabled: bool = False,
        auto_patch: bool = True,
        patch_modules: List[str] = None,
        provider: BaseProvider = None,
    ): ...
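
To make the embedded metric format (EMF) mentioned above concrete, the sketch below hand-builds the kind of JSON log line that CloudWatch turns into a metric. It uses only the standard library and is not how Powertools implements Metrics; the helper name `build_emf_record` and the `ExampleApp` namespace are made up for illustration.

```python
import json
import time
from typing import Any, Dict


def build_emf_record(namespace: str, service: str, name: str, unit: str, value: float) -> Dict[str, Any]:
    """Hypothetical helper: assemble one EMF-shaped record by hand."""
    return {
        "_aws": {
            # EMF timestamps are milliseconds since the Unix epoch.
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    # Each dimension set names top-level keys of this record.
                    "Dimensions": [["service"]],
                    "Metrics": [{"Name": name, "Unit": unit}],
                }
            ],
        },
        # Dimension value and metric value live at the top level.
        "service": service,
        name: value,
    }


record = build_emf_record("ExampleApp", "orders", "ProcessedEvents", "Count", 1)
print(json.dumps(record))  # one JSON object per line on stdout
```

Because the metric travels inside an ordinary log line, no extra API call is made from the function; the metric is extracted from the log asynchronously.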

Event Handlers

Framework for building Lambda functions that handle HTTP events from API Gateway, Application Load Balancer, AppSync GraphQL, and other AWS services with automatic request/response serialization.

class APIGatewayRestResolver:
    def __init__(
        self,
        cors: CORSConfig = None,
        debug: bool = None,
        serializer: Callable[[Dict], str] = None,
        strip_prefixes: List[str] = None,
        enable_validation: bool = False,
    ): ...
    
    def get(self, rule: str, **kwargs): ...
    def post(self, rule: str, **kwargs): ...
    def put(self, rule: str, **kwargs): ...
    def delete(self, rule: str, **kwargs): ...

class ALBResolver:
    def __init__(
        self,
        cors: CORSConfig = None,
        debug: bool = None,
        serializer: Callable[[Dict], str] = None,
        strip_prefixes: List[str] = None,
        enable_validation: bool = False,
    ): ...

class AppSyncResolver:
    def __init__(self, debug: bool = None): ...
    def resolver(self, type_name: str = "*", field_name: str = None): ...
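
The decorator-based routing that these resolvers expose can be pictured with a small stdlib-only sketch. `MiniResolver` is a hypothetical stand-in written for this example, not the library's implementation: it matches exact paths only and assumes a REST API proxy event with `httpMethod` and `path` keys.

```python
import json
from typing import Any, Callable, Dict, Tuple


class MiniResolver:
    """Hypothetical stand-in for a resolver: exact-match routes only."""

    def __init__(self) -> None:
        self._routes: Dict[Tuple[str, str], Callable[[], Any]] = {}

    def get(self, rule: str) -> Callable:
        # Register the decorated function under ("GET", rule).
        def register(func: Callable[[], Any]) -> Callable[[], Any]:
            self._routes[("GET", rule)] = func
            return func

        return register

    def resolve(self, event: Dict[str, Any]) -> Dict[str, Any]:
        # Look up the handler by HTTP method and path taken from the event.
        handler = self._routes.get((event["httpMethod"], event["path"]))
        if handler is None:
            return {"statusCode": 404, "body": json.dumps({"message": "Not found"})}
        # Serialize the handler's return value into a proxy-style response.
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(handler()),
        }


app = MiniResolver()


@app.get("/todos")
def list_todos() -> Dict[str, Any]:
    return {"todos": ["write docs"]}


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    return app.resolve(event)
```

The point of the pattern is that route functions return plain Python values, while the resolver owns the translation to and from the event source's wire format.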

Batch Processing

Utilities for processing AWS SQS, DynamoDB Streams, and Kinesis records with built-in error handling and partial failure reporting, so that only the records that failed are returned to the event source for retry.

def batch_processor(
    record_handler: Callable[[Dict], Any],
    processor: BatchProcessor,
    context: LambdaContext = None,
): ...

def async_batch_processor(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: AsyncBatchProcessor,
    context: LambdaContext = None,
): ...

class BatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ): ...
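
As a rough picture of what partial failure support means for SQS, the stdlib-only sketch below runs a handler over each record and reports only the failed message IDs in the `batchItemFailures` response shape that Lambda expects. The function `process_sqs_batch` and the sample event are illustrative assumptions, not the library's code.

```python
from typing import Any, Callable, Dict, List


def process_sqs_batch(
    event: Dict[str, Any],
    record_handler: Callable[[Dict[str, Any]], Any],
) -> Dict[str, List[Dict[str, str]]]:
    """Hypothetical helper: collect failed records instead of failing the batch."""
    failures: List[Dict[str, str]] = []
    for record in event["Records"]:
        try:
            record_handler(record)
        except Exception:
            # Report only this record; records that succeeded are not listed.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def handle(record: Dict[str, Any]) -> None:
    # Stand-in business logic that rejects one specific payload.
    if record["body"] == "bad":
        raise ValueError("cannot process record")


sample_event = {
    "Records": [
        {"messageId": "m-1", "body": "ok"},
        {"messageId": "m-2", "body": "bad"},
    ]
}
result = process_sqs_batch(sample_event, handle)
print(result)
```

This response is only honored when the event source mapping has `ReportBatchItemFailures` enabled; without it, a returned failure list is ignored and the whole batch is treated as successful.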

Data Classes

Type-safe data classes representing AWS event structures for Lambda triggers including API Gateway, S3, DynamoDB, Kinesis, SQS, SNS, and CloudWatch events.

class APIGatewayProxyEvent:
    @property
    def body(self) -> str | None: ...
    @property
    def json_body(self) -> Any: ...
    @property
    def headers(self) -> Dict[str, str]: ...
    @property
    def query_string_parameters(self) -> Dict[str, str] | None: ...

class SQSEvent:
    @property
    def records(self) -> List[SQSRecord]: ...

class DynamoDBStreamEvent:
    @property
    def records(self) -> List[DynamoDBRecord]: ...
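
The idea behind these data classes is a thin, read-only wrapper that exposes a raw event dict through typed properties. The sketch below shows that pattern with the standard library only; `ProxyEventView` is a hypothetical name and covers just three properties.

```python
import json
from typing import Any, Dict, Optional


class ProxyEventView:
    """Hypothetical read-only wrapper around a raw proxy event dict."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self._data = data

    @property
    def body(self) -> Optional[str]:
        return self._data.get("body")

    @property
    def json_body(self) -> Any:
        # Decode only when asked, and only if a body is present.
        return json.loads(self.body) if self.body else None

    @property
    def headers(self) -> Dict[str, str]:
        # Some events carry an explicit null here; normalize it to an empty dict.
        return self._data.get("headers") or {}


event = ProxyEventView({"body": '{"order_id": 42}', "headers": None})
print(event.json_body["order_id"])
```

Compared with indexing the dict directly, the wrapper gives editors something to autocomplete and keeps null handling in one place.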

Parameters

Retrieve parameters from AWS Systems Manager Parameter Store, AWS Secrets Manager, and AWS AppConfig, with built-in caching (controlled by max_age and force_fetch) and optional value transformation.

def get_parameter(
    name: str,
    decrypt: Optional[bool] = None,  # treated as False unless POWERTOOLS_PARAMETERS_SSM_DECRYPT is set
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> str: ...

def get_secret(
    name: str,
    version_id: str = None,
    version_stage: str = None,
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> str: ...

def get_app_config(
    name: str,
    environment: str,
    application: str,
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> bytes | str: ...
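
How `max_age` and `force_fetch` interact can be sketched with a small time-based cache. This is a stdlib-only illustration under stated assumptions: `CachedFetcher` and `fake_store` are hypothetical, and the real utilities call AWS APIs rather than a local function.

```python
import time
from typing import Callable, Dict, List, Tuple


class CachedFetcher:
    """Hypothetical TTL cache placed in front of any fetch function."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch
        # name -> (value, monotonic time at which the entry expires)
        self._cache: Dict[str, Tuple[str, float]] = {}

    def get(self, name: str, max_age: int = 5, force_fetch: bool = False) -> str:
        now = time.monotonic()
        cached = self._cache.get(name)
        # Serve from cache unless the caller forces a refresh or the entry expired.
        if cached is not None and not force_fetch and now < cached[1]:
            return cached[0]
        value = self._fetch(name)
        self._cache[name] = (value, now + max_age)
        return value


calls: List[str] = []


def fake_store(name: str) -> str:
    # Stand-in for a remote parameter store; records every lookup.
    calls.append(name)
    return f"value-of-{name}"


params = CachedFetcher(fake_store)
first = params.get("/app/db_host")
second = params.get("/app/db_host")                     # served from cache
third = params.get("/app/db_host", force_fetch=True)    # bypasses the cache
print(len(calls))
```

Caching matters in Lambda because module-level state survives between invocations of a warm execution environment, so repeated lookups need not hit the backing service each time.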

Parser

Event parsing and validation using Pydantic models, with built-in envelopes that strip the event-source wrapper and pass only the business payload to your model.

def event_parser(
    model: BaseModel,
    envelope: BaseEnvelope = None,
) -> Callable: ...

def parse(
    event: Dict[str, Any],
    model: BaseModel,
    envelope: BaseEnvelope = None,
) -> Any: ...

class BaseEnvelope:
    def parse(self, data: Dict[str, Any], model: BaseModel) -> Any: ...
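
The envelope idea can be illustrated without Pydantic: first unwrap the event-source metadata, then build a model from what remains. In this stdlib-only sketch a dataclass stands in for a Pydantic model (so it checks field names, not types), and `Order` and `parse_sqs_bodies` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Order:
    order_id: int
    item: str


def parse_sqs_bodies(event: Dict[str, Any]) -> List[Order]:
    """Hypothetical helper combining an envelope step and a model step."""
    # Envelope step: drop the SQS metadata and keep each record's JSON body.
    payloads = [json.loads(record["body"]) for record in event["Records"]]
    # Model step: unexpected or missing fields raise TypeError here.
    return [Order(**payload) for payload in payloads]


sqs_event = {
    "Records": [
        {"messageId": "m-1", "body": '{"order_id": 7, "item": "book"}'},
    ]
}
orders = parse_sqs_bodies(sqs_event)
print(orders[0].item)
```

Separating the two steps lets the same model be reused across event sources; only the envelope changes.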

Feature Flags & Idempotency

Feature flags with rule engine support from AWS AppConfig and idempotency patterns to prevent duplicate processing of events.

class FeatureFlags:
    def __init__(
        self,
        store: StoreProvider,
        logger: Logger = None,
    ): ...
    
    def evaluate(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> bool | Any: ...

def idempotent(
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable: ...

def idempotent_function(
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable: ...
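
The core of an idempotency decorator is a lookup keyed by a hash of the payload. The sketch below shows that mechanism with an in-memory dict; `idempotent_sketch` is a hypothetical name, there is no expiry or locking, and a real persistence layer would be an external store rather than a dict.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List


def idempotent_sketch(store: Dict[str, Any]) -> Callable:
    """Hypothetical decorator: reuse the stored result for a repeated payload."""

    def decorator(handler: Callable[[Dict[str, Any], Any], Any]) -> Callable:
        def wrapper(event: Dict[str, Any], context: Any) -> Any:
            # sort_keys makes the hash independent of key order in the event.
            payload = json.dumps(event, sort_keys=True).encode("utf-8")
            key = hashlib.md5(payload).hexdigest()
            if key not in store:
                store[key] = handler(event, context)
            return store[key]

        return wrapper

    return decorator


store: Dict[str, Any] = {}
charges: List[str] = []


@idempotent_sketch(store)
def charge(event: Dict[str, Any], context: Any) -> Dict[str, str]:
    # Side effect that must not happen twice for the same payment.
    charges.append(event["payment_id"])
    return {"charged": event["payment_id"]}


first = charge({"payment_id": "p-1", "amount": 10}, None)
repeat = charge({"amount": 10, "payment_id": "p-1"}, None)  # same payload, different key order
print(len(charges))
```

Hashing the whole event is the simplest choice; narrowing the key to a stable field is what an option like `event_key_jmespath` in `IdempotencyConfig` is for.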

Utilities

Additional utilities including data masking, streaming, serialization, validation, JMESPath operations, and Kafka consumer helpers.

class DataMasking:
    def __init__(self, provider: BaseProvider): ...
    def erase(self, data: Any, fields: List[str] = None) -> Any: ...

class S3Object:
    def __init__(self, bucket: str, key: str, **kwargs): ...
    def transform(self, transform: BaseTransform) -> "S3Object": ...

def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: str = None,
) -> Dict[str, Any]: ...
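
As a simplified picture of what erasing fields means in data masking, the sketch below replaces selected top-level keys with a fixed mask and leaves the input untouched. `erase_fields` and the `MASK` constant are assumptions made for this example; nested field paths are deliberately out of scope.

```python
import copy
from typing import Any, Dict, List

MASK = "*****"  # assumed placeholder value for this sketch


def erase_fields(data: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: mask the given top-level keys in a copy of data."""
    masked = copy.deepcopy(data)  # never mutate the caller's object
    for field in fields:
        if field in masked:
            masked[field] = MASK
    return masked


customer = {"name": "Ana", "card": "4111-1111"}
safe = erase_fields(customer, ["card"])
print(safe)
```

Masking a copy keeps the original available to business logic while the masked version goes to logs or downstream systems.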

Core Types

from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union

# Importable as: from aws_lambda_powertools.utilities.typing import LambdaContext
class LambdaContext:
    """AWS Lambda context object"""
    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    aws_request_id: str
    log_group_name: str
    log_stream_name: str
    identity: Any
    client_context: Any

    def get_remaining_time_in_millis(self) -> int: ...

# Metric and logging types
MetricUnit = Literal[
    "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", 
    "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits", 
    "Megabits", "Gigabits", "Terabits", "Percent", "Count", 
    "Bytes/Second", "Kilobytes/Second", "Megabytes/Second", 
    "Gigabytes/Second", "Terabytes/Second", "Bits/Second", 
    "Kilobits/Second", "Megabits/Second", "Gigabits/Second", 
    "Terabits/Second", "Count/Second", "None"
]

MetricResolution = Literal[1, 60]

# Event handler types
class CORSConfig:
    def __init__(
        self,
        allow_origin: str = "*",
        allow_headers: List[str] = None,
        allow_methods: List[str] = None,
        expose_headers: List[str] = None,
        max_age: int = None,
        allow_credentials: bool = False,
    ): ...

class Response:
    def __init__(
        self,
        status_code: int,
        content_type: str = None,
        body: str = None,
        headers: Dict[str, str] = None,
        cookies: List[str] = None,
    ): ...

# Batch processing types
from enum import Enum

class EventType(Enum):
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

class SuccessResponse:
    def __init__(self, **kwargs): ...

class FailureResponse:
    def __init__(self, **kwargs): ...

class ExceptionInfo:
    def __init__(self, exception: Exception, record: Dict[str, Any]): ...

# Parameter and configuration types
class GetParameterError(Exception): ...
class TransformParameterError(Exception): ...
class ConfigurationStoreError(Exception): ...

# Validation types  
class ValidationError(Exception): ...
class SchemaValidationError(Exception): ...
class InvalidSchemaFormatError(Exception): ...
class InvalidEnvelopeExpressionError(Exception): ...

# Parser types
class BaseModel:
    """Pydantic BaseModel re-export for parser functionality"""
    pass

def Field(**kwargs) -> Any:
    """Pydantic Field function re-export"""
    pass

# Idempotency types
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = None,
        payload_validation_jmespath: str = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 3600,
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: LambdaContext = None,
    ): ...

# Feature flags types
RuleAction = Literal["ALLOW", "DENY"]

class SchemaValidator:
    def __init__(self, schema: Dict[str, Any]): ...
    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]: ...