Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python
89
Essential observability utilities for AWS Lambda functions including structured logging with Lambda context enrichment, CloudWatch embedded metric format (EMF) for custom metrics, and AWS X-Ray integration for distributed tracing.
Structured logging utility that automatically enriches log entries with Lambda context information and provides JSON formatting optimized for CloudWatch Logs.
class Logger:
def __init__(
self,
service: str = None,
level: str = "INFO",
child: bool = False,
sampling_rate: float = 0.0,
stream: TextIO = None,
logger_formatter: PowertoolsFormatter = None,
logger_handler: logging.Handler = None,
log_uncaught_exceptions: bool = False,
json_serializer: Callable[[Dict], str] = None,
json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,
json_default: Callable[[Any], Any] = None,
datefmt: str = None,
use_datetime_directive: bool = False,
log_record_order: List[str] = None,
utc: bool = False,
use_rfc3339: bool = False,
):
"""
Initialize Logger with configuration options.
Parameters:
- service: Service name to identify origin of logs
- level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- child: Whether this is a child logger
- sampling_rate: Debug log sampling rate (0.0-1.0)
- stream: Output stream for logs
- logger_formatter: Custom log formatter
- logger_handler: Custom log handler
- log_uncaught_exceptions: Whether to log uncaught exceptions
- json_serializer: Custom JSON serializer function
- json_deserializer: Custom JSON deserializer function
- json_default: Default function for non-serializable objects
- datefmt: Date format string
- use_datetime_directive: Whether to use datetime directive
- log_record_order: Order of log record fields
- utc: Whether to use UTC timezone
- use_rfc3339: Whether to use RFC3339 datetime format
"""
def debug(self, msg: Any, *args, **kwargs) -> None:
"""Log debug message"""
def info(self, msg: Any, *args, **kwargs) -> None:
"""Log info message"""
def warning(self, msg: Any, *args, **kwargs) -> None:
"""Log warning message"""
def error(self, msg: Any, *args, **kwargs) -> None:
"""Log error message"""
def critical(self, msg: Any, *args, **kwargs) -> None:
"""Log critical message"""
def exception(self, msg: Any, *args, **kwargs) -> None:
"""Log exception with traceback"""
def setLevel(self, level: Union[str, int]) -> None:
"""Set logging level"""
def inject_lambda_context(
self,
correlation_id_path: str = None,
log_event: bool = False,
correlation_id: str = None,
clear_state: bool = False,
) -> Callable:
"""
Decorator to inject Lambda context into logs.
Parameters:
- correlation_id_path: JMESPath to extract correlation ID from event
- log_event: Whether to log the incoming event
- correlation_id: Static correlation ID to use
- clear_state: Whether to clear logger state after invocation
Returns:
Decorated function with Lambda context injection
"""
def append_keys(self, **kwargs) -> None:
"""Append additional keys to all subsequent log entries"""
def remove_keys(self, keys: List[str]) -> None:
"""Remove keys from subsequent log entries"""
def structure_logs(
self,
append: bool = False,
**kwargs,
) -> Callable:
"""
Decorator to add structured information to logs.
Parameters:
- append: Whether to append to existing structured data
- **kwargs: Key-value pairs to add to log structure
Returns:
Decorated function with structured logging
"""
def set_correlation_id(self, value: str) -> None:
"""Set correlation ID for request tracing"""
def get_correlation_id(self) -> str:
"""Get current correlation ID"""Configuration for logger buffering to reduce CloudWatch Logs API calls.
class LoggerBufferConfig:
def __init__(
self,
buffer_size: int = 100,
flush_on_exit: bool = True,
max_buffer_time: int = 5,
):
"""
Configure logger buffering behavior.
Parameters:
- buffer_size: Number of log entries to buffer before flushing
- flush_on_exit: Whether to flush buffer on Lambda exit
- max_buffer_time: Maximum time (seconds) to buffer logs
"""CloudWatch embedded metric format (EMF) utility for publishing custom metrics with high-cardinality dimensions.
class Metrics:
    """Collects custom metrics, dimensions and metadata for publication.

    API summary only: every method below is a signature plus docstring,
    no implementation is shown here.
    """

    def __init__(
        self,
        service: str = None,
        namespace: str = None,
        metadata: Dict[str, Any] = None,
        default_dimensions: Dict[str, str] = None,
    ):
        """
        Initialize Metrics with configuration.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        - metadata: Default metadata to include
        - default_dimensions: Default dimensions for all metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
        resolution: MetricResolution = 60,
    ) -> None:
        """
        Add a metric to be published.

        Parameters:
        - name: Metric name
        - unit: Unit of measurement
        - value: Metric value
        - resolution: Metric resolution in seconds (1 or 60)
        """

    def add_dimension(self, name: str, value: str) -> None:
        """Add a dimension to current metric context"""

    def add_metadata(self, key: str, value: Any) -> None:
        """Add metadata to current metric context"""

    def set_default_dimensions(self, **dimensions) -> None:
        """Set default dimensions for all metrics"""

    def clear_metrics(self) -> None:
        """Clear all metrics and dimensions"""

    def clear_dimensions(self) -> None:
        """Clear all dimensions"""

    def clear_metadata(self) -> None:
        """Clear all metadata"""

    def log_metrics(
        self,
        lambda_handler: Callable = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_dimensions: Dict[str, str] = None,
    ) -> Callable:
        """
        Decorator to automatically publish metrics after Lambda execution.

        Parameters:
        - lambda_handler: Lambda handler function to decorate
        - capture_cold_start_metric: Whether to capture cold start metric
        - raise_on_empty_metrics: Whether to raise error if no metrics added
        - default_dimensions: Default dimensions to apply

        Returns:
        Decorated function with automatic metric publishing
        """
class EphemeralMetrics:
    """Metrics collector whose data does not persist beyond function execution.

    API summary only: the methods below are signatures plus docstrings.
    """

    def __init__(
        self,
        service: str = None,
        namespace: str = None,
    ):
        """
        Ephemeral metrics that don't persist beyond function execution.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
    ) -> None:
        """Add ephemeral metric"""
def single_metric(
name: str,
unit: MetricUnit,
value: Union[float, int],
resolution: MetricResolution = 60,
default_dimensions: Dict[str, str] = None,
namespace: str = None,
) -> ContextManager:
"""
Context manager for publishing a single metric.
Parameters:
- name: Metric name
- unit: Unit of measurement
- value: Metric value
- resolution: Metric resolution in seconds
- default_dimensions: Dimensions to apply
- namespace: CloudWatch namespace
Returns:
Context manager that publishes metric on exit
"""AWS X-Ray tracing utility for distributed tracing across AWS services.
class Tracer:
    """Tracing helper exposing decorators, annotations and metadata calls.

    API summary only: every method below is a signature plus docstring,
    no implementation is shown here.
    """

    def __init__(
        self,
        service: str = None,
        disabled: bool = False,
        auto_patch: bool = True,
        patch_modules: List[str] = None,
        provider: BaseProvider = None,
    ):
        """
        Initialize Tracer with configuration.

        Parameters:
        - service: Service name for tracing
        - disabled: Whether tracing is disabled
        - auto_patch: Whether to auto-patch supported libraries
        - patch_modules: Specific modules to patch for tracing
        - provider: Custom tracing provider
        """

    def capture_lambda_handler(
        self,
        lambda_handler: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture Lambda handler execution in X-Ray trace.

        Parameters:
        - lambda_handler: Lambda handler function to trace
        - capture_response: Whether to capture response in trace
        - capture_error: Whether to capture errors in trace

        Returns:
        Decorated function with Lambda handler tracing
        """

    def capture_method(
        self,
        method: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture method execution in X-Ray subsegment.

        Parameters:
        - method: Method to trace
        - capture_response: Whether to capture response
        - capture_error: Whether to capture errors

        Returns:
        Decorated method with tracing
        """

    def put_annotation(self, key: str, value: Union[str, int, float, bool]) -> None:
        """
        Add annotation to current trace segment.

        Parameters:
        - key: Annotation key
        - value: Annotation value (searchable in X-Ray console)
        """

    def put_metadata(
        self,
        key: str,
        value: Any,
        namespace: str = "default",
    ) -> None:
        """
        Add metadata to current trace segment.

        Parameters:
        - key: Metadata key
        - value: Metadata value (not searchable, for debugging)
        - namespace: Metadata namespace
        """

    def patch(self, modules_to_patch: List[str]) -> None:
        """Manually patch modules for tracing"""

    def provider(self) -> BaseProvider:
        """Get current tracing provider"""

    def is_disabled(self) -> bool:
        """Check if tracing is disabled"""
def aiohttp_trace_config() -> Any:
    """
    Get aiohttp trace configuration for async HTTP client tracing.

    Returns:
    aiohttp TraceConfig object for automatic request tracing
    """


from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="payment-service")


@logger.inject_lambda_context(log_event=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Log the payment attempt, process it, and return a status response.

    Returns a 200 response carrying the processing result, or a 500
    response when any exception is raised while processing.
    """
    logger.info(
        "Processing payment",
        extra={
            "payment_id": event.get("payment_id"),
            "amount": event.get("amount"),
        },
    )

    try:
        # Process payment logic (process_payment is supplied by the application)
        result = process_payment(event)
        logger.info("Payment processed successfully", extra={"result": result})
        return {"statusCode": 200, "body": result}
    except Exception:
        # logger.exception records the active traceback, so the exception
        # object does not need to be bound to a name here.
        logger.exception("Payment processing failed")
        return {"statusCode": 500, "body": "Payment failed"}


from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

metrics = Metrics(service="payment-service", namespace="ECommerce")


@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Record payment metrics around a payment attempt.

    Returns a 200 response when processing succeeds and 400 otherwise.
    Any exception is counted as a PaymentError metric and re-raised.
    """
    # Add custom dimensions
    metrics.add_dimension("payment_method", event.get("payment_method", "unknown"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    # Record metrics
    metrics.add_metric(name="PaymentAttempted", unit=MetricUnit.Count, value=1)

    try:
        amount = float(event.get("amount", 0))
        # `MetricUnit.None` cannot be written in Python: `None` is a reserved
        # word, so the attribute access is a SyntaxError. Count is used because
        # it is a unit this page already relies on.
        # NOTE(review): switch to the library's unit-less option if it exposes one.
        metrics.add_metric(name="PaymentAmount", unit=MetricUnit.Count, value=amount)

        # Process payment (process_payment is supplied by the application)
        success = process_payment(event)

        if success:
            metrics.add_metric(name="PaymentSuccessful", unit=MetricUnit.Count, value=1)
        else:
            metrics.add_metric(name="PaymentFailed", unit=MetricUnit.Count, value=1)

        return {"statusCode": 200 if success else 400}
    except Exception:
        metrics.add_metric(name="PaymentError", unit=MetricUnit.Count, value=1)
        raise


from aws_lambda_powertools import Tracer
import boto3
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer(service="payment-service")
dynamodb = boto3.resource("dynamodb")


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Annotate the trace, look up the user, and run the external payment."""
    tracer.put_annotation("payment_id", event.get("payment_id"))
    tracer.put_metadata("event_details", event)

    # Trace database operations
    user_id = get_user_id(event["payment_id"])

    # Trace external service calls
    payment_result = process_external_payment(event)

    return {"statusCode": 200, "result": payment_result}


@tracer.capture_method
def get_user_id(payment_id: str) -> str:
    """Fetch the payment item and return its ``user_id`` (None when absent)."""
    table = dynamodb.Table("payments")

    # This DynamoDB call will be automatically traced
    response = table.get_item(Key={"payment_id": payment_id})

    tracer.put_annotation("user_found", "user_id" in response.get("Item", {}))
    return response.get("Item", {}).get("user_id")


@tracer.capture_method(capture_response=False)  # Don't capture sensitive response
def process_external_payment(event: dict) -> dict:
    """Simulate an external payment call and record its outcome as metadata."""
    # External payment processing logic
    tracer.put_annotation("payment_processor", "stripe")

    # Simulate external API call
    result = {"status": "success", "transaction_id": "txn_123"}

    tracer.put_metadata(
        "payment_processor_response",
        {
            "status": result["status"],
            "transaction_id": result["transaction_id"],
        },
    )

    return result


from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize all observability tools
logger = Logger(service="order-service")
metrics = Metrics(service="order-service", namespace="ECommerce")
tracer = Tracer(service="order-service")


@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Process an order while emitting logs, trace annotations and metrics.

    Returns a 200 response on success, 400 when ValidationError is raised,
    and 500 for any other exception.
    """
    # Extract order details
    order_id = event.get("order_id")
    customer_id = event.get("customer_id")

    # Add structured logging context
    logger.append_keys(order_id=order_id, customer_id=customer_id)

    # Add tracing annotations for searchability
    tracer.put_annotation("order_id", order_id)
    tracer.put_annotation("customer_id", customer_id)

    # Add metric dimensions
    metrics.add_dimension("order_type", event.get("order_type", "standard"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    logger.info("Processing order")

    try:
        # Process order logic
        order = process_order(event)

        # Record success metrics
        metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)
        # `MetricUnit.None` cannot be written in Python: `None` is a reserved
        # word, so the attribute access is a SyntaxError. Count is used because
        # it is a unit this page already relies on.
        # NOTE(review): switch to the library's unit-less option if it exposes one.
        metrics.add_metric(name="OrderValue", unit=MetricUnit.Count, value=order["total"])

        logger.info("Order processed successfully", extra={"order_total": order["total"]})

        return {
            "statusCode": 200,
            "body": {"order_id": order_id, "status": "processed"},
        }
    # NOTE(review): ValidationError is not imported or defined on this page;
    # presumably it comes from the application's validation code.
    except ValidationError as e:
        logger.warning("Order validation failed", extra={"error": str(e)})
        metrics.add_metric(name="OrderValidationError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "validation")
        return {"statusCode": 400, "body": {"error": "Invalid order"}}
    except Exception:
        logger.exception("Order processing failed")
        metrics.add_metric(name="OrderProcessingError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "processing")
        return {"statusCode": 500, "body": {"error": "Processing failed"}}
@tracer.capture_method
def process_order(event: dict) -> dict:
    """Process order with detailed tracing.

    Validates the order, computes its total, saves it, and returns a dict
    with the ``order_id`` and ``total``. The validate/calculate/save helpers
    are supplied by the application.
    """
    tracer.put_metadata("order_details", event)

    # Validate order
    validate_order(event)

    # Calculate totals
    total = calculate_order_total(event)

    # Save to database
    save_order(event, total)

    return {"order_id": event["order_id"], "total": total}


from typing import Union, Dict, Any, List, Callable, TextIO, ContextManager
from logging import Handler
# Metric types
# `Literal` is used below but is missing from the typing imports on this page.
from typing import Literal

# Unit strings accepted for a metric.
MetricUnit = Literal[
    "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes",
    "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits",
    "Megabits", "Gigabits", "Terabits", "Percent", "Count",
    "Bytes/Second", "Kilobytes/Second", "Megabytes/Second",
    "Gigabytes/Second", "Terabytes/Second", "Bits/Second",
    "Kilobits/Second", "Megabits/Second", "Gigabits/Second",
    "Terabits/Second", "Count/Second", "None",
]

# Metric resolution in seconds: 1 or 60.
MetricResolution = Literal[1, 60]
# Exception types
class MetricUnitError(Exception):
    """Raised when an invalid metric unit is used"""
class MetricResolutionError(Exception):
    """Raised when an invalid metric resolution is used"""
class MetricValueError(Exception):
    """Raised when an invalid metric value is provided"""
class SchemaValidationError(Exception):
    """Raised when metric schema validation fails"""
# Formatter types
class PowertoolsFormatter:
    """Custom log formatter for Powertools"""

    def __init__(
        self,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ):
        # Signature-only stub: the options are accepted but no behavior is
        # defined here.
        ...
# Provider types for tracing
class BaseProvider:
"""Base tracing provider interface"""
passInstall with Tessl CLI
npx tessl i tessl/pypi-aws-lambda-powertools
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10