CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aws-lambda-powertools

Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python

89

1.21x
Overview
Eval results
Files

utilities.mddocs/

Utilities

Additional utilities including data masking for sensitive information, streaming for large S3 objects, serialization helpers, validation, JMESPath operations, Kafka consumer helpers, and middleware factory for Lambda decorators.

Capabilities

Data Masking

Utilities for masking sensitive data in logs, responses, and storage with pluggable providers.

class DataMasking:
    def __init__(self, provider: BaseProvider):
        """
        Initialize data masking utility.
        
        Parameters:
        - provider: Data masking provider (e.g., KMS-based masking)
        """

    def erase(
        self,
        data: Any,
        fields: List[str] = None,
    ) -> Any:
        """
        Erase sensitive fields from data structure.
        
        Parameters:
        - data: Data structure (dict, list, or primitive)
        - fields: List of field paths to erase (uses JMESPath syntax)
        
        Returns:
        Data with specified fields erased/masked
        """

    def encrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Encrypt sensitive fields in data structure.
        
        Parameters:
        - data: Data structure to process
        - fields: Field paths to encrypt
        - **provider_options: Provider-specific options
        
        Returns:
        Data with specified fields encrypted
        """

    def decrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Decrypt encrypted fields in data structure.
        
        Parameters:
        - data: Data structure with encrypted fields
        - fields: Field paths to decrypt
        - **provider_options: Provider-specific options
        
        Returns:
        Data with specified fields decrypted
        """

class BaseProvider:
    """Base data masking provider interface"""
    
    def encrypt(self, data: str, **kwargs) -> str:
        """Encrypt string data"""
        raise NotImplementedError
    
    def decrypt(self, data: str, **kwargs) -> str:
        """Decrypt string data"""
        raise NotImplementedError
    
    def erase(self, data: str, **kwargs) -> str:
        """Erase/mask string data"""
        raise NotImplementedError

Streaming

Utilities for streaming large objects from S3 with transformation support.

class S3Object:
    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: str = None,
        boto3_session: boto3.Session = None,
        **kwargs,
    ):
        """
        Initialize S3 object for streaming operations.
        
        Parameters:
        - bucket: S3 bucket name
        - key: S3 object key
        - version_id: Specific object version ID
        - boto3_session: Boto3 session for authentication
        - **kwargs: Additional S3 client parameters
        """

    def transform(self, transform: BaseTransform) -> "S3Object":
        """
        Apply transformation to object during streaming.
        
        Parameters:
        - transform: Transformation to apply
        
        Returns:
        New S3Object instance with transformation applied
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[str]:
        """
        Iterate over object lines.
        
        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters
        
        Returns:
        Iterator yielding lines from the object
        """

    def iter_chunks(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[bytes]:
        """
        Iterate over object chunks.
        
        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters
        
        Returns:
        Iterator yielding byte chunks from the object
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read object data.
        
        Parameters:
        - size: Number of bytes to read (-1 for all)
        
        Returns:
        Object data as bytes
        """

    def readline(self, size: int = -1) -> str:
        """
        Read single line from object.
        
        Parameters:
        - size: Maximum line length
        
        Returns:
        Single line as string
        """

    def write_to(
        self,
        destination_bucket: str,
        destination_key: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Write transformed object to destination S3 location.
        
        Parameters:
        - destination_bucket: Destination S3 bucket
        - destination_key: Destination S3 key
        - **kwargs: Additional S3 put parameters
        
        Returns:
        S3 put operation response
        """

class BaseTransform:
    """Base transformation interface for streaming objects"""
    
    def transform(self, data: bytes) -> bytes:
        """
        Transform byte data.
        
        Parameters:
        - data: Input data bytes
        
        Returns:
        Transformed data bytes
        """

class GzipTransform(BaseTransform):
    def __init__(self, compress: bool = True):
        """
        Gzip compression/decompression transform.
        
        Parameters:
        - compress: True to compress, False to decompress
        """

    def transform(self, data: bytes) -> bytes:
        """Apply gzip compression or decompression"""

class ZipTransform(BaseTransform):
    def __init__(
        self,
        compress: bool = True,
        compression_level: int = 6,
    ):
        """
        ZIP compression/decompression transform.
        
        Parameters:
        - compress: True to compress, False to decompress
        - compression_level: Compression level (0-9)
        """

    def transform(self, data: bytes) -> bytes:
        """Apply ZIP compression or decompression"""

class CsvTransform(BaseTransform):
    def __init__(
        self,
        delimiter: str = ",",
        quotechar: str = '"',
        headers: List[str] = None,
        **kwargs,
    ):
        """
        CSV parsing and generation transform.
        
        Parameters:
        - delimiter: Field delimiter
        - quotechar: Quote character
        - headers: Column headers for output
        - **kwargs: Additional CSV parameters
        """

    def transform(self, data: bytes) -> bytes:
        """Transform between CSV and JSON formats"""

Serialization

Utilities for common serialization tasks including Base64 encoding/decoding.

def base64_encode(data: Union[str, bytes], url_safe: bool = False) -> str:
    """
    Encode data as Base64 string.
    
    Parameters:
    - data: Data to encode (string or bytes)
    - url_safe: Whether to use URL-safe Base64 encoding
    
    Returns:
    Base64 encoded string
    """

def base64_decode(
    data: str,
    url_safe: bool = False,
    validate: bool = True,
) -> bytes:
    """
    Decode Base64 string to bytes.
    
    Parameters:
    - data: Base64 encoded string
    - url_safe: Whether string uses URL-safe Base64 encoding
    - validate: Whether to validate Base64 format
    
    Returns:
    Decoded bytes
    
    Raises:
    ValueError: If Base64 string is invalid and validate=True
    """

def base64_from_str(
    data: str,
    encoding: str = "utf-8",
    url_safe: bool = False,
) -> str:
    """
    Encode string as Base64.
    
    Parameters:
    - data: String to encode
    - encoding: String encoding to use
    - url_safe: Whether to use URL-safe Base64
    
    Returns:
    Base64 encoded string
    """

def base64_from_json(
    data: Any,
    ensure_ascii: bool = True,
    url_safe: bool = False,
) -> str:
    """
    Encode JSON data as Base64 string.
    
    Parameters:
    - data: Data to serialize as JSON then encode
    - ensure_ascii: Whether to ensure ASCII-only JSON
    - url_safe: Whether to use URL-safe Base64
    
    Returns:
    Base64 encoded JSON string
    """

Validation

Schema validation utilities for JSON data validation.

def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: str = None,
) -> Dict[str, Any]:
    """
    Validate event data against JSON schema.
    
    Parameters:
    - event: Event data to validate
    - schema: JSON schema for validation
    - envelope: JMESPath expression to extract data from event
    
    Returns:
    Validated event data
    
    Raises:
    SchemaValidationError: If validation fails
    InvalidSchemaFormatError: If schema format is invalid
    InvalidEnvelopeExpressionError: If envelope expression is invalid
    """

def validator(
    schema: Dict[str, Any],
    envelope: str = None,
) -> Callable:
    """
    Decorator for validating Lambda event data.
    
    Parameters:
    - schema: JSON schema for validation
    - envelope: JMESPath expression for data extraction
    
    Returns:
    Decorator function that validates event before handler execution
    """

class InvalidSchemaFormatError(Exception):
    """Raised when JSON schema format is invalid"""
    pass

class SchemaValidationError(Exception):
    """Raised when data validation against schema fails"""
    pass

class InvalidEnvelopeExpressionError(Exception):
    """Raised when JMESPath envelope expression is invalid"""
    pass

JMESPath Utils

Utilities for JMESPath operations on JSON data with custom functions.

def query(
    data: Dict[str, Any],
    expression: str,
    options: Dict[str, Any] = None,
) -> Any:
    """
    Execute JMESPath query on data.
    
    Parameters:
    - data: JSON data to query
    - expression: JMESPath expression
    - options: JMESPath options including custom functions
    
    Returns:
    Query result
    """

def extract_data_from_envelope(
    data: Dict[str, Any],
    envelope: str,
) -> Any:
    """
    Extract data using JMESPath envelope expression.
    
    Parameters:
    - data: Source data
    - envelope: JMESPath expression for data extraction
    
    Returns:
    Extracted data
    
    Note: This function is deprecated, use query() instead
    """

class PowertoolsFunctions:
    """Built-in JMESPath functions for common operations"""
    
    @staticmethod
    def powertools_json(value: str) -> Any:
        """Parse JSON string"""
    
    @staticmethod
    def powertools_base64(value: str) -> str:
        """Decode Base64 string"""
    
    @staticmethod
    def powertools_base64_gzip(value: str) -> str:
        """Decode Base64 and decompress gzip"""

Kafka Consumer

Utilities for processing Kafka messages with schema support and deserialization.

def kafka_consumer(
    record_handler: Callable[[Dict], Any],
    deserializer: BaseDeserializer = None,
) -> Callable:
    """
    Decorator for Kafka consumer Lambda functions.
    
    Parameters:
    - record_handler: Function to process individual Kafka records
    - deserializer: Deserializer for Kafka message values
    
    Returns:
    Decorated Lambda function that processes Kafka events
    """

class ConsumerRecords:
    """Kafka consumer records container"""
    
    def __init__(
        self,
        raw_event: Dict[str, Any],
        deserializer: BaseDeserializer = None,
    ):
        """
        Initialize consumer records.
        
        Parameters:
        - raw_event: Raw Kafka Lambda event
        - deserializer: Message deserializer
        """

    @property
    def records(self) -> List[KafkaRecord]:
        """Get list of Kafka records"""

    def __iter__(self) -> Iterator[KafkaRecord]:
        """Iterate over Kafka records"""

class SchemaConfig:
    def __init__(
        self,
        schema_registry_url: str,
        schema_name: str = None,
        schema_version: int = None,
        **kwargs,
    ):
        """
        Schema configuration for Kafka message deserialization.
        
        Parameters:
        - schema_registry_url: Confluent Schema Registry URL
        - schema_name: Schema name/subject
        - schema_version: Specific schema version
        - **kwargs: Additional schema registry client options
        """

class BaseDeserializer:
    """Base deserializer interface for Kafka messages"""
    
    def deserialize(self, data: bytes, **kwargs) -> Any:
        """
        Deserialize Kafka message data.
        
        Parameters:
        - data: Raw message bytes
        - **kwargs: Additional deserialization options
        
        Returns:
        Deserialized message data
        """

Lambda Context Type

Type definition for Lambda execution context.

class LambdaContext:
    """AWS Lambda execution context"""
    
    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    remaining_time_in_millis: int
    request_id: str
    log_group_name: str
    log_stream_name: str
    
    @property
    def identity(self) -> Any:
        """Cognito identity information (mobile apps)"""
    
    @property
    def client_context(self) -> Any:
        """Client context information (mobile apps)"""
    
    def get_remaining_time_in_millis(self) -> int:
        """Get remaining execution time in milliseconds"""

Middleware Factory

Factory for creating Lambda handler middleware decorators.

def lambda_handler_decorator(
    trace_execution: bool = False,
    clear_state: bool = False,
) -> Callable:
    """
    Factory for creating Lambda handler decorators.
    
    Parameters:
    - trace_execution: Whether to trace decorator execution
    - clear_state: Whether to clear state after execution
    
    Returns:
    Decorator factory function
    """

Usage Examples

Data Masking for Sensitive Information

from aws_lambda_powertools.utilities.data_masking import DataMasking, BaseProvider
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

class SimpleErasureProvider(BaseProvider):
    """Simple provider that erases sensitive data"""
    
    def erase(self, data: str, **kwargs) -> str:
        return "***MASKED***"
    
    def encrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper encryption
        return f"ENCRYPTED:{data[:3]}***"
    
    def decrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper decryption
        if data.startswith("ENCRYPTED:"):
            return data.replace("ENCRYPTED:", "").replace("***", "")
        return data

# Initialize data masking
masking = DataMasking(provider=SimpleErasureProvider())

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Sample user data with sensitive information
    user_data = {
        "user_id": "12345",
        "name": "John Doe",
        "email": "john@example.com",
        "ssn": "123-45-6789",
        "credit_card": "4111-1111-1111-1111",
        "address": {
            "street": "123 Main St",
            "city": "Anytown",
            "zip": "12345"
        },
        "preferences": {
            "newsletter": True,
            "phone": "555-123-4567"
        }
    }
    
    # Erase sensitive fields for logging
    safe_logging_data = masking.erase(
        data=user_data,
        fields=[
            "ssn",
            "credit_card", 
            "preferences.phone"
        ]
    )
    
    print(f"Processing user data: {json.dumps(safe_logging_data, indent=2)}")
    
    # Encrypt sensitive data for storage
    encrypted_data = masking.encrypt(
        data=user_data,
        fields=["ssn", "credit_card"]
    )
    
    # Store encrypted data
    store_user_data(encrypted_data)
    
    # Return response with masked sensitive data
    response_data = masking.erase(
        data=user_data,
        fields=["ssn", "credit_card", "preferences.phone"]
    )
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "User data processed",
            "user": response_data
        })
    }

def store_user_data(data: dict):
    """Store user data (mock implementation)"""
    print("Storing encrypted user data to database")

S3 Streaming with Transformations

from aws_lambda_powertools.utilities.streaming import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
    GzipTransform,
    CsvTransform
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Get S3 event information
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    
    print(f"Processing S3 object: s3://{bucket}/{key}")
    
    # Create S3 streaming object
    s3_object = S3Object(bucket=bucket, key=key)
    
    processed_records = 0
    
    if key.endswith('.gz'):
        # Handle compressed file
        processed_records = process_compressed_file(s3_object, bucket)
    elif key.endswith('.csv'):
        # Handle CSV file
        processed_records = process_csv_file(s3_object, bucket)
    else:
        # Handle regular text file
        processed_records = process_text_file(s3_object, bucket)
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed_records": processed_records,
            "source_bucket": bucket,
            "source_key": key
        })
    }

def process_compressed_file(s3_object: S3Object, bucket: str) -> int:
    """Process gzip compressed file"""
    
    # Add decompression transform
    decompressed_object = s3_object.transform(GzipTransform(compress=False))
    
    record_count = 0
    
    # Process line by line without loading entire file into memory
    for line in decompressed_object.iter_lines():
        if line.strip():  # Skip empty lines
            # Process individual line
            process_log_line(line)
            record_count += 1
    
    print(f"Processed {record_count} log entries from compressed file")
    return record_count

def process_csv_file(s3_object: S3Object, bucket: str) -> int:
    """Process CSV file and convert to JSON"""
    
    # Transform CSV to JSON format
    csv_transform = CsvTransform(
        delimiter=",",
        headers=["id", "name", "email", "created_at"]
    )
    
    json_object = s3_object.transform(csv_transform)
    
    # Write transformed data to new S3 location
    output_key = s3_object.key.replace('.csv', '.json')
    
    json_object.write_to(
        destination_bucket=bucket,
        destination_key=f"processed/{output_key}",
        ContentType="application/json"
    )
    
    # Count records by iterating through original CSV
    record_count = sum(1 for line in s3_object.iter_lines()) - 1  # Subtract header
    
    print(f"Converted {record_count} CSV records to JSON")
    return record_count

def process_text_file(s3_object: S3Object, bucket: str) -> int:
    """Process regular text file"""
    
    record_count = 0
    processed_lines = []
    
    # Process in chunks to handle large files
    for chunk in s3_object.iter_chunks(chunk_size=8192):
        # Process chunk data
        chunk_str = chunk.decode('utf-8', errors='ignore')
        lines = chunk_str.split('\n')
        
        for line in lines:
            if line.strip():
                processed_line = process_text_line(line)
                if processed_line:
                    processed_lines.append(processed_line)
                    record_count += 1
    
    # Write processed data back to S3
    if processed_lines:
        output_data = '\n'.join(processed_lines)
        
        # Use gzip compression for output
        compressed_object = S3Object(
            bucket=bucket,
            key="temp/processing_output.txt"
        ).transform(GzipTransform(compress=True))
        
        # This would require implementing write capability
        # compressed_object.write(output_data.encode('utf-8'))
    
    print(f"Processed {record_count} text lines")
    return record_count

def process_log_line(line: str) -> None:
    """Process individual log line"""
    try:
        # Parse log line (e.g., JSON logs)
        log_entry = json.loads(line)
        
        # Extract relevant information
        timestamp = log_entry.get("timestamp")
        level = log_entry.get("level")
        message = log_entry.get("message")
        
        # Process based on log level
        if level == "ERROR":
            handle_error_log(log_entry)
        elif level == "WARN":
            handle_warning_log(log_entry)
        
    except json.JSONDecodeError:
        # Handle non-JSON log lines
        print(f"Non-JSON log line: {line[:100]}...")

def process_text_line(line: str) -> str:
    """Process and transform text line"""
    # Example: uppercase and add timestamp
    import datetime
    timestamp = datetime.datetime.utcnow().isoformat()
    return f"[{timestamp}] {line.upper()}"

Schema Validation

from aws_lambda_powertools.utilities.validation import (
    validate,
    validator,
    SchemaValidationError
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# JSON Schema for user registration
USER_REGISTRATION_SCHEMA = {
    "type": "object",
    "properties": {
        "firstName": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "lastName": {
            "type": "string", 
            "minLength": 1,
            "maxLength": 50
        },
        "email": {
            "type": "string",
            "format": "email"
        },
        "age": {
            "type": "integer",
            "minimum": 13,
            "maximum": 120
        },
        "preferences": {
            "type": "object",
            "properties": {
                "newsletter": {"type": "boolean"},
                "notifications": {"type": "boolean"}
            },
            "additionalProperties": False
        }
    },
    "required": ["firstName", "lastName", "email", "age"],
    "additionalProperties": False
}

# Order schema
ORDER_SCHEMA = {
    "type": "object",
    "properties": {
        "orderId": {"type": "string"},
        "customerId": {"type": "string"},
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "productId": {"type": "string"},
                    "quantity": {"type": "integer", "minimum": 1},
                    "price": {"type": "number", "minimum": 0}
                },
                "required": ["productId", "quantity", "price"]
            },
            "minItems": 1
        },
        "totalAmount": {"type": "number", "minimum": 0}
    },
    "required": ["orderId", "customerId", "items", "totalAmount"]
}

@validator(schema=USER_REGISTRATION_SCHEMA, envelope="body")
def register_user_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with automatic validation"""
    
    # Event body is automatically validated against schema
    user_data = event["body"]
    
    print(f"Registering user: {user_data['firstName']} {user_data['lastName']}")
    
    # Process validated user data
    user_id = create_user_account(user_data)
    
    return {
        "statusCode": 201,
        "body": json.dumps({
            "userId": user_id,
            "message": "User registered successfully"
        })
    }

def manual_validation_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with manual validation"""
    
    try:
        # Manually validate event data
        validated_data = validate(
            event=event,
            schema=ORDER_SCHEMA,
            envelope="body"  # Extract from event.body
        )
        
        order_data = validated_data["body"]
        
        print(f"Processing order: {order_data['orderId']}")
        
        # Validate business rules after schema validation
        validate_business_rules(order_data)
        
        # Process order
        result = process_order(order_data)
        
        return {
            "statusCode": 200,
            "body": json.dumps(result)
        }
        
    except SchemaValidationError as e:
        print(f"Schema validation failed: {e}")
        return {
            "statusCode": 400,
            "body": json.dumps({
                "error": "Invalid request data",
                "details": str(e)
            })
        }
    except BusinessRuleError as e:
        print(f"Business rule validation failed: {e}")
        return {
            "statusCode": 422,
            "body": json.dumps({
                "error": "Business rule violation",
                "details": str(e)
            })
        }

def validate_business_rules(order_data: dict):
    """Additional business rule validation"""
    
    # Validate total amount matches item prices
    calculated_total = sum(
        item["quantity"] * item["price"]
        for item in order_data["items"]
    )
    
    if abs(order_data["totalAmount"] - calculated_total) > 0.01:
        raise BusinessRuleError("Total amount doesn't match item prices")
    
    # Validate order limits
    if order_data["totalAmount"] > 10000:
        raise BusinessRuleError("Order exceeds maximum amount limit")
    
    # Validate item availability
    for item in order_data["items"]:
        if not is_product_available(item["productId"], item["quantity"]):
            raise BusinessRuleError(f"Product {item['productId']} not available")

class BusinessRuleError(Exception):
    """Custom exception for business rule violations"""
    pass

def create_user_account(user_data: dict) -> str:
    """Create user account with validated data"""
    import uuid
    
    user_id = str(uuid.uuid4())
    
    # Save to database
    print(f"Creating user account: {user_id}")
    
    return user_id

def process_order(order_data: dict) -> dict:
    """Process validated order"""
    
    # Reserve inventory
    for item in order_data["items"]:
        reserve_inventory(item["productId"], item["quantity"])
    
    # Process payment
    payment_id = process_payment(order_data["totalAmount"])
    
    return {
        "orderId": order_data["orderId"],
        "paymentId": payment_id,
        "status": "confirmed"
    }

JMESPath Data Extraction

from aws_lambda_powertools.utilities.jmespath_utils import query, PowertoolsFunctions
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate JMESPath operations"""
    
    # Complex nested event data
    event_data = {
        "Records": [
            {
                "eventSource": "aws:s3",
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file1.json", "size": 1024}
                },
                "eventName": "ObjectCreated:Put"
            },
            {
                "eventSource": "aws:s3", 
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file2.json", "size": 2048}
                },
                "eventName": "ObjectCreated:Put"
            }
        ],
        "responsePayload": json.dumps({
            "users": [
                {"id": 1, "name": "Alice", "active": True},
                {"id": 2, "name": "Bob", "active": False}
            ]
        })
    }
    
    # Extract S3 bucket names
    bucket_names = query(
        data=event_data,
        expression="Records[*].s3.bucket.name"
    )
    print(f"Bucket names: {bucket_names}")
    
    # Extract object keys for created objects only
    created_objects = query(
        data=event_data,
        expression="Records[?eventName == 'ObjectCreated:Put'].s3.object.key"
    )
    print(f"Created objects: {created_objects}")
    
    # Calculate total size of all objects
    total_size = query(
        data=event_data,
        expression="sum(Records[*].s3.object.size)"
    )
    print(f"Total size: {total_size} bytes")
    
    # Extract and parse JSON payload using custom functions
    options = {"custom_functions": PowertoolsFunctions()}
    
    users_data = query(
        data=event_data,
        expression="powertools_json(responsePayload).users",
        options=options
    )
    print(f"Users data: {users_data}")
    
    # Get active users only
    active_users = query(
        data=event_data,
        expression="powertools_json(responsePayload).users[?active == `true`].name",
        options=options
    )
    print(f"Active users: {active_users}")
    
    # Complex filtering and transformation
    processed_records = query(
        data=event_data,
        expression="""Records[?eventSource == 'aws:s3'] | [].{
            bucket: s3.bucket.name,
            key: s3.object.key,
            size_mb: s3.object.size / `1024` / `1024`,
            is_large: s3.object.size > `1500`
        }"""
    )
    print(f"Processed records: {json.dumps(processed_records, indent=2)}")
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "bucket_names": bucket_names,
            "created_objects": created_objects, 
            "total_size": total_size,
            "active_users": active_users,
            "processed_records": processed_records
        })
    }

Serialization Utilities

from aws_lambda_powertools.utilities.serialization import (
    base64_encode,
    base64_decode, 
    base64_from_str,
    base64_from_json
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate serialization utilities"""
    
    # Sample data to work with
    sample_data = {
        "user_id": "12345",
        "action": "purchase",
        "metadata": {
            "product_id": "prod-789",
            "amount": 99.99,
            "currency": "USD"
        }
    }
    
    # Encode JSON as Base64
    encoded_json = base64_from_json(sample_data)
    print(f"Encoded JSON: {encoded_json}")
    
    # Encode string as Base64
    message = "Hello, World!"
    encoded_string = base64_from_str(message)
    print(f"Encoded string: {encoded_string}")
    
    # Encode bytes as Base64
    binary_data = b"Binary data content"
    encoded_bytes = base64_encode(binary_data)
    print(f"Encoded bytes: {encoded_bytes}")
    
    # URL-safe Base64 encoding
    url_safe_encoded = base64_encode(binary_data, url_safe=True)
    print(f"URL-safe encoded: {url_safe_encoded}")
    
    # Decode Base64 back to original data
    decoded_json_bytes = base64_decode(encoded_json)
    decoded_json = json.loads(decoded_json_bytes.decode('utf-8'))
    print(f"Decoded JSON: {decoded_json}")
    
    decoded_string_bytes = base64_decode(encoded_string)
    decoded_string = decoded_string_bytes.decode('utf-8')
    print(f"Decoded string: {decoded_string}")
    
    decoded_bytes = base64_decode(encoded_bytes)
    print(f"Decoded bytes: {decoded_bytes}")
    
    # Handle API Gateway events with Base64 encoded bodies
    api_event_body = event.get("body", "")
    is_base64_encoded = event.get("isBase64Encoded", False)
    
    if is_base64_encoded and api_event_body:
        try:
            # Decode Base64 body
            decoded_body_bytes = base64_decode(api_event_body, validate=True)
            decoded_body = decoded_body_bytes.decode('utf-8')
            
            # Parse as JSON if possible
            try:
                body_data = json.loads(decoded_body)
                print(f"Decoded API body: {body_data}")
            except json.JSONDecodeError:
                print(f"Decoded API body (text): {decoded_body}")
                
        except ValueError as e:
            print(f"Failed to decode Base64 body: {e}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid Base64 encoding"})
            }
    
    # Prepare response with Base64 encoded data
    response_data = {
        "original_data": sample_data,
        "encoded_formats": {
            "json_base64": encoded_json,
            "string_base64": encoded_string,
            "bytes_base64": encoded_bytes,
            "url_safe_base64": url_safe_encoded
        },
        "decoded_verification": {
            "json_matches": decoded_json == sample_data,
            "string_matches": decoded_string == message,
            "bytes_matches": decoded_bytes == binary_data
        }
    }
    
    return {
        "statusCode": 200,
        "body": json.dumps(response_data, indent=2)
    }

Types

from typing import Dict, Any, List, Union, Optional, Iterator, Callable, Type
import boto3

# Data masking types
class BaseProvider:
    """Base provider interface for data masking operations"""
    pass

MaskingProvider = BaseProvider
FieldPath = str  # JMESPath-style field path
EncryptionOptions = Dict[str, Any]

# Streaming types
class S3Object:
    """S3 object streaming interface"""
    pass

class BaseTransform:
    """Base transformation interface"""
    pass

StreamChunk = bytes
StreamLine = str
ChunkIterator = Iterator[bytes]
LineIterator = Iterator[str]

# Transformation options
CompressionLevel = int  # 0-9 for compression algorithms
CsvOptions = Dict[str, Any]
TransformOptions = Dict[str, Any]

# Serialization types
EncodableData = Union[str, bytes]
JsonData = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
Base64String = str
EncodingType = str  # e.g., "utf-8", "ascii"

# Validation types
JsonSchema = Dict[str, Any]
ValidationResult = Dict[str, Any]
JMESPathExpression = str

class ValidationError(Exception):
    """Base validation error"""
    pass

class SchemaValidationError(ValidationError):
    """Schema validation specific error"""
    pass

class InvalidSchemaFormatError(ValidationError):
    """Invalid schema format error"""
    pass

class InvalidEnvelopeExpressionError(ValidationError):
    """Invalid JMESPath envelope error"""
    pass

# JMESPath types
QueryExpression = str
QueryResult = Any
QueryOptions = Dict[str, Any]
CustomFunctions = Any  # JMESPath custom function registry

# Kafka types
class ConsumerRecords:
    """Kafka consumer records container"""
    pass

class BaseDeserializer:
    """Kafka message deserializer interface"""
    pass

class SchemaConfig:
    """Kafka schema configuration"""
    pass

KafkaRecord = Dict[str, Any]
DeserializedMessage = Any
SchemaRegistryUrl = str

# Lambda context type
from aws_lambda_powertools.utilities.typing import LambdaContext

# Middleware types
MiddlewareDecorator = Callable[[Callable], Callable]
HandlerFunction = Callable[[Dict[str, Any], LambdaContext], Dict[str, Any]]

# Boto3 session type
Boto3Session = boto3.Session
Boto3Config = Dict[str, Any]

Install with Tessl CLI

npx tessl i tessl/pypi-aws-lambda-powertools

docs

batch-processing.md

core-observability.md

data-classes.md

event-handlers.md

feature-flags.md

index.md

parameters.md

parser.md

utilities.md

tile.json