CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aws-lambda-powertools

Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python

89

1.21x
Overview
Eval results
Files

docs/core-observability.md

Core Observability

Essential observability utilities for AWS Lambda functions including structured logging with Lambda context enrichment, CloudWatch embedded metric format (EMF) for custom metrics, and AWS X-Ray integration for distributed tracing.

Capabilities

Logger

Structured logging utility that automatically enriches log entries with Lambda context information and provides JSON formatting optimized for CloudWatch Logs.

class Logger:
    """Structured JSON logger that enriches records with AWS Lambda context.

    Configure once at module scope, attach to a handler with
    ``inject_lambda_context``, and manage per-request fields with
    ``append_keys``/``remove_keys`` and the correlation-ID helpers.
    """

    def __init__(
        self,
        service: str = None,
        level: str = "INFO",
        child: bool = False,
        sampling_rate: float = 0.0,
        stream: TextIO = None,
        logger_formatter: PowertoolsFormatter = None,
        logger_handler: logging.Handler = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ):
        """
        Initialize Logger with configuration options.

        Parameters:
        - service: Service name to identify origin of logs
        - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        - child: Whether this is a child logger
        - sampling_rate: Debug log sampling rate (0.0-1.0)
        - stream: Output stream for logs
        - logger_formatter: Custom log formatter
        - logger_handler: Custom log handler
        - log_uncaught_exceptions: Whether to log uncaught exceptions
        - json_serializer: Custom JSON serializer function
        - json_deserializer: Custom JSON deserializer function
        - json_default: Default function for non-serializable objects
        - datefmt: Date format string
        - use_datetime_directive: Whether to use datetime directive
        - log_record_order: Order of log record fields
        - utc: Whether to use UTC timezone
        - use_rfc3339: Whether to use RFC3339 datetime format
        """

    # Severity-level methods; signatures mirror stdlib logging.Logger.
    def debug(self, msg: Any, *args, **kwargs) -> None:
        """Log debug message"""

    def info(self, msg: Any, *args, **kwargs) -> None:
        """Log info message"""

    def warning(self, msg: Any, *args, **kwargs) -> None:
        """Log warning message"""

    def error(self, msg: Any, *args, **kwargs) -> None:
        """Log error message"""

    def critical(self, msg: Any, *args, **kwargs) -> None:
        """Log critical message"""

    def exception(self, msg: Any, *args, **kwargs) -> None:
        """Log exception with traceback"""

    # camelCase kept deliberately to match logging.Logger.setLevel.
    def setLevel(self, level: Union[str, int]) -> None:
        """Set logging level"""

    def inject_lambda_context(
        self,
        correlation_id_path: str = None,
        log_event: bool = False,
        correlation_id: str = None,
        clear_state: bool = False,
    ) -> Callable:
        """
        Decorator to inject Lambda context into logs.

        Parameters:
        - correlation_id_path: JMESPath to extract correlation ID from event
        - log_event: Whether to log the incoming event
        - correlation_id: Static correlation ID to use
        - clear_state: Whether to clear logger state after invocation

        Returns:
        Decorated function with Lambda context injection
        """

    def append_keys(self, **kwargs) -> None:
        """Append additional keys to all subsequent log entries"""

    def remove_keys(self, keys: List[str]) -> None:
        """Remove keys from subsequent log entries"""

    # NOTE(review): documented here as a decorator, but the name and
    # signature suggest a plain configuration method in the upstream
    # package — confirm against aws_lambda_powertools before relying on it.
    def structure_logs(
        self,
        append: bool = False,
        **kwargs,
    ) -> Callable:
        """
        Decorator to add structured information to logs.

        Parameters:
        - append: Whether to append to existing structured data
        - **kwargs: Key-value pairs to add to log structure

        Returns:
        Decorated function with structured logging
        """

    def set_correlation_id(self, value: str) -> None:
        """Set correlation ID for request tracing"""

    def get_correlation_id(self) -> str:
        """Get current correlation ID"""

Logger Buffer Configuration

Configuration for logger buffering to reduce CloudWatch Logs API calls.

class LoggerBufferConfig:
    """Buffering policy for Logger output to reduce CloudWatch Logs API calls.

    Entries are held in memory and flushed when the buffer fills, when the
    time limit elapses, or (optionally) when the Lambda invocation exits.
    """

    def __init__(
        self,
        buffer_size: int = 100,
        flush_on_exit: bool = True,
        max_buffer_time: int = 5,
    ):
        """
        Configure logger buffering behavior.

        Parameters:
        - buffer_size: Number of log entries to buffer before flushing
        - flush_on_exit: Whether to flush buffer on Lambda exit
        - max_buffer_time: Maximum time (seconds) to buffer logs
        """

Metrics

CloudWatch embedded metric format (EMF) utility for publishing custom metrics with high-cardinality dimensions.

class Metrics:
    """Publishes custom CloudWatch metrics via embedded metric format (EMF).

    Metrics and dimensions are accumulated with ``add_metric`` /
    ``add_dimension`` during an invocation and published automatically by
    the ``log_metrics`` decorator after the handler returns.
    """

    def __init__(
        self,
        service: str = None,
        namespace: str = None,
        metadata: Dict[str, Any] = None,
        default_dimensions: Dict[str, str] = None,
    ):
        """
        Initialize Metrics with configuration.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        - metadata: Default metadata to include
        - default_dimensions: Default dimensions for all metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
        resolution: MetricResolution = 60,  # 1 = high-resolution, 60 = standard
    ) -> None:
        """
        Add a metric to be published.

        Parameters:
        - name: Metric name
        - unit: Unit of measurement
        - value: Metric value
        - resolution: Metric resolution in seconds (1 or 60)
        """

    def add_dimension(self, name: str, value: str) -> None:
        """Add a dimension to current metric context"""

    def add_metadata(self, key: str, value: Any) -> None:
        """Add metadata to current metric context"""

    def set_default_dimensions(self, **dimensions) -> None:
        """Set default dimensions for all metrics"""

    def clear_metrics(self) -> None:
        """Clear all metrics and dimensions"""

    def clear_dimensions(self) -> None:
        """Clear all dimensions"""

    def clear_metadata(self) -> None:
        """Clear all metadata"""

    def log_metrics(
        self,
        lambda_handler: Callable = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_dimensions: Dict[str, str] = None,
    ) -> Callable:
        """
        Decorator to automatically publish metrics after Lambda execution.

        Parameters:
        - lambda_handler: Lambda handler function to decorate
        - capture_cold_start_metric: Whether to capture cold start metric
        - raise_on_empty_metrics: Whether to raise error if no metrics added
        - default_dimensions: Default dimensions to apply

        Returns:
        Decorated function with automatic metric publishing
        """

class EphemeralMetrics:
    """Metrics container whose state is isolated to one function execution.

    Unlike ``Metrics``, accumulated metrics do not persist across
    invocations of the same Lambda environment.
    """

    def __init__(
        self,
        service: str = None,
        namespace: str = None,
    ):
        """
        Ephemeral metrics that don't persist beyond function execution.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
    ) -> None:
        """Add ephemeral metric"""

def single_metric(
    name: str,
    unit: MetricUnit,
    value: Union[float, int],
    resolution: MetricResolution = 60,
    default_dimensions: Dict[str, str] = None,
    namespace: str = None,
) -> ContextManager:
    """
    Context manager for publishing a single metric.

    Parameters:
    - name: Metric name
    - unit: Unit of measurement
    - value: Metric value
    - resolution: Metric resolution in seconds
    - default_dimensions: Dimensions to apply
    - namespace: CloudWatch namespace

    Returns:
    Context manager that publishes metric on exit

    Example:
        with single_metric(name="ColdStart", unit="Count", value=1, namespace="App") as metric:
            metric.add_dimension(name="function", value="example")
    """

Tracer

AWS X-Ray tracing utility for distributed tracing across AWS services.

class Tracer:
    """AWS X-Ray tracing helper for Lambda handlers and their helpers.

    Decorate the handler with ``capture_lambda_handler`` and downstream
    functions with ``capture_method``; use ``put_annotation`` for
    searchable keys and ``put_metadata`` for debug-only payloads.
    """

    def __init__(
        self,
        service: str = None,
        disabled: bool = False,
        auto_patch: bool = True,
        patch_modules: List[str] = None,
        provider: BaseProvider = None,
    ):
        """
        Initialize Tracer with configuration.

        Parameters:
        - service: Service name for tracing
        - disabled: Whether tracing is disabled
        - auto_patch: Whether to auto-patch supported libraries
        - patch_modules: Specific modules to patch for tracing
        - provider: Custom tracing provider
        """

    def capture_lambda_handler(
        self,
        lambda_handler: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture Lambda handler execution in X-Ray trace.

        Parameters:
        - lambda_handler: Lambda handler function to trace
        - capture_response: Whether to capture response in trace
        - capture_error: Whether to capture errors in trace

        Returns:
        Decorated function with Lambda handler tracing
        """

    def capture_method(
        self,
        method: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture method execution in X-Ray subsegment.

        Parameters:
        - method: Method to trace
        - capture_response: Whether to capture response
        - capture_error: Whether to capture errors

        Returns:
        Decorated method with tracing
        """

    def put_annotation(self, key: str, value: Union[str, int, float, bool]) -> None:
        """
        Add annotation to current trace segment.

        Parameters:
        - key: Annotation key
        - value: Annotation value (searchable in X-Ray console)
        """

    def put_metadata(
        self,
        key: str,
        value: Any,
        namespace: str = "default",
    ) -> None:
        """
        Add metadata to current trace segment.

        Parameters:
        - key: Metadata key
        - value: Metadata value (not searchable, for debugging)
        - namespace: Metadata namespace
        """

    def patch(self, modules_to_patch: List[str]) -> None:
        """Manually patch modules for tracing"""

    # NOTE(review): this method name shadows the constructor's ``provider``
    # argument; in the upstream package this is exposed as a property —
    # confirm before generating code against this spec.
    def provider(self) -> BaseProvider:
        """Get current tracing provider"""

    def is_disabled(self) -> bool:
        """Check if tracing is disabled"""

def aiohttp_trace_config() -> Any:
    """
    Get aiohttp trace configuration for async HTTP client tracing.

    Returns:
    aiohttp TraceConfig object for automatic request tracing
    (presumably passed to ``aiohttp.ClientSession(trace_configs=[...])`` —
    confirm against the aiohttp client tracing documentation)
    """

Usage Examples

Basic Logging with Lambda Context

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="payment-service")


@logger.inject_lambda_context(log_event=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Handle a payment request with context-enriched structured logs."""
    payment_details = {
        "payment_id": event.get("payment_id"),
        "amount": event.get("amount"),
    }
    logger.info("Processing payment", extra=payment_details)

    try:
        # Process payment logic
        result = process_payment(event)
        logger.info("Payment processed successfully", extra={"result": result})
        return {"statusCode": 200, "body": result}
    except Exception:
        logger.exception("Payment processing failed")
        return {"statusCode": 500, "body": "Payment failed"}

Metrics with Custom Dimensions

from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit

metrics = Metrics(service="payment-service", namespace="ECommerce")


@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Record payment metrics with custom dimensions for each invocation."""
    # Add custom dimensions
    metrics.add_dimension("payment_method", event.get("payment_method", "unknown"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    # Record metrics
    metrics.add_metric(name="PaymentAttempted", unit=MetricUnit.Count, value=1)

    try:
        amount = float(event.get("amount", 0))
        # "None" is the dimensionless EMF unit. ``MetricUnit.None`` is not
        # valid Python (``None`` is a reserved keyword), so pass the literal
        # unit string — it is part of the MetricUnit values declared above.
        metrics.add_metric(name="PaymentAmount", unit="None", value=amount)

        # Process payment
        success = process_payment(event)

        if success:
            metrics.add_metric(name="PaymentSuccessful", unit=MetricUnit.Count, value=1)
        else:
            metrics.add_metric(name="PaymentFailed", unit=MetricUnit.Count, value=1)

        return {"statusCode": 200 if success else 400}

    except Exception:
        metrics.add_metric(name="PaymentError", unit=MetricUnit.Count, value=1)
        raise

Distributed Tracing with X-Ray

from aws_lambda_powertools import Tracer
import boto3

tracer = Tracer(service="payment-service")
dynamodb = boto3.resource("dynamodb")


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Trace a payment flow end to end through X-Ray."""
    tracer.put_annotation("payment_id", event.get("payment_id"))
    tracer.put_metadata("event_details", event)

    user_id = get_user_id(event["payment_id"])        # traced database lookup
    payment_result = process_external_payment(event)  # traced external call

    return {"statusCode": 200, "result": payment_result}


@tracer.capture_method
def get_user_id(payment_id: str) -> str:
    """Resolve the user for a payment; the DynamoDB call is traced automatically."""
    table = dynamodb.Table("payments")
    item = table.get_item(Key={"payment_id": payment_id}).get("Item", {})

    tracer.put_annotation("user_found", "user_id" in item)
    return item.get("user_id")


@tracer.capture_method(capture_response=False)  # Don't capture sensitive response
def process_external_payment(event: dict) -> dict:
    """Call the external payment processor; its response stays out of the trace."""
    tracer.put_annotation("payment_processor", "stripe")

    # Simulate external API call
    result = {"status": "success", "transaction_id": "txn_123"}

    tracer.put_metadata("payment_processor_response", {
        "status": result["status"],
        "transaction_id": result["transaction_id"],
    })

    return result

Combined Observability Pattern

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize all observability tools
logger = Logger(service="order-service")
metrics = Metrics(service="order-service", namespace="ECommerce")
tracer = Tracer(service="order-service")


@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Process an order with combined logging, tracing, and metrics."""
    # Extract order details
    order_id = event.get("order_id")
    customer_id = event.get("customer_id")

    # Add structured logging context
    logger.append_keys(order_id=order_id, customer_id=customer_id)

    # Add tracing annotations for searchability
    tracer.put_annotation("order_id", order_id)
    tracer.put_annotation("customer_id", customer_id)

    # Add metric dimensions
    metrics.add_dimension("order_type", event.get("order_type", "standard"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    logger.info("Processing order")

    try:
        # Process order logic
        order = process_order(event)

        # Record success metrics. ``MetricUnit.None`` is invalid Python
        # (``None`` is a reserved keyword); the dimensionless EMF unit is
        # the literal string "None".
        metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)
        metrics.add_metric(name="OrderValue", unit="None", value=order["total"])

        logger.info("Order processed successfully", extra={"order_total": order["total"]})

        return {
            "statusCode": 200,
            "body": {"order_id": order_id, "status": "processed"}
        }

    # assumes ValidationError is defined/imported by the application — TODO confirm
    except ValidationError as e:
        logger.warning("Order validation failed", extra={"error": str(e)})
        metrics.add_metric(name="OrderValidationError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "validation")

        return {"statusCode": 400, "body": {"error": "Invalid order"}}

    except Exception:
        logger.exception("Order processing failed")
        metrics.add_metric(name="OrderProcessingError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "processing")

        return {"statusCode": 500, "body": {"error": "Processing failed"}}


@tracer.capture_method
def process_order(event: dict) -> dict:
    """Process order with detailed tracing."""
    tracer.put_metadata("order_details", event)

    # Validate order
    validate_order(event)

    # Calculate totals
    total = calculate_order_total(event)

    # Save to database
    save_order(event, total)

    return {"order_id": event["order_id"], "total": total}

Types

# Literal was used below without being imported — added to the typing import.
from typing import Union, Dict, Any, List, Callable, TextIO, ContextManager, Literal
from logging import Handler

# Metric types

# CloudWatch EMF unit names accepted by Metrics.add_metric / single_metric.
MetricUnit = Literal[
    "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes",
    "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits",
    "Megabits", "Gigabits", "Terabits", "Percent", "Count",
    "Bytes/Second", "Kilobytes/Second", "Megabytes/Second",
    "Gigabytes/Second", "Terabytes/Second", "Bits/Second",
    "Kilobits/Second", "Megabits/Second", "Gigabits/Second",
    "Terabits/Second", "Count/Second", "None"
]

# EMF storage resolution in seconds: 1 (high resolution) or 60 (standard).
MetricResolution = Literal[1, 60]

# Exception types raised by the metrics utilities; a bare docstring body
# replaces the redundant ``pass`` statements.
class MetricUnitError(Exception):
    """Raised when an invalid metric unit is used"""


class MetricResolutionError(Exception):
    """Raised when an invalid metric resolution is used"""


class MetricValueError(Exception):
    """Raised when an invalid metric value is provided"""


class SchemaValidationError(Exception):
    """Raised when metric schema validation fails"""

# Formatter types
class PowertoolsFormatter:
    """Custom log formatter for Powertools.

    Constructor parameters mirror the JSON/date-formatting options on
    ``Logger`` and are forwarded when a custom formatter is supplied via
    ``Logger(logger_formatter=...)``.
    """
    def __init__(
        self,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ): ...

# Provider types for tracing
class BaseProvider:
    """Base tracing provider interface; custom providers are passed to Tracer(provider=...)."""

Install with Tessl CLI

npx tessl i tessl/pypi-aws-lambda-powertools

docs

batch-processing.md

core-observability.md

data-classes.md

event-handlers.md

feature-flags.md

index.md

parameters.md

parser.md

utilities.md

tile.json