Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python
```bash
npx @tessl/cli install tessl/pypi-aws-lambda-powertools@3.19.0
```

AWS Lambda Powertools for Python is a comprehensive developer toolkit that implements serverless best practices for AWS Lambda functions. The package provides utilities for structured logging with Lambda context enrichment, distributed tracing, custom metrics, event handling, batch processing, parameter management, and many other serverless-specific features.
```bash
pip install aws-lambda-powertools
```

Core entry point:

```python
from aws_lambda_powertools import Logger, Metrics, Tracer
```

Specific utilities:

```python
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
from aws_lambda_powertools.utilities.parameters import get_parameter
from aws_lambda_powertools.utilities.batch import batch_processor
```

A basic handler wiring the three core utilities together:

```python
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Initialize core observability utilities.
# Service name and metrics namespace are typically supplied via the
# POWERTOOLS_SERVICE_NAME and POWERTOOLS_METRICS_NAMESPACE environment variables.
logger = Logger()
metrics = Metrics()
tracer = Tracer()

@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Add a custom metric
    metrics.add_metric(name="ProcessedEvents", unit="Count", value=1)

    # Structured logging with extra context
    logger.info("Processing event", extra={"event_type": "example"})

    # Add a tracing annotation
    tracer.put_annotation(key="operation", value="process_event")

    return {"statusCode": 200, "body": "Success"}
```

AWS Lambda Powertools follows a modular architecture with independent utilities:
Essential observability utilities including structured logging with Lambda context enrichment, CloudWatch embedded metric format (EMF) for custom metrics, and AWS X-Ray integration for distributed tracing.

```python
class Logger:
    def __init__(
        self,
        service: str = None,
        level: str = "INFO",
        child: bool = False,
        sampling_rate: float = 0.0,
        stream: TextIO = None,
        logger_formatter: PowertoolsFormatter = None,
        logger_handler: logging.Handler = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ): ...

class Metrics:
    def __init__(
        self,
        service: str = None,
        namespace: str = None,
        metadata: Dict[str, Any] = None,
        default_dimensions: Dict[str, str] = None,
    ): ...

class Tracer:
    def __init__(
        self,
        service: str = None,
        disabled: bool = False,
        auto_patch: bool = True,
        patch_modules: List[str] = None,
        provider: BaseProvider = None,
    ): ...
```
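Beyond the handler decorators shown above, the same objects expose imperative APIs. A minimal sketch; the service name, namespace, and metric here are illustrative, not part of the package:

```python
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger(service="payment")
metrics = Metrics(namespace="MyApp", service="payment")
tracer = Tracer(service="payment")

@tracer.capture_method  # creates an X-Ray subsegment per call
def charge(order_id: str) -> None:
    logger.append_keys(order_id=order_id)  # key is attached to all subsequent logs
    logger.info("charging order")
    metrics.add_metric(name="Charges", unit=MetricUnit.Count, value=1)
```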
Framework for building Lambda functions that handle HTTP events from API Gateway, Application Load Balancer, AppSync GraphQL, and other AWS services, with automatic request/response serialization.

```python
class APIGatewayRestResolver:
    def __init__(
        self,
        cors: CORSConfig = None,
        debug: bool = None,
        serializer: Callable[[Dict], str] = None,
        strip_prefixes: List[str] = None,
        enable_validation: bool = False,
    ): ...
    def get(self, rule: str, **kwargs): ...
    def post(self, rule: str, **kwargs): ...
    def put(self, rule: str, **kwargs): ...
    def delete(self, rule: str, **kwargs): ...

class ALBResolver:
    def __init__(
        self,
        cors: CORSConfig = None,
        debug: bool = None,
        serializer: Callable[[Dict], str] = None,
        strip_prefixes: List[str] = None,
        enable_validation: bool = False,
    ): ...

class AppSyncResolver:
    def __init__(self, debug: bool = None): ...
    def resolver(self, type_name: str = "*", field_name: str = None): ...
```
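A sketch of a REST handler; the route and payload are illustrative:

```python
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.typing import LambdaContext

app = APIGatewayRestResolver()

@app.get("/todos/<todo_id>")
def get_todo(todo_id: str) -> dict:
    # Return values are serialized to JSON with a 200 status by default
    return {"id": todo_id}

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Route the incoming API Gateway event to the matching handler
    return app.resolve(event, context)
```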
Utilities for processing AWS SQS, DynamoDB Streams, and Kinesis records with built-in error handling, partial failure support, and automatic retries.

```python
def batch_processor(
    record_handler: Callable[[Dict], Any],
    processor: BatchProcessor,
    context: LambdaContext = None,
): ...

def async_batch_processor(
    record_handler: Callable[[Dict], Awaitable[Any]],
    processor: AsyncBatchProcessor,
    context: LambdaContext = None,
): ...

class BatchProcessor:
    def __init__(
        self,
        event_type: EventType,
        model: BaseModel = None,
        batch_length_quota_mb: int = 6,
    ): ...
```
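A sketch of partial-failure handling for an SQS-triggered function, using the `batch_processor` decorator documented above; the record handling is illustrative:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record: SQSRecord) -> None:
    # Raise to mark this record as failed; successful records are not retried
    payload = record.body
    print(payload)

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Report only the failed message IDs back to SQS
    return processor.response()
```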
Type-safe data classes representing AWS event structures for Lambda triggers, including API Gateway, S3, DynamoDB, Kinesis, SQS, SNS, and CloudWatch events.

```python
class APIGatewayProxyEvent:
    @property
    def body(self) -> str | None: ...
    @property
    def json_body(self) -> Any: ...
    @property
    def headers(self) -> Dict[str, str]: ...
    @property
    def query_string_parameters(self) -> Dict[str, str] | None: ...

class SQSEvent:
    @property
    def records(self) -> List[SQSRecord]: ...

class DynamoDBStreamEvent:
    @property
    def records(self) -> List[DynamoDBRecord]: ...
```
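A sketch using the `event_source` decorator to wrap the raw event dict in a typed class; the header lookup is illustrative:

```python
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context: LambdaContext) -> dict:
    # Typed accessors replace manual dict digging
    user_agent = event.headers.get("user-agent")
    payload = event.json_body if event.body else None
    return {"statusCode": 200, "body": f"hello {user_agent}"}
```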
Retrieve and cache parameters from AWS Systems Manager Parameter Store, AWS Secrets Manager, and AWS AppConfig, with automatic caching and transformation support.

```python
def get_parameter(
    name: str,
    decrypt: bool = True,
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> str: ...

def get_secret(
    name: str,
    version_id: str = None,
    version_stage: str = None,
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> str: ...

def get_app_config(
    name: str,
    environment: str,
    application: str,
    max_age: int = 5,
    transform: str = None,
    force_fetch: bool = False,
    **sdk_options,
) -> bytes | str: ...
```
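A sketch of fetching configuration at runtime; the parameter and secret names are illustrative:

```python
from aws_lambda_powertools.utilities import parameters

def lambda_handler(event: dict, context) -> dict:
    # Cached for max_age seconds across warm invocations
    api_url = parameters.get_parameter("/my-app/api-url", max_age=300)

    # transform="json" deserializes the stored JSON string into a dict
    config = parameters.get_parameter("/my-app/config", transform="json")

    # Secrets Manager lookup with caching handled the same way
    db_password = parameters.get_secret("my-app/db-password")
    return {"statusCode": 200}
```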
Event parsing and validation using Pydantic models, with built-in envelopes for extracting the business payload from AWS event sources.

```python
def event_parser(
    model: BaseModel,
    envelope: BaseEnvelope = None,
) -> Callable: ...

def parse(
    event: Dict[str, Any],
    model: BaseModel,
    envelope: BaseEnvelope = None,
) -> Any: ...

class BaseEnvelope:
    def parse(self, data: Dict[str, Any], model: BaseModel) -> Any: ...
```
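A sketch of parsing SQS-wrapped payloads into a Pydantic model; the `Order` model is illustrative:

```python
from typing import List

from aws_lambda_powertools.utilities.parser import BaseModel, event_parser
from aws_lambda_powertools.utilities.parser.envelopes import SqsEnvelope
from aws_lambda_powertools.utilities.typing import LambdaContext

class Order(BaseModel):
    order_id: str
    amount: float

@event_parser(model=Order, envelope=SqsEnvelope)
def lambda_handler(event: List[Order], context: LambdaContext) -> None:
    # The envelope strips SQS metadata and validates each record body
    for order in event:
        print(order.order_id, order.amount)
```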
Feature flags with rule-engine support backed by AWS AppConfig, and idempotency patterns to prevent duplicate processing of events.

```python
class FeatureFlags:
    def __init__(
        self,
        store: StoreProvider,
        logger: Logger = None,
    ): ...
    def evaluate(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> bool | Any: ...

def idempotent(
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable: ...

def idempotent_function(
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable: ...
```
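A sketch combining both utilities; the AppConfig identifiers, DynamoDB table name, and flag name are illustrative:

```python
from aws_lambda_powertools.utilities.feature_flags import AppConfigStore, FeatureFlags
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent

app_config = AppConfigStore(environment="prod", application="my-app", name="feature-flags")
feature_flags = FeatureFlags(store=app_config)

persistence = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

@idempotent(persistence_store=persistence)  # duplicate events return the cached result
def lambda_handler(event: dict, context) -> dict:
    # Rules defined in AppConfig can flip this flag per tenant, tier, etc.
    premium = feature_flags.evaluate(
        name="premium_features",
        context={"tier": event.get("tier")},
        default=False,
    )
    return {"statusCode": 200, "premium": premium}
```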
Additional utilities including data masking, streaming, serialization, validation, JMESPath operations, and Kafka consumer helpers.

```python
class DataMasking:
    def __init__(self, provider: BaseProvider): ...
    def erase(self, data: Any, fields: List[str] = None) -> Any: ...

class S3Object:
    def __init__(self, bucket: str, key: str, **kwargs): ...
    def transform(self, transform: BaseTransform) -> "S3Object": ...

def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: str = None,
) -> Dict[str, Any]: ...
```
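A sketch of JSON Schema validation of the incoming event; the schema itself is illustrative:

```python
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate

INBOUND_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {"user_id": {"type": "string"}},
    "required": ["user_id"],
}

def lambda_handler(event: dict, context) -> dict:
    try:
        # Raises SchemaValidationError when the event does not match
        validate(event=event, schema=INBOUND_SCHEMA)
    except SchemaValidationError:
        return {"statusCode": 400, "body": "invalid request"}
    return {"statusCode": 200, "body": "ok"}
```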
Shared types referenced by the signatures above:

```python
from typing import Dict, Any, List, Optional, Union, Callable, Awaitable, Literal

from aws_lambda_powertools.utilities.typing import LambdaContext
class LambdaContext:
    """AWS Lambda context object"""
    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    aws_request_id: str
    log_group_name: str
    log_stream_name: str
    identity: Any
    client_context: Any

    def get_remaining_time_in_millis(self) -> int: ...
# Metric and logging types
MetricUnit = Literal[
    "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes",
    "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits",
    "Megabits", "Gigabits", "Terabits", "Percent", "Count",
    "Bytes/Second", "Kilobytes/Second", "Megabytes/Second",
    "Gigabytes/Second", "Terabytes/Second", "Bits/Second",
    "Kilobits/Second", "Megabits/Second", "Gigabits/Second",
    "Terabits/Second", "Count/Second", "None"
]
MetricResolution = Literal[1, 60]
# Event handler types
class CORSConfig:
    def __init__(
        self,
        allow_origin: str = "*",
        allow_headers: List[str] = None,
        allow_methods: List[str] = None,
        expose_headers: List[str] = None,
        max_age: int = None,
        allow_credentials: bool = False,
    ): ...

class Response:
    def __init__(
        self,
        status_code: int,
        content_type: str = None,
        body: str = None,
        headers: Dict[str, str] = None,
        cookies: List[str] = None,
    ): ...
# Batch processing types
EventType = Literal["SQS", "KinesisDataStreams", "DynamoDBStreams"]
class SuccessResponse:
    def __init__(self, **kwargs): ...

class FailureResponse:
    def __init__(self, **kwargs): ...

class ExceptionInfo:
    def __init__(self, exception: Exception, record: Dict[str, Any]): ...
# Parameter and configuration types
class GetParameterError(Exception): ...
class TransformParameterError(Exception): ...
class ConfigurationStoreError(Exception): ...
# Validation types
class ValidationError(Exception): ...
class SchemaValidationError(Exception): ...
class InvalidSchemaFormatError(Exception): ...
class InvalidEnvelopeExpressionError(Exception): ...
# Parser types
class BaseModel:
    """Pydantic BaseModel re-export for parser functionality"""

def Field(**kwargs) -> Any:
    """Pydantic Field function re-export"""
# Idempotency types
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = None,
        payload_validation_jmespath: str = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 3600,
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: LambdaContext = None,
    ): ...
# Feature flags types
RuleAction = Literal["ALLOW", "DENY"]
class SchemaValidator:
    def __init__(self, schema: Dict[str, Any]): ...
    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]: ...
```
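A sketch of customizing responses and CORS with these types; the origin and route are illustrative:

```python
from aws_lambda_powertools.event_handler import APIGatewayRestResolver, CORSConfig, Response

cors_config = CORSConfig(allow_origin="https://www.example.com", max_age=300)
app = APIGatewayRestResolver(cors=cors_config)

@app.get("/health")
def health() -> Response:
    # Full control over status code, content type, and headers
    return Response(
        status_code=200,
        content_type="application/json",
        body='{"status": "ok"}',
    )
```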