CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aws-lambda-powertools

Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python

89

1.21x
Overview
Eval results
Files

feature-flags.mddocs/

Feature Flags & Idempotency

Feature flags with rule engine support from AWS AppConfig and idempotency patterns to prevent duplicate processing of events. Enables dynamic feature control and reliable event processing in serverless applications.

Capabilities

Feature Flags

Dynamic feature flag management with rule-based evaluation from AWS AppConfig.

class FeatureFlags:
    def __init__(
        self,
        store: StoreProvider,
        logger: Logger = None,
    ):
        """
        Initialize feature flags with store provider.
        
        Parameters:
        - store: Store provider (usually AppConfigStore)
        - logger: Optional logger for feature flag events
        """

    def evaluate(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> bool | Any:
        """
        Evaluate feature flag with optional context.
        
        Parameters:
        - name: Feature flag name
        - context: Evaluation context (user ID, region, etc.)
        - default: Default value if flag not found
        
        Returns:
        Feature flag value (boolean or complex value)
        """

    def get_enabled_features(self, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Get all enabled features for given context.
        
        Parameters:
        - context: Evaluation context
        
        Returns:
        Dictionary of enabled feature names to values
        """

    def batch_evaluate(
        self,
        flags: List[str],
        context: Dict[str, Any] = None,
        default: Any = False,
    ) -> Dict[str, Any]:
        """
        Evaluate multiple feature flags in batch.
        
        Parameters:
        - flags: List of feature flag names
        - context: Evaluation context
        - default: Default value for missing flags
        
        Returns:
        Dictionary mapping flag names to values
        """

class AppConfigStore(StoreProvider):
    def __init__(
        self,
        environment: str,
        application: str,
        name: str,
        max_age: int = 5,
        sdk_config: Dict[str, Any] = None,
        envelope: str = None,
        jmespath_options: Dict[str, Any] = None,
        logger: Logger = None,
    ):
        """
        AWS AppConfig store provider for feature flags.
        
        Parameters:
        - environment: AppConfig environment name
        - application: AppConfig application name
        - name: AppConfig configuration profile name
        - max_age: Cache TTL in seconds
        - sdk_config: Boto3 client configuration
        - envelope: JMESPath envelope for data extraction
        - jmespath_options: JMESPath evaluation options
        - logger: Optional logger instance
        """

    def get_configuration(self) -> Dict[str, Any]:
        """
        Get configuration from AppConfig.
        
        Returns:
        Complete configuration dictionary
        """

    def _get_flag_value(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = None,
    ) -> Any:
        """Get individual flag value with rule evaluation"""

class StoreProvider:
    """Base store provider interface for feature flags"""
    
    def get_configuration(self) -> Dict[str, Any]:
        """Get complete configuration"""
        raise NotImplementedError
    
    def get(
        self,
        name: str,
        context: Dict[str, Any] = None,
        default: Any = None,
    ) -> Any:
        """Get configuration value by name"""
        raise NotImplementedError

Rule Engine

Rule-based feature flag evaluation with advanced targeting capabilities.

class RuleAction:
    ALLOW = "ALLOW"
    DENY = "DENY"

class SchemaValidator:
    def __init__(self, schema: Dict[str, Any]):
        """
        Initialize schema validator for feature flag configuration.
        
        Parameters:
        - schema: JSON schema for configuration validation
        """

    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate configuration data against schema.
        
        Parameters:
        - data: Configuration data to validate
        
        Returns:
        Validated configuration data
        
        Raises:
        SchemaValidationError: If validation fails
        """

Idempotency

Idempotency patterns to ensure Lambda functions process events exactly once.

def idempotent(
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable:
    """
    Decorator for making Lambda handler idempotent.
    
    Parameters:
    - persistence_store: Storage layer for idempotency keys
    - config: Idempotency configuration options
    
    Returns:
    Decorated function that prevents duplicate processing
    """

def idempotent_function(
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Callable:
    """
    Decorator for making specific function calls idempotent.
    
    Parameters:
    - data_keyword_argument: Keyword argument name containing data for key generation
    - persistence_store: Storage layer for idempotency keys
    - config: Idempotency configuration options
    
    Returns:
    Decorated function with idempotency guarantees
    """

class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = None,
        payload_validation_jmespath: str = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 3600,
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: LambdaContext = None,
    ):
        """
        Configuration for idempotency behavior.
        
        Parameters:
        - event_key_jmespath: JMESPath to extract idempotency key from event
        - payload_validation_jmespath: JMESPath to extract payload for validation
        - raise_on_no_idempotency_key: Whether to raise error if no key found
        - expires_after_seconds: TTL for idempotency records
        - use_local_cache: Whether to use in-memory cache
        - local_cache_max_items: Maximum items in local cache
        - hash_function: Hash function for key generation (md5, sha1, sha256)
        - lambda_context: Lambda context for additional key generation
        """

class BasePersistenceLayer:
    """Base persistence layer for idempotency records"""
    
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
    ):
        """
        Initialize persistence layer.
        
        Parameters:
        - table_name: Storage table/collection name
        - key_attr: Attribute name for idempotency key
        - expiry_attr: Attribute name for expiration timestamp
        - status_attr: Attribute name for processing status
        - data_attr: Attribute name for stored result
        - validation_key_attr: Attribute name for payload validation
        """

    def save_inprogress(
        self,
        idempotency_key: str,
        remaining_time_in_millis: int = None,
    ) -> None:
        """
        Save in-progress idempotency record.
        
        Parameters:
        - idempotency_key: Unique idempotency key
        - remaining_time_in_millis: Remaining Lambda execution time
        """

    def save_success(
        self,
        idempotency_key: str,
        result: Any,
        remaining_time_in_millis: int = None,
    ) -> None:
        """
        Save successful processing result.
        
        Parameters:
        - idempotency_key: Unique idempotency key  
        - result: Function result to store
        - remaining_time_in_millis: Remaining Lambda execution time
        """

    def get_record(self, idempotency_key: str) -> Dict[str, Any] | None:
        """
        Retrieve idempotency record by key.
        
        Parameters:
        - idempotency_key: Idempotency key to lookup
        
        Returns:
        Stored record or None if not found
        """

    def delete_record(self, idempotency_key: str) -> None:
        """
        Delete idempotency record.
        
        Parameters:
        - idempotency_key: Key of record to delete
        """

class DynamoDBPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration", 
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Dict[str, Any] = None,
        boto3_session: boto3.Session = None,
    ):
        """
        DynamoDB persistence layer for idempotency.
        
        Parameters:
        - table_name: DynamoDB table name
        - key_attr: Primary key attribute name
        - expiry_attr: TTL attribute name
        - status_attr: Status attribute name
        - data_attr: Data attribute name
        - validation_key_attr: Validation key attribute name
        - boto_config: Boto3 client configuration
        - boto3_session: Boto3 session for authentication
        """

class IdempotentHookFunction:
    """Hook function interface for idempotency events"""
    
    def __call__(
        self,
        idempotency_key: str,
        result: Any = None,
        exception: Exception = None,
    ) -> None:
        """
        Hook function called during idempotency processing.
        
        Parameters:
        - idempotency_key: The idempotency key
        - result: Function result (if successful)
        - exception: Exception (if failed)
        """

Usage Examples

Basic Feature Flags

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
from aws_lambda_powertools.utilities.typing import LambdaContext
import os

# Initialize feature flags with AppConfig
store = AppConfigStore(
    environment=os.environ["ENVIRONMENT"],
    application=os.environ["APP_NAME"],
    name="feature-flags"
)

feature_flags = FeatureFlags(store=store)

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Simple boolean flag
    if feature_flags.evaluate(name="enable_new_algorithm", default=False):
        result = new_algorithm_processing(event)
    else:
        result = legacy_algorithm_processing(event)
    
    # Feature flag with context
    user_id = event.get("user_id")
    user_context = {"userId": user_id, "region": event.get("region", "us-east-1")}
    
    if feature_flags.evaluate(name="enable_premium_features", context=user_context, default=False):
        result["premium_data"] = get_premium_data(user_id)
    
    # Complex feature flag with configuration
    recommendation_config = feature_flags.evaluate(
        name="recommendation_settings",
        context=user_context,
        default={"enabled": False, "algorithm": "basic", "limit": 5}
    )
    
    if recommendation_config.get("enabled", False):
        recommendations = get_recommendations(
            user_id=user_id,
            algorithm=recommendation_config.get("algorithm", "basic"),
            limit=recommendation_config.get("limit", 5)
        )
        result["recommendations"] = recommendations
    
    return {
        "statusCode": 200,
        "body": result
    }

def new_algorithm_processing(event: dict) -> dict:
    """New algorithm implementation"""
    return {"algorithm": "v2", "result": "processed_with_new_logic"}

def legacy_algorithm_processing(event: dict) -> dict:
    """Legacy algorithm implementation"""
    return {"algorithm": "v1", "result": "processed_with_legacy_logic"}

Advanced Feature Flags with Rules

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import os

logger = Logger()

# AppConfig feature flag configuration example:
# {
#   "feature_flags": {
#     "enable_beta_features": {
#       "enabled": true,
#       "rules": {
#         "allow_beta_users": {
#           "when_match": [
#             {
#               "action": "ALLOW",
#               "conditions": [
#                 {
#                   "key": "userType",
#                   "value": "beta",
#                   "condition": "EQUALS"
#                 }
#               ]
#             }
#           ],
#           "when_no_match": {
#             "action": "DENY"
#           }
#         }
#       },
#       "default": false
#     },
#     "api_rate_limit": {
#       "enabled": true,
#       "rules": {
#         "premium_rate_limit": {
#           "when_match": [
#             {
#               "action": "ALLOW",
#               "conditions": [
#                 {
#                   "key": "userTier", 
#                   "value": "premium",
#                   "condition": "EQUALS"
#                 }
#               ]
#             }
#           ],
#           "when_no_match": {
#             "action": "DENY"
#           }
#         }
#       },
#       "default": {"requests_per_minute": 100}
#     }
#   }
# }

store = AppConfigStore(
    environment=os.environ["ENVIRONMENT"],
    application="my-app",
    name="feature-config"
)

feature_flags = FeatureFlags(store=store, logger=logger)

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Extract user context from event
    user_context = {
        "userId": event.get("user_id"),
        "userType": event.get("user_type", "standard"),
        "userTier": event.get("user_tier", "basic"),
        "region": event.get("region", "us-east-1"),
        "deviceType": event.get("device_type", "web")
    }
    
    logger.append_keys(user_context=user_context)
    
    # Evaluate feature flags with context
    results = {}
    
    # Beta features access
    beta_enabled = feature_flags.evaluate(
        name="enable_beta_features",
        context=user_context,
        default=False
    )
    
    if beta_enabled:
        logger.info("Beta features enabled for user")
        results["beta_features"] = get_beta_features(user_context)
    else:
        logger.info("Beta features not available for user")
    
    # Dynamic rate limiting
    rate_limit_config = feature_flags.evaluate(
        name="api_rate_limit",
        context=user_context,
        default={"requests_per_minute": 60}
    )
    
    # Apply rate limiting
    rate_limit = rate_limit_config.get("requests_per_minute", 60)
    if not check_rate_limit(user_context["userId"], rate_limit):
        return {
            "statusCode": 429,
            "body": {"error": "Rate limit exceeded"}
        }
    
    # Batch evaluate multiple flags
    flag_results = feature_flags.batch_evaluate(
        flags=["enable_analytics", "enable_notifications", "enable_cache"],
        context=user_context,
        default=False
    )
    
    # Process based on enabled features
    if flag_results.get("enable_analytics", False):
        track_user_event(event, user_context)
    
    if flag_results.get("enable_notifications", False):
        queue_notification(user_context["userId"], "feature_accessed")
    
    # Use caching if enabled
    cache_enabled = flag_results.get("enable_cache", False)
    if cache_enabled:
        results["cached_data"] = get_cached_data(user_context["userId"])
    else:
        results["fresh_data"] = get_fresh_data(user_context["userId"])
    
    return {
        "statusCode": 200,
        "body": results,
        "rate_limit": rate_limit
    }

def get_beta_features(user_context: dict) -> dict:
    """Get beta features for user"""
    return {
        "new_dashboard": True,
        "advanced_analytics": True,
        "ai_recommendations": True
    }

def check_rate_limit(user_id: str, limit: int) -> bool:
    """Check if user is within rate limit"""
    # Implementation would check against cache/database
    return True  # Simplified for example

def track_user_event(event: dict, user_context: dict):
    """Track analytics event"""
    logger.info("Tracking user event", extra={"event_type": "api_access"})

def queue_notification(user_id: str, notification_type: str):
    """Queue notification for user"""
    logger.info("Queuing notification", extra={
        "user_id": user_id,
        "type": notification_type
    })

Basic Idempotency

from aws_lambda_powertools.utilities.idempotency import (
    idempotent, 
    DynamoDBPersistenceLayer,
    IdempotencyConfig
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import json

logger = Logger()

# Configure DynamoDB persistence layer
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

# Configure idempotency behavior
config = IdempotencyConfig(
    event_key_jmespath="requestId",  # Extract requestId from event
    expires_after_seconds=3600,      # 1 hour TTL
    use_local_cache=True,           # Enable local caching
)

@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Idempotent Lambda handler - duplicate requests return cached result
    """
    
    request_id = event.get("requestId")
    logger.info(f"Processing request {request_id}")
    
    # This logic will only execute once per unique requestId
    user_id = event.get("userId")
    order_data = event.get("orderData", {})
    
    # Simulate expensive operation
    order_id = process_order(user_id, order_data)
    
    # Send confirmation (will only happen once)
    send_order_confirmation(user_id, order_id)
    
    result = {
        "orderId": order_id,
        "status": "processed",
        "timestamp": context.aws_request_id
    }
    
    logger.info(f"Order processed: {order_id}")
    
    return {
        "statusCode": 200,
        "body": json.dumps(result)
    }

def process_order(user_id: str, order_data: dict) -> str:
    """Process order (expensive operation)"""
    import uuid
    import time
    
    # Simulate processing time
    time.sleep(1)
    
    order_id = str(uuid.uuid4())
    logger.info(f"Created order {order_id} for user {user_id}")
    
    return order_id

def send_order_confirmation(user_id: str, order_id: str):
    """Send order confirmation (side effect)"""
    logger.info(f"Sending confirmation for order {order_id} to user {user_id}")
    # Email/SMS sending logic here

Advanced Idempotency with Custom Keys

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
    idempotent_function,
    DynamoDBPersistenceLayer,
    IdempotencyConfig
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger
import hashlib
import json

logger = Logger()

# Persistence layer with custom configuration
persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable",
    key_attr="idempotency_key",
    expiry_attr="expires_at",
    status_attr="status",
    data_attr="result_data"
)

# Complex idempotency configuration
config = IdempotencyConfig(
    event_key_jmespath="headers.`Idempotency-Key` || body.transactionId",
    payload_validation_jmespath="body",  # Validate entire body for changes
    expires_after_seconds=86400,  # 24 hours
    use_local_cache=True,
    hash_function="sha256",
    raise_on_no_idempotency_key=True
)

@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with custom idempotency key extraction"""
    
    # Process payment request
    payment_data = event.get("body", {})
    headers = event.get("headers", {})
    
    idempotency_key = headers.get("Idempotency-Key") or payment_data.get("transactionId")
    logger.append_keys(idempotency_key=idempotency_key)
    
    logger.info("Processing payment request")
    
    # This will only execute once per unique key
    result = process_payment_request(payment_data)
    
    return {
        "statusCode": 200,
        "body": json.dumps(result)
    }

# Separate persistence layer for user operations
user_persistence = DynamoDBPersistenceLayer(table_name="UserIdempotencyTable")

user_config = IdempotencyConfig(
    expires_after_seconds=3600,
    use_local_cache=True
)

@idempotent_function(
    data_keyword_argument="user_data",
    persistence_store=user_persistence,
    config=user_config
)
def create_user(user_data: dict) -> dict:
    """Idempotent user creation function"""
    
    logger.info("Creating new user", extra={"email": user_data.get("email")})
    
    # Check if user already exists
    if user_exists(user_data["email"]):
        raise ValueError("User already exists")
    
    # Create user (expensive operation)
    user_id = generate_user_id()
    save_user_to_database(user_id, user_data)
    send_welcome_email(user_data["email"])
    
    return {
        "user_id": user_id,
        "email": user_data["email"],
        "created_at": "2024-01-01T00:00:00Z"
    }

def process_payment_request(payment_data: dict) -> dict:
    """Process payment with external API"""
    
    amount = payment_data["amount"]
    currency = payment_data["currency"]
    payment_method = payment_data["payment_method"]
    
    # Call external payment processor
    transaction_id = call_payment_processor(amount, currency, payment_method)
    
    # Update internal records
    update_payment_records(transaction_id, payment_data)
    
    return {
        "transaction_id": transaction_id,
        "amount": amount,
        "currency": currency,
        "status": "completed"
    }

# Bulk operations with per-item idempotency
@idempotent_function(
    data_keyword_argument="item_data",
    persistence_store=persistence_layer,
    config=IdempotencyConfig(expires_after_seconds=1800)
)
def process_bulk_item(item_data: dict) -> dict:
    """Process individual item in bulk operation"""
    
    item_id = item_data["item_id"]
    logger.info(f"Processing item {item_id}")
    
    # Expensive per-item processing
    result = expensive_item_processing(item_data)
    
    return {
        "item_id": item_id,
        "processed": True,
        "result": result
    }

def bulk_handler(event: dict, context: LambdaContext) -> dict:
    """Handle bulk operations with per-item idempotency"""
    
    items = event.get("items", [])
    results = []
    
    for item in items:
        try:
            # Each item is processed idempotently
            result = process_bulk_item(item_data=item)
            results.append(result)
            
        except Exception as e:
            logger.exception(f"Failed to process item {item.get('item_id')}")
            results.append({
                "item_id": item.get("item_id"),
                "error": str(e)
            })
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed_count": len([r for r in results if not r.get("error")]),
            "failed_count": len([r for r in results if r.get("error")]),
            "results": results
        })
    }

Error Handling and Monitoring

from aws_lambda_powertools.utilities.feature_flags import (
    FeatureFlags, 
    AppConfigStore,
    ConfigurationStoreError
)
from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    IdempotencyAlreadyInProgressError,
    IdempotencyInconsistentStateError,
    IdempotencyKeyError
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger, Metrics
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger()
metrics = Metrics()

# Feature flags with error handling
try:
    store = AppConfigStore(
        environment="production",
        application="my-app", 
        name="feature-flags",
        max_age=30  # Refresh every 30 seconds
    )
    feature_flags = FeatureFlags(store=store, logger=logger)
except ConfigurationStoreError as e:
    logger.error("Failed to initialize feature flags", extra={"error": str(e)})
    # Fallback to default configuration
    feature_flags = None

# Idempotency with error handling
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

config = IdempotencyConfig(
    event_key_jmespath="requestId", 
    expires_after_seconds=3600,
    use_local_cache=True
)

@metrics.log_metrics(capture_cold_start_metric=True)
@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with comprehensive error handling"""
    
    request_id = event.get("requestId")
    logger.append_keys(request_id=request_id)
    
    try:
        # Feature flag evaluation with fallbacks
        features = get_enabled_features(event)
        
        # Record feature usage metrics
        for feature_name, enabled in features.items():
            metrics.add_metric(
                name=f"FeatureFlag_{feature_name}",
                unit=MetricUnit.Count,
                value=1 if enabled else 0
            )
        
        # Process request based on enabled features
        result = process_request(event, features)
        
        # Record success metrics
        metrics.add_metric(name="RequestProcessed", unit=MetricUnit.Count, value=1)
        
        return {
            "statusCode": 200,
            "body": result
        }
        
    except IdempotencyAlreadyInProgressError:
        # Request is already being processed by another instance
        logger.warning("Request already in progress")
        metrics.add_metric(name="IdempotencyInProgress", unit=MetricUnit.Count, value=1)
        
        return {
            "statusCode": 409,
            "body": {"error": "Request already in progress"}
        }
        
    except IdempotencyKeyError as e:
        # Missing or invalid idempotency key
        logger.error("Invalid idempotency key", extra={"error": str(e)})
        metrics.add_metric(name="IdempotencyKeyError", unit=MetricUnit.Count, value=1)
        
        return {
            "statusCode": 400,
            "body": {"error": "Invalid or missing idempotency key"}
        }
        
    except Exception as e:
        # Unexpected error
        logger.exception("Request processing failed")
        metrics.add_metric(name="RequestFailed", unit=MetricUnit.Count, value=1)
        
        return {
            "statusCode": 500,
            "body": {"error": "Internal server error"}
        }

def get_enabled_features(event: dict) -> dict:
    """Get enabled features with fallback logic"""
    
    features = {
        "enhanced_processing": False,
        "premium_features": False,
        "beta_algorithms": False
    }
    
    if feature_flags is None:
        logger.warning("Feature flags unavailable, using defaults")
        return features
    
    try:
        user_context = {
            "userId": event.get("userId"),
            "userTier": event.get("userTier", "basic"),
            "region": event.get("region", "us-east-1")
        }
        
        # Evaluate each feature flag with error handling
        for feature_name in features:
            try:
                features[feature_name] = feature_flags.evaluate(
                    name=feature_name,
                    context=user_context,
                    default=features[feature_name]
                )
            except Exception as e:
                logger.warning(
                    f"Failed to evaluate feature flag {feature_name}",
                    extra={"error": str(e)}
                )
                # Keep default value
        
        return features
        
    except ConfigurationStoreError as e:
        logger.warning("Feature flag evaluation failed, using defaults", extra={"error": str(e)})
        metrics.add_metric(name="FeatureFlagError", unit=MetricUnit.Count, value=1)
        return features

def process_request(event: dict, features: dict) -> dict:
    """Process request based on enabled features"""
    
    result = {"processed_features": []}
    
    if features.get("enhanced_processing", False):
        logger.info("Using enhanced processing")
        result["enhanced_data"] = enhanced_processing(event)
        result["processed_features"].append("enhanced_processing")
    
    if features.get("premium_features", False):
        logger.info("Including premium features")
        result["premium_data"] = get_premium_data(event)
        result["processed_features"].append("premium_features")
    
    if features.get("beta_algorithms", False):
        logger.info("Using beta algorithms")
        result["beta_results"] = beta_algorithm_processing(event)
        result["processed_features"].append("beta_algorithms")
    
    # Base processing
    result["base_data"] = base_processing(event)
    
    return result

def enhanced_processing(event: dict) -> dict:
    """Enhanced processing implementation"""
    return {"type": "enhanced", "quality": "high"}

def get_premium_data(event: dict) -> dict:
    """Premium feature data"""
    return {"premium_insights": True, "advanced_analytics": True}

def beta_algorithm_processing(event: dict) -> dict:
    """Beta algorithm processing"""
    return {"algorithm": "beta_v2", "accuracy": "experimental"}

def base_processing(event: dict) -> dict:
    """Base processing implementation"""
    return {"type": "standard", "quality": "normal"}

Types

from typing import Dict, Any, List, Union, Optional, Callable
from aws_lambda_powertools.utilities.typing import LambdaContext

# Feature flag types
FeatureFlagValue = Union[bool, str, int, float, Dict[str, Any], List[Any]]
FeatureFlagContext = Dict[str, Any]
FeatureFlagEvaluation = Dict[str, FeatureFlagValue]

# Rule action constants
RuleAction = Literal["ALLOW", "DENY"]

# Store provider types
class StoreProvider:
    """Base interface for feature flag store providers"""
    pass

# Configuration store error
class ConfigurationStoreError(Exception):
    """Raised when configuration store operations fail"""
    pass

# Schema validation error  
class SchemaValidationError(Exception):
    """Raised when feature flag schema validation fails"""
    pass

# Idempotency types
IdempotencyKey = str
IdempotencyRecord = Dict[str, Any]

# Idempotency exceptions
class IdempotencyError(Exception):
    """Base idempotency error"""
    pass

class IdempotencyKeyError(IdempotencyError):
    """Raised when idempotency key is missing or invalid"""
    pass

class IdempotencyAlreadyInProgressError(IdempotencyError):
    """Raised when request is already being processed"""
    pass

class IdempotencyInconsistentStateError(IdempotencyError):
    """Raised when idempotency state is inconsistent"""
    pass

class IdempotencyValidationError(IdempotencyError):
    """Raised when payload validation fails"""
    pass

# Persistence layer types
PersistenceRecord = Dict[str, Any]

# Hook function type
IdempotencyHook = Callable[[str, Any, Optional[Exception]], None]

# Hash function type
HashFunction = Literal["md5", "sha1", "sha256", "sha512"]

# JMESPath expression type
JMESPathExpression = str

# Boto3 configuration types
Boto3Config = Dict[str, Any]
Boto3Session = Any  # boto3.Session

Install with Tessl CLI

npx tessl i tessl/pypi-aws-lambda-powertools

docs

batch-processing.md

core-observability.md

data-classes.md

event-handlers.md

feature-flags.md

index.md

parameters.md

parser.md

utilities.md

tile.json