Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python
89
Feature flags with rule engine support from AWS AppConfig and idempotency patterns to prevent duplicate processing of events. Enables dynamic feature control and reliable event processing in serverless applications.
Dynamic feature flag management with rule-based evaluation from AWS AppConfig.
class FeatureFlags:
    def __init__(
        self,
        store: StoreProvider,
        logger: Logger = None,
    ):
        """
        Create a feature flag evaluator backed by a store provider.

        Parameters:
        - store: Store provider (usually AppConfigStore)
        - logger: Optional logger for feature flag events
        """

    def evaluate(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> bool | Any:
        """
        Resolve a single feature flag, applying any rules against the context.

        Parameters:
        - name: Feature flag name
        - context: Evaluation context (user ID, region, etc.)
        - default: Value returned when the flag is not found
        Returns:
        Feature flag value (boolean or complex value)
        """

    def get_enabled_features(self, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Collect every feature that evaluates as enabled for the given context.

        Parameters:
        - context: Evaluation context
        Returns:
        Dictionary of enabled feature names to values
        """

    def batch_evaluate(
        self,
        flags: List[str],
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> Dict[str, Any]:
        """
        Resolve several feature flags in a single pass.

        Parameters:
        - flags: List of feature flag names
        - context: Evaluation context
        - default: Fallback value for flags that are missing
        Returns:
        Dictionary mapping flag names to values
        """
class AppConfigStore(StoreProvider):
    def __init__(
        self,
        environment: str,
        application: str,
        name: str,
        max_age: int = 5,
        sdk_config: Dict[str, Any] = None,
        envelope: str = None,
        jmespath_options: Dict[str, Any] = None,
        logger: Logger = None,
    ):
        """
        AWS AppConfig-backed store provider for feature flags.

        Parameters:
        - environment: AppConfig environment name
        - application: AppConfig application name
        - name: AppConfig configuration profile name
        - max_age: Cache TTL in seconds before the configuration is re-fetched
        - sdk_config: Boto3 client configuration
        - envelope: JMESPath envelope for extracting flag data from the document
        - jmespath_options: JMESPath evaluation options
        - logger: Optional logger instance
        """

    def get_configuration(self) -> Dict[str, Any]:
        """
        Fetch the complete configuration document from AppConfig.

        Returns:
        Complete configuration dictionary
        """

    def _get_flag_value(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = None,
    ) -> Any:
        """Resolve a single flag's value, applying rule evaluation."""
class StoreProvider:
"""Base store provider interface for feature flags"""
def get_configuration(self) -> Dict[str, Any]:
"""Get complete configuration"""
raise NotImplementedError
def get(
self,
name: str,
context: Dict[str, Any] = None,
default: Any = None,
) -> Any:
"""Get configuration value by name"""
raise NotImplementedErrorRule-based feature flag evaluation with advanced targeting capabilities.
class RuleAction:
    """String constants naming the outcome of a feature flag rule match."""

    ALLOW = "ALLOW"
    DENY = "DENY"
class SchemaValidator:
def __init__(self, schema: Dict[str, Any]):
"""
Initialize schema validator for feature flag configuration.
Parameters:
- schema: JSON schema for configuration validation
"""
def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate configuration data against schema.
Parameters:
- data: Configuration data to validate
Returns:
Validated configuration data
Raises:
SchemaValidationError: If validation fails
"""Idempotency patterns to ensure Lambda functions process events exactly once.
def idempotent(
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable:
    """
    Decorator that makes a Lambda handler idempotent.

    Parameters:
    - persistence_store: Storage layer for idempotency keys
    - config: Idempotency configuration options
    Returns:
    Decorated handler that returns the stored result for duplicate events
    """
def idempotent_function(
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable:
    """
    Decorator that makes an arbitrary function call idempotent.

    Parameters:
    - data_keyword_argument: Keyword argument whose value seeds the idempotency key
    - persistence_store: Storage layer for idempotency keys
    - config: Idempotency configuration options
    Returns:
    Decorated function with idempotency guarantees
    """
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = None,
        payload_validation_jmespath: str = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 3600,
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: LambdaContext = None,
    ):
        """
        Tuning options controlling idempotency behavior.

        Parameters:
        - event_key_jmespath: JMESPath used to extract the idempotency key from the event
        - payload_validation_jmespath: JMESPath used to extract the payload for validation
        - raise_on_no_idempotency_key: Raise an error when no key can be derived
        - expires_after_seconds: TTL for idempotency records
        - use_local_cache: Enable the in-memory record cache
        - local_cache_max_items: Maximum entries kept in the local cache
        - hash_function: Hash used for key generation (md5, sha1, sha256)
        - lambda_context: Lambda context used for additional key generation
        """
class BasePersistenceLayer:
"""Base persistence layer for idempotency records"""
def __init__(
self,
table_name: str,
key_attr: str = "id",
expiry_attr: str = "expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
):
"""
Initialize persistence layer.
Parameters:
- table_name: Storage table/collection name
- key_attr: Attribute name for idempotency key
- expiry_attr: Attribute name for expiration timestamp
- status_attr: Attribute name for processing status
- data_attr: Attribute name for stored result
- validation_key_attr: Attribute name for payload validation
"""
def save_inprogress(
self,
idempotency_key: str,
remaining_time_in_millis: int = None,
) -> None:
"""
Save in-progress idempotency record.
Parameters:
- idempotency_key: Unique idempotency key
- remaining_time_in_millis: Remaining Lambda execution time
"""
def save_success(
self,
idempotency_key: str,
result: Any,
remaining_time_in_millis: int = None,
) -> None:
"""
Save successful processing result.
Parameters:
- idempotency_key: Unique idempotency key
- result: Function result to store
- remaining_time_in_millis: Remaining Lambda execution time
"""
def get_record(self, idempotency_key: str) -> Dict[str, Any] | None:
"""
Retrieve idempotency record by key.
Parameters:
- idempotency_key: Idempotency key to lookup
Returns:
Stored record or None if not found
"""
def delete_record(self, idempotency_key: str) -> None:
"""
Delete idempotency record.
Parameters:
- idempotency_key: Key of record to delete
"""
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Dict[str, Any] = None,
        boto3_session: boto3.Session = None,
    ):
        """
        DynamoDB-backed persistence layer for idempotency records.

        Parameters:
        - table_name: DynamoDB table name
        - key_attr: Primary key attribute name
        - expiry_attr: TTL attribute name
        - status_attr: Status attribute name
        - data_attr: Data attribute name
        - validation_key_attr: Validation key attribute name
        - boto_config: Boto3 client configuration
        - boto3_session: Boto3 session for authentication
        """
class IdempotentHookFunction:
    """Hook function interface for idempotency events."""

    def __call__(
        self,
        idempotency_key: str,
        result: Any = None,
        exception: Exception = None,
    ) -> None:
        """
        Hook invoked during idempotency processing.

        Parameters:
        - idempotency_key: The idempotency key
        - result: Function result (if successful)
        - exception: Exception (if failed)
        """

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
from aws_lambda_powertools.utilities.typing import LambdaContext
import os
# Initialize feature flags with AppConfig.
# NOTE(review): os.environ[...] raises KeyError at module import time when
# ENVIRONMENT or APP_NAME is unset -- presumably intentional fail-fast; confirm.
store = AppConfigStore(
environment=os.environ["ENVIRONMENT"],
application=os.environ["APP_NAME"],
name="feature-flags"
)
# Built once at module scope so the same evaluator (and the store's max_age
# cache) is reused by every handler call in this execution environment.
feature_flags = FeatureFlags(store=store)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate simple, contextual, and config-valued flag evaluation."""
    # Simple boolean flag selects which algorithm variant runs
    use_new = feature_flags.evaluate(name="enable_new_algorithm", default=False)
    result = new_algorithm_processing(event) if use_new else legacy_algorithm_processing(event)

    # Contextual flag: premium features are targeted per user/region
    user_id = event.get("user_id")
    user_context = {"userId": user_id, "region": event.get("region", "us-east-1")}
    if feature_flags.evaluate(name="enable_premium_features", context=user_context, default=False):
        result["premium_data"] = get_premium_data(user_id)

    # Config-valued flag: the flag payload is a settings dictionary
    recommendation_config = feature_flags.evaluate(
        name="recommendation_settings",
        context=user_context,
        default={"enabled": False, "algorithm": "basic", "limit": 5},
    )
    if recommendation_config.get("enabled", False):
        result["recommendations"] = get_recommendations(
            user_id=user_id,
            algorithm=recommendation_config.get("algorithm", "basic"),
            limit=recommendation_config.get("limit", 5),
        )

    return {
        "statusCode": 200,
        "body": result,
    }
def new_algorithm_processing(event: dict) -> dict:
    """Process the event with the v2 algorithm (demo stub)."""
    return {"algorithm": "v2", "result": "processed_with_new_logic"}
def legacy_algorithm_processing(event: dict) -> dict:
    """Process the event with the legacy v1 algorithm (demo stub)."""
    return {"algorithm": "v1", "result": "processed_with_legacy_logic"}

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import os
logger = Logger()
# AppConfig feature flag configuration example:
# {
# "feature_flags": {
# "enable_beta_features": {
# "enabled": true,
# "rules": {
# "allow_beta_users": {
# "when_match": [
# {
# "action": "ALLOW",
# "conditions": [
# {
# "key": "userType",
# "value": "beta",
# "condition": "EQUALS"
# }
# ]
# }
# ],
# "when_no_match": {
# "action": "DENY"
# }
# }
# },
# "default": false
# },
# "api_rate_limit": {
# "enabled": true,
# "rules": {
# "premium_rate_limit": {
# "when_match": [
# {
# "action": "ALLOW",
# "conditions": [
# {
# "key": "userTier",
# "value": "premium",
# "condition": "EQUALS"
# }
# ]
# }
# ],
# "when_no_match": {
# "action": "DENY"
# }
# }
# },
# "default": {"requests_per_minute": 100}
# }
# }
# }
# AppConfig store holding the rule-based flag configuration sketched above
store = AppConfigStore(
environment=os.environ["ENVIRONMENT"],
application="my-app",
name="feature-config"
)
# The logger is passed through so flag evaluation events are logged
feature_flags = FeatureFlags(store=store, logger=logger)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Evaluate several rule-based flags for the calling user and act on them."""
    # Build the evaluation context that AppConfig rules match against
    user_context = {
        "userId": event.get("user_id"),
        "userType": event.get("user_type", "standard"),
        "userTier": event.get("user_tier", "basic"),
        "region": event.get("region", "us-east-1"),
        "deviceType": event.get("device_type", "web"),
    }
    logger.append_keys(user_context=user_context)

    results = {}

    # Beta features access (rule-targeted boolean flag)
    beta_enabled = feature_flags.evaluate(
        name="enable_beta_features",
        context=user_context,
        default=False,
    )
    if beta_enabled:
        logger.info("Beta features enabled for user")
        results["beta_features"] = get_beta_features(user_context)
    else:
        logger.info("Beta features not available for user")

    # Dynamic rate limiting (config-valued flag)
    rate_limit_config = feature_flags.evaluate(
        name="api_rate_limit",
        context=user_context,
        default={"requests_per_minute": 60},
    )
    rate_limit = rate_limit_config.get("requests_per_minute", 60)
    if not check_rate_limit(user_context["userId"], rate_limit):
        return {
            "statusCode": 429,
            "body": {"error": "Rate limit exceeded"},
        }

    # Resolve several boolean flags in one call
    flag_results = feature_flags.batch_evaluate(
        flags=["enable_analytics", "enable_notifications", "enable_cache"],
        context=user_context,
        default=False,
    )
    if flag_results.get("enable_analytics", False):
        track_user_event(event, user_context)
    if flag_results.get("enable_notifications", False):
        queue_notification(user_context["userId"], "feature_accessed")

    # Serve cached or fresh data depending on the cache flag
    if flag_results.get("enable_cache", False):
        results["cached_data"] = get_cached_data(user_context["userId"])
    else:
        results["fresh_data"] = get_fresh_data(user_context["userId"])

    return {
        "statusCode": 200,
        "body": results,
        "rate_limit": rate_limit,
    }
def get_beta_features(user_context: dict) -> dict:
    """Return the set of beta features available to the user (demo stub)."""
    return {
        "new_dashboard": True,
        "advanced_analytics": True,
        "ai_recommendations": True,
    }
def check_rate_limit(user_id: str, limit: int) -> bool:
    """Report whether the user is within the rate limit."""
    # A real implementation would consult a cache or database counter here;
    # this demo always allows the request.
    return True  # Simplified for example
def track_user_event(event: dict, user_context: dict):
    """Emit an analytics tracking log entry for this access."""
    logger.info("Tracking user event", extra={"event_type": "api_access"})
def queue_notification(user_id: str, notification_type: str):
"""Queue notification for user"""
logger.info("Queuing notification", extra={
"user_id": user_id,
"type": notification_type
})from aws_lambda_powertools.utilities.idempotency import (
idempotent,
DynamoDBPersistenceLayer,
IdempotencyConfig
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import json
logger = Logger()
# Configure DynamoDB persistence layer (module scope, shared across invocations)
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
# Configure idempotency behavior
config = IdempotencyConfig(
event_key_jmespath="requestId", # Extract requestId from event
expires_after_seconds=3600, # 1 hour TTL
use_local_cache=True, # Enable local caching
)
@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Idempotent handler: repeated events with the same requestId return the stored result."""
    request_id = event.get("requestId")
    logger.info(f"Processing request {request_id}")

    # Everything below runs at most once per unique requestId
    user_id = event.get("userId")
    order_data = event.get("orderData", {})

    # Expensive operation, followed by a one-time side effect
    order_id = process_order(user_id, order_data)
    send_order_confirmation(user_id, order_id)

    result = {
        "orderId": order_id,
        "status": "processed",
        "timestamp": context.aws_request_id,
    }
    logger.info(f"Order processed: {order_id}")

    return {
        "statusCode": 200,
        "body": json.dumps(result),
    }
def process_order(user_id: str, order_data: dict) -> str:
    """Create an order record; stands in for an expensive operation."""
    import time
    import uuid

    # Simulate processing time
    time.sleep(1)
    new_order_id = str(uuid.uuid4())
    logger.info(f"Created order {new_order_id} for user {user_id}")
    return new_order_id
def send_order_confirmation(user_id: str, order_id: str):
"""Send order confirmation (side effect)"""
logger.info(f"Sending confirmation for order {order_id} to user {user_id}")
# Email/SMS sending logic herefrom aws_lambda_powertools.utilities.idempotency import (
idempotent,
idempotent_function,
DynamoDBPersistenceLayer,
IdempotencyConfig
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import hashlib
import json
logger = Logger()
# Persistence layer with custom attribute names instead of the defaults
persistence_layer = DynamoDBPersistenceLayer(
table_name="IdempotencyTable",
key_attr="idempotency_key",
expiry_attr="expires_at",
status_attr="status",
data_attr="result_data"
)
# Complex idempotency configuration.
# The JMESPath `||` expression prefers the Idempotency-Key header and falls
# back to body.transactionId when the header is absent.
config = IdempotencyConfig(
event_key_jmespath="headers.`Idempotency-Key` || body.transactionId",
payload_validation_jmespath="body", # Validate entire body for changes
expires_after_seconds=86400, # 24 hours
use_local_cache=True,
hash_function="sha256",
raise_on_no_idempotency_key=True
)
@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Payment handler keyed by the Idempotency-Key header (or transactionId)."""
    payment_data = event.get("body", {})
    headers = event.get("headers", {})

    # Mirror the key the decorator derives, for log correlation
    idempotency_key = headers.get("Idempotency-Key") or payment_data.get("transactionId")
    logger.append_keys(idempotency_key=idempotency_key)
    logger.info("Processing payment request")

    # Executes once per unique key
    result = process_payment_request(payment_data)

    return {
        "statusCode": 200,
        "body": json.dumps(result),
    }
# Separate persistence layer for user operations
user_persistence = DynamoDBPersistenceLayer(table_name="UserIdempotencyTable")
# No event_key_jmespath here -- presumably the key is derived from the
# decorated function's data keyword argument instead; confirm against docs.
user_config = IdempotencyConfig(
expires_after_seconds=3600,
use_local_cache=True
)
@idempotent_function(
    data_keyword_argument="user_data",
    persistence_store=user_persistence,
    config=user_config,
)
def create_user(user_data: dict) -> dict:
    """Create a user at most once per unique user_data payload."""
    logger.info("Creating new user", extra={"email": user_data.get("email")})

    if user_exists(user_data["email"]):
        raise ValueError("User already exists")

    # Expensive creation path: persist the user, then send the welcome email
    user_id = generate_user_id()
    save_user_to_database(user_id, user_data)
    send_welcome_email(user_data["email"])

    return {
        "user_id": user_id,
        "email": user_data["email"],
        "created_at": "2024-01-01T00:00:00Z",
    }
def process_payment_request(payment_data: dict) -> dict:
    """Charge via the external processor and record the transaction."""
    amount = payment_data["amount"]
    currency = payment_data["currency"]
    payment_method = payment_data["payment_method"]

    # Call external payment processor, then update internal records
    transaction_id = call_payment_processor(amount, currency, payment_method)
    update_payment_records(transaction_id, payment_data)

    return {
        "transaction_id": transaction_id,
        "amount": amount,
        "currency": currency,
        "status": "completed",
    }
# Bulk operations with per-item idempotency
@idempotent_function(
    data_keyword_argument="item_data",
    persistence_store=persistence_layer,
    config=IdempotencyConfig(expires_after_seconds=1800),
)
def process_bulk_item(item_data: dict) -> dict:
    """Process one item of a bulk operation; re-deliveries of the same item are deduplicated."""
    item_id = item_data["item_id"]
    logger.info(f"Processing item {item_id}")

    # Expensive per-item processing
    outcome = expensive_item_processing(item_data)

    return {
        "item_id": item_id,
        "processed": True,
        "result": outcome,
    }
def bulk_handler(event: dict, context: LambdaContext) -> dict:
"""Handle bulk operations with per-item idempotency"""
items = event.get("items", [])
results = []
for item in items:
try:
# Each item is processed idempotently
result = process_bulk_item(item_data=item)
results.append(result)
except Exception as e:
logger.exception(f"Failed to process item {item.get('item_id')}")
results.append({
"item_id": item.get("item_id"),
"error": str(e)
})
return {
"statusCode": 200,
"body": json.dumps({
"processed_count": len([r for r in results if not r.get("error")]),
"failed_count": len([r for r in results if r.get("error")]),
"results": results
})
}from aws_lambda_powertools.utilities.feature_flags import (
FeatureFlags,
AppConfigStore,
ConfigurationStoreError
)
from aws_lambda_powertools.utilities.idempotency import (
idempotent,
DynamoDBPersistenceLayer,
IdempotencyConfig,
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
IdempotencyKeyError
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger, Metrics
from aws_lambda_powertools.metrics import MetricUnit
logger = Logger()
metrics = Metrics()
# Feature flags with error handling: if the store cannot be initialized,
# feature_flags is left as None and callers fall back to hard-coded defaults.
try:
store = AppConfigStore(
environment="production",
application="my-app",
name="feature-flags",
max_age=30 # Refresh every 30 seconds
)
feature_flags = FeatureFlags(store=store, logger=logger)
except ConfigurationStoreError as e:
logger.error("Failed to initialize feature flags", extra={"error": str(e)})
# Fallback to default configuration
feature_flags = None
# Idempotency with error handling
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
event_key_jmespath="requestId",
expires_after_seconds=3600,
use_local_cache=True
)
@metrics.log_metrics(capture_cold_start_metric=True)
@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Handler combining feature flags, idempotency, and metrics with layered error handling."""
    request_id = event.get("requestId")
    logger.append_keys(request_id=request_id)
    try:
        # Feature flag evaluation with fallbacks
        features = get_enabled_features(event)

        # Emit one metric per flag so feature usage is observable
        for feature_name, enabled in features.items():
            metrics.add_metric(
                name=f"FeatureFlag_{feature_name}",
                unit=MetricUnit.Count,
                value=1 if enabled else 0,
            )

        # Process request based on enabled features
        result = process_request(event, features)
        metrics.add_metric(name="RequestProcessed", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 200,
            "body": result,
        }
    except IdempotencyAlreadyInProgressError:
        # Another invocation currently holds this idempotency key
        logger.warning("Request already in progress")
        metrics.add_metric(name="IdempotencyInProgress", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 409,
            "body": {"error": "Request already in progress"},
        }
    except IdempotencyKeyError as e:
        # No usable idempotency key could be derived from the event
        logger.error("Invalid idempotency key", extra={"error": str(e)})
        metrics.add_metric(name="IdempotencyKeyError", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 400,
            "body": {"error": "Invalid or missing idempotency key"},
        }
    except Exception as e:
        # Top-level boundary: log with traceback, return a generic 500
        logger.exception("Request processing failed")
        metrics.add_metric(name="RequestFailed", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 500,
            "body": {"error": "Internal server error"},
        }
def get_enabled_features(event: dict) -> dict:
    """Evaluate the known feature flags for this event, falling back to all-off defaults."""
    # Conservative defaults used whenever evaluation is impossible
    features = {
        "enhanced_processing": False,
        "premium_features": False,
        "beta_algorithms": False,
    }
    if feature_flags is None:
        logger.warning("Feature flags unavailable, using defaults")
        return features
    try:
        user_context = {
            "userId": event.get("userId"),
            "userTier": event.get("userTier", "basic"),
            "region": event.get("region", "us-east-1"),
        }
        # Evaluate flags independently so one failure cannot poison the rest
        for feature_name in features:
            try:
                features[feature_name] = feature_flags.evaluate(
                    name=feature_name,
                    context=user_context,
                    default=features[feature_name],
                )
            except Exception as e:
                logger.warning(
                    f"Failed to evaluate feature flag {feature_name}",
                    extra={"error": str(e)},
                )
                # Keep the default value for this flag
        return features
    except ConfigurationStoreError as e:
        logger.warning("Feature flag evaluation failed, using defaults", extra={"error": str(e)})
        metrics.add_metric(name="FeatureFlagError", unit=MetricUnit.Count, value=1)
        return features
def process_request(event: dict, features: dict) -> dict:
    """Assemble the response payload, adding data for each enabled feature."""
    response = {"processed_features": []}

    if features.get("enhanced_processing", False):
        logger.info("Using enhanced processing")
        response["enhanced_data"] = enhanced_processing(event)
        response["processed_features"].append("enhanced_processing")

    if features.get("premium_features", False):
        logger.info("Including premium features")
        response["premium_data"] = get_premium_data(event)
        response["processed_features"].append("premium_features")

    if features.get("beta_algorithms", False):
        logger.info("Using beta algorithms")
        response["beta_results"] = beta_algorithm_processing(event)
        response["processed_features"].append("beta_algorithms")

    # Base processing always runs, regardless of flags
    response["base_data"] = base_processing(event)
    return response
def enhanced_processing(event: dict) -> dict:
    """Enhanced processing path (demo stub)."""
    return {"type": "enhanced", "quality": "high"}
def get_premium_data(event: dict) -> dict:
    """Premium feature payload (demo stub)."""
    return {"premium_insights": True, "advanced_analytics": True}
def beta_algorithm_processing(event: dict) -> dict:
    """Beta algorithm path (demo stub)."""
    return {"algorithm": "beta_v2", "accuracy": "experimental"}
def base_processing(event: dict) -> dict:
    """Base processing applied to every request (demo stub)."""
    return {"type": "standard", "quality": "normal"}

from typing import Dict, Any, List, Union, Optional, Callable
from aws_lambda_powertools.utilities.typing import LambdaContext
# Feature flag types
# A flag's resolved value may be a plain boolean or any JSON-like structure.
FeatureFlagValue = Union[bool, str, int, float, Dict[str, Any], List[Any]]
# Context passed to rule evaluation (user ID, tier, region, ...)
FeatureFlagContext = Dict[str, Any]
# Mapping of flag name to its resolved value
FeatureFlagEvaluation = Dict[str, FeatureFlagValue]
# Rule action constants.
# Fix: Literal is not included in the `from typing import ...` line above,
# so import it here to avoid a NameError when this alias is evaluated.
from typing import Literal

RuleAction = Literal["ALLOW", "DENY"]
# Store provider types
class StoreProvider:
    """Base interface for feature flag store providers."""
# Configuration store error
class ConfigurationStoreError(Exception):
    """Raised when configuration store operations fail."""
# Schema validation error
class SchemaValidationError(Exception):
    """Raised when feature flag schema validation fails."""
# Idempotency types
# Idempotency keys are opaque strings derived from the event payload
IdempotencyKey = str
# Raw persisted record as returned by the persistence layer
IdempotencyRecord = Dict[str, Any]
# Idempotency exceptions
class IdempotencyError(Exception):
    """Base class for all idempotency errors."""


class IdempotencyKeyError(IdempotencyError):
    """The idempotency key is missing or invalid."""


class IdempotencyAlreadyInProgressError(IdempotencyError):
    """The same request is already being processed elsewhere."""


class IdempotencyInconsistentStateError(IdempotencyError):
    """The stored idempotency state is inconsistent."""


class IdempotencyValidationError(IdempotencyError):
    """The payload failed validation against the stored record."""
# Persistence layer types
PersistenceRecord = Dict[str, Any]

# Hook function type: (idempotency_key, result, exception) -> None
IdempotencyHook = Callable[[str, Any, Optional[Exception]], None]

# Hash function type.
# Fix: Literal is not included in the `from typing import ...` line above,
# so import it here to avoid a NameError when this alias is evaluated.
from typing import Literal

HashFunction = Literal["md5", "sha1", "sha256", "sha512"]

# JMESPath expression type
JMESPathExpression = str

# Boto3 configuration types
Boto3Config = Dict[str, Any]
Boto3Session = Any  # boto3.Session

Install with Tessl CLI
npx tessl i tessl/pypi-aws-lambda-powertools

docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10