Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python
89
Additional utilities including data masking for sensitive information, streaming for large S3 objects, serialization helpers, validation, JMESPath operations, Kafka consumer helpers, and middleware factory for Lambda decorators.
Utilities for masking sensitive data in logs, responses, and storage with pluggable providers.
class DataMasking:
    """
    Utility for masking sensitive data through a pluggable provider.

    NOTE(review): this is a signature listing; each method below carries a
    docstring but no implementation body.
    """

    def __init__(self, provider: "BaseProvider"):
        """
        Initialize data masking utility.

        Parameters:
        - provider: Data masking provider (e.g., KMS-based masking)
        """

    def erase(
        self,
        data: Any,
        fields: Optional[List[str]] = None,
    ) -> Any:
        """
        Erase sensitive fields from data structure.

        Parameters:
        - data: Data structure (dict, list, or primitive)
        - fields: List of field paths to erase (uses JMESPath syntax).
          Optional; defaults to None.

        Returns:
        Data with specified fields erased/masked
        """

    def encrypt(
        self,
        data: Any,
        fields: Optional[List[str]] = None,
        **provider_options,
    ) -> Any:
        """
        Encrypt sensitive fields in data structure.

        Parameters:
        - data: Data structure to process
        - fields: Field paths to encrypt. Optional; defaults to None.
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields encrypted
        """

    def decrypt(
        self,
        data: Any,
        fields: Optional[List[str]] = None,
        **provider_options,
    ) -> Any:
        """
        Decrypt encrypted fields in data structure.

        Parameters:
        - data: Data structure with encrypted fields
        - fields: Field paths to decrypt. Optional; defaults to None.
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields decrypted
        """
class BaseProvider:
    """
    Base data masking provider interface.

    Every method raises NotImplementedError; subclasses supply the behavior.
    """

    def encrypt(self, data: str, **kwargs) -> str:
        """Encrypt string data (not implemented in the base class)."""
        raise NotImplementedError

    def decrypt(self, data: str, **kwargs) -> str:
        """Decrypt string data (not implemented in the base class)."""
        raise NotImplementedError

    def erase(self, data: str, **kwargs) -> str:
        """Erase/mask string data (not implemented in the base class)."""
        raise NotImplementedError

Utilities for streaming large objects from S3 with transformation support.
class S3Object:
    """
    Streaming handle for a single S3 object.

    NOTE(review): signature listing only; the methods below have docstrings
    but no implementation body.
    """

    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: Optional[str] = None,
        boto3_session: Optional[boto3.Session] = None,
        **kwargs,
    ):
        """
        Initialize S3 object for streaming operations.

        Parameters:
        - bucket: S3 bucket name
        - key: S3 object key
        - version_id: Specific object version ID. Optional; defaults to None.
        - boto3_session: Boto3 session for authentication. Optional;
          defaults to None.
        - **kwargs: Additional S3 client parameters
        """

    def transform(self, transform: "BaseTransform") -> "S3Object":
        """
        Apply transformation to object during streaming.

        Parameters:
        - transform: Transformation to apply

        Returns:
        New S3Object instance with transformation applied
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[str]:
        """
        Iterate over object lines.

        Parameters:
        - chunk_size: Size of chunks to read (default 1024)
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding lines from the object
        """

    def iter_chunks(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[bytes]:
        """
        Iterate over object chunks.

        Parameters:
        - chunk_size: Size of chunks to read (default 1024)
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding byte chunks from the object
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read object data.

        Parameters:
        - size: Number of bytes to read (-1 for all)

        Returns:
        Object data as bytes
        """

    def readline(self, size: int = -1) -> str:
        """
        Read single line from object.

        Parameters:
        - size: Maximum line length (default -1)

        Returns:
        Single line as string
        """

    def write_to(
        self,
        destination_bucket: str,
        destination_key: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Write transformed object to destination S3 location.

        Parameters:
        - destination_bucket: Destination S3 bucket
        - destination_key: Destination S3 key
        - **kwargs: Additional S3 put parameters

        Returns:
        S3 put operation response
        """
class BaseTransform:
    """Base transformation interface for streaming objects"""

    def transform(self, data: bytes) -> bytes:
        """
        Transform byte data.

        No implementation is shown here; the listed subclasses override it.

        Parameters:
        - data: Input data bytes

        Returns:
        Transformed data bytes
        """
class GzipTransform(BaseTransform):
    """Gzip transform for streamed objects (signature listing only)."""

    def __init__(self, compress: bool = True):
        """
        Gzip compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress (default True)
        """

    def transform(self, data: bytes) -> bytes:
        """Apply gzip compression or decompression"""
class ZipTransform(BaseTransform):
    """ZIP transform for streamed objects (signature listing only)."""

    def __init__(
        self,
        compress: bool = True,
        compression_level: int = 6,
    ):
        """
        ZIP compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress (default True)
        - compression_level: Compression level (0-9), default 6
        """

    def transform(self, data: bytes) -> bytes:
        """Apply ZIP compression or decompression"""
class CsvTransform(BaseTransform):
    """CSV transform for streamed objects (signature listing only)."""

    def __init__(
        self,
        delimiter: str = ",",
        quotechar: str = '"',
        headers: Optional[List[str]] = None,
        **kwargs,
    ):
        """
        CSV parsing and generation transform.

        Parameters:
        - delimiter: Field delimiter (default ",")
        - quotechar: Quote character (default '"')
        - headers: Column headers for output. Optional; defaults to None.
        - **kwargs: Additional CSV parameters
        """

    def transform(self, data: bytes) -> bytes:
        """Transform between CSV and JSON formats"""

Utilities for common serialization tasks including Base64 encoding/decoding.
def base64_encode(data: Union[str, bytes], url_safe: bool = False) -> str:
    """
    Encode data as Base64 string.

    Parameters:
    - data: Data to encode (string or bytes)
    - url_safe: Whether to use URL-safe Base64 encoding (default False)

    Returns:
    Base64 encoded string
    """
def base64_decode(
    data: str,
    url_safe: bool = False,
    validate: bool = True,
) -> bytes:
    """
    Decode Base64 string to bytes.

    Parameters:
    - data: Base64 encoded string
    - url_safe: Whether string uses URL-safe Base64 encoding (default False)
    - validate: Whether to validate Base64 format (default True)

    Returns:
    Decoded bytes

    Raises:
    ValueError: If Base64 string is invalid and validate=True
    """
def base64_from_str(
    data: str,
    encoding: str = "utf-8",
    url_safe: bool = False,
) -> str:
    """
    Encode string as Base64.

    Parameters:
    - data: String to encode
    - encoding: String encoding to use (default "utf-8")
    - url_safe: Whether to use URL-safe Base64 (default False)

    Returns:
    Base64 encoded string
    """
def base64_from_json(
    data: Any,
    ensure_ascii: bool = True,
    url_safe: bool = False,
) -> str:
    """
    Encode JSON data as Base64 string.

    Parameters:
    - data: Data to serialize as JSON then encode
    - ensure_ascii: Whether to ensure ASCII-only JSON (default True)
    - url_safe: Whether to use URL-safe Base64 (default False)

    Returns:
    Base64 encoded JSON string
    """

Schema validation utilities for JSON data validation.
def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Validate event data against JSON schema.

    Parameters:
    - event: Event data to validate
    - schema: JSON schema for validation
    - envelope: JMESPath expression to extract data from event.
      Optional; defaults to None.

    Returns:
    Validated event data

    Raises:
    SchemaValidationError: If validation fails
    InvalidSchemaFormatError: If schema format is invalid
    InvalidEnvelopeExpressionError: If envelope expression is invalid
    """
def validator(
    schema: Dict[str, Any],
    envelope: Optional[str] = None,
) -> Callable:
    """
    Decorator for validating Lambda event data.

    Parameters:
    - schema: JSON schema for validation
    - envelope: JMESPath expression for data extraction.
      Optional; defaults to None.

    Returns:
    Decorator function that validates event before handler execution
    """
class InvalidSchemaFormatError(Exception):
    """Raised when JSON schema format is invalid"""
    # Adds nothing to Exception beyond the distinct type.
    pass
class SchemaValidationError(Exception):
    """Raised when data validation against schema fails"""
    # Adds nothing to Exception beyond the distinct type.
    pass
class InvalidEnvelopeExpressionError(Exception):
    """Raised when JMESPath envelope expression is invalid"""
    # Adds nothing to Exception beyond the distinct type.
    pass

Utilities for JMESPath operations on JSON data with custom functions.
def query(
    data: Dict[str, Any],
    expression: str,
    options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Execute JMESPath query on data.

    Parameters:
    - data: JSON data to query
    - expression: JMESPath expression
    - options: JMESPath options including custom functions.
      Optional; defaults to None.

    Returns:
    Query result
    """
def extract_data_from_envelope(
    data: Dict[str, Any],
    envelope: str,
) -> Any:
    """
    Extract data using JMESPath envelope expression.

    Note: This function is deprecated, use query() instead.

    Parameters:
    - data: Source data
    - envelope: JMESPath expression for data extraction

    Returns:
    Extracted data
    """
class PowertoolsFunctions:
    """
    Built-in JMESPath functions for common operations.

    Each function is declared as a static method; no bodies are shown here.
    """

    @staticmethod
    def powertools_json(value: str) -> Any:
        """Parse JSON string"""

    @staticmethod
    def powertools_base64(value: str) -> str:
        """Decode Base64 string"""

    @staticmethod
    def powertools_base64_gzip(value: str) -> str:
        """Decode Base64 and decompress gzip"""

Utilities for processing Kafka messages with schema support and deserialization.
def kafka_consumer(
    record_handler: Callable[[Dict], Any],
    deserializer: Optional["BaseDeserializer"] = None,
) -> Callable:
    """
    Decorator for Kafka consumer Lambda functions.

    Parameters:
    - record_handler: Function to process individual Kafka records
    - deserializer: Deserializer for Kafka message values.
      Optional; defaults to None.

    Returns:
    Decorated Lambda function that processes Kafka events
    """
class ConsumerRecords:
    """Kafka consumer records container"""

    def __init__(
        self,
        raw_event: Dict[str, Any],
        deserializer: Optional["BaseDeserializer"] = None,
    ):
        """
        Initialize consumer records.

        Parameters:
        - raw_event: Raw Kafka Lambda event
        - deserializer: Message deserializer. Optional; defaults to None.
        """

    @property
    def records(self) -> List["KafkaRecord"]:
        """Get list of Kafka records"""

    def __iter__(self) -> Iterator["KafkaRecord"]:
        """Iterate over Kafka records"""
class SchemaConfig:
    """Schema settings for Kafka deserialization (signature listing only)."""

    def __init__(
        self,
        schema_registry_url: str,
        schema_name: Optional[str] = None,
        schema_version: Optional[int] = None,
        **kwargs,
    ):
        """
        Schema configuration for Kafka message deserialization.

        Parameters:
        - schema_registry_url: Confluent Schema Registry URL
        - schema_name: Schema name/subject. Optional; defaults to None.
        - schema_version: Specific schema version. Optional; defaults to None.
        - **kwargs: Additional schema registry client options
        """
class BaseDeserializer:
    """Base deserializer interface for Kafka messages"""

    def deserialize(self, data: bytes, **kwargs) -> Any:
        """
        Deserialize Kafka message data.

        Parameters:
        - data: Raw message bytes
        - **kwargs: Additional deserialization options

        Returns:
        Deserialized message data
        """

Type definition for Lambda execution context.
class LambdaContext:
    """AWS Lambda execution context"""

    # Declared attributes: annotations only, no values are assigned here.
    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    remaining_time_in_millis: int
    request_id: str
    log_group_name: str
    log_stream_name: str

    @property
    def identity(self) -> Any:
        """Cognito identity information (mobile apps)"""

    @property
    def client_context(self) -> Any:
        """Client context information (mobile apps)"""

    def get_remaining_time_in_millis(self) -> int:
        """Get remaining execution time in milliseconds"""

Factory for creating Lambda handler middleware decorators.
def lambda_handler_decorator(
    trace_execution: bool = False,
    clear_state: bool = False,
) -> Callable:
    """
    Factory for creating Lambda handler decorators.

    Parameters:
    - trace_execution: Whether to trace decorator execution (default False)
    - clear_state: Whether to clear state after execution (default False)

    Returns:
    Decorator factory function
    """


from aws_lambda_powertools.utilities.data_masking import DataMasking, BaseProvider
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
class SimpleErasureProvider(BaseProvider):
    """
    Demo provider that masks values with fixed markers.

    The "encryption" is a lossy placeholder: only the first three characters
    of the input survive, so decrypt() cannot recover the original value.
    Replace it with a real provider before handling production data.
    """

    _PREFIX = "ENCRYPTED:"
    _SUFFIX = "***"

    def erase(self, data: str, **kwargs) -> str:
        """Return a fixed mask regardless of the input value."""
        return "***MASKED***"

    def encrypt(self, data: str, **kwargs) -> str:
        """Return a placeholder token built from the first three characters."""
        # In real implementation, use proper encryption
        return f"{self._PREFIX}{data[:3]}{self._SUFFIX}"

    def decrypt(self, data: str, **kwargs) -> str:
        """
        Strip the markers added by encrypt().

        Values that do not start with the prefix are returned unchanged.
        """
        # In real implementation, use proper decryption
        if not data.startswith(self._PREFIX):
            return data
        payload = data[len(self._PREFIX):]
        # Remove only the leading prefix and the trailing marker. The previous
        # str.replace() calls also deleted any "ENCRYPTED:" or "***" that was
        # part of the payload itself.
        if payload.endswith(self._SUFFIX):
            payload = payload[: -len(self._SUFFIX)]
        return payload
# Initialize data masking at module level; lambda_handler below uses this
# shared instance, backed by the demo SimpleErasureProvider
masking = DataMasking(provider=SimpleErasureProvider())
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Demonstrate erasing and encrypting sensitive fields of a user record.

    Parameters:
    - event: Lambda event (not read by this example)
    - context: Lambda context (not read by this example)

    Returns:
    Response dict with statusCode 200 and a JSON body holding the masked user
    """
    import copy

    # Field paths that must never reach logs or the response
    sensitive_fields = ["ssn", "credit_card", "preferences.phone"]

    # Sample user data with sensitive information
    user_data = {
        "user_id": "12345",
        "name": "John Doe",
        "email": "john@example.com",
        "ssn": "123-45-6789",
        "credit_card": "4111-1111-1111-1111",
        "address": {
            "street": "123 Main St",
            "city": "Anytown",
            "zip": "12345"
        },
        "preferences": {
            "newsletter": True,
            "phone": "555-123-4567"
        }
    }

    # Each masking call gets its own deep copy. If the utility rewrites its
    # argument in place, erasing on the shared dict would make the later
    # encrypt() store already-masked values instead of the real ones.
    masked_data = masking.erase(
        data=copy.deepcopy(user_data),
        fields=sensitive_fields,
    )
    print(f"Processing user data: {json.dumps(masked_data, indent=2)}")

    # Encrypt sensitive data for storage
    encrypted_data = masking.encrypt(
        data=copy.deepcopy(user_data),
        fields=["ssn", "credit_card"],
    )

    # Store encrypted data
    store_user_data(encrypted_data)

    # Return response with masked sensitive data; the erased copy built for
    # logging is reused because the same fields are masked in both places
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "User data processed",
            "user": masked_data
        })
    }
def store_user_data(data: dict) -> None:
    """
    Store user data (mock implementation).

    Only prints a message; `data` is neither read nor persisted.
    """
    print("Storing encrypted user data to database")


from aws_lambda_powertools.utilities.streaming import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
GzipTransform,
CsvTransform
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Stream the S3 object named in the first event record and process it.

    The processing routine is picked from the key suffix: ".gz" for gzip,
    ".csv" for CSV, anything else is treated as plain text.

    Parameters:
    - event: S3 notification event; only Records[0] is read
    - context: Lambda context (not read by this example)

    Returns:
    Response dict with statusCode 200 and a JSON body holding the record
    count, the source bucket and the (decoded) source key
    """
    from urllib.parse import unquote_plus

    # Get S3 event information
    s3_info = event["Records"][0]["s3"]
    bucket = s3_info["bucket"]["name"]
    # S3 notifications deliver the object key URL-encoded (a space arrives
    # as "+"), so decode it before addressing the object.
    key = unquote_plus(s3_info["object"]["key"])
    print(f"Processing S3 object: s3://{bucket}/{key}")

    # Create S3 streaming object
    s3_object = S3Object(bucket=bucket, key=key)

    if key.endswith('.gz'):
        # Handle compressed file
        processed_records = process_compressed_file(s3_object, bucket)
    elif key.endswith('.csv'):
        # Handle CSV file
        processed_records = process_csv_file(s3_object, bucket)
    else:
        # Handle regular text file
        processed_records = process_text_file(s3_object, bucket)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed_records": processed_records,
            "source_bucket": bucket,
            "source_key": key
        })
    }
def process_compressed_file(s3_object: S3Object, bucket: str) -> int:
    """
    Stream a gzip object line by line and hand each non-blank line to
    process_log_line().

    Parameters:
    - s3_object: Source object; a decompressing transform is attached to it
    - bucket: Not used by this function

    Returns:
    Number of non-blank lines processed
    """
    # Attach the decompression transform, then walk the result lazily
    gunzipped = s3_object.transform(GzipTransform(compress=False))

    handled = 0
    for raw_line in gunzipped.iter_lines():
        if not raw_line.strip():
            # Blank lines are not counted
            continue
        process_log_line(raw_line)
        handled += 1

    print(f"Processed {handled} log entries from compressed file")
    return handled
def process_csv_file(s3_object: S3Object, bucket: str) -> int:
    """
    Convert a CSV object to JSON and write the result under "processed/".

    Parameters:
    - s3_object: Source CSV object (its `key` attribute names the output)
    - bucket: Destination bucket for the converted object

    Returns:
    Number of data rows, i.e. line count minus one header line, never
    negative
    """
    # Transform CSV to JSON format
    csv_transform = CsvTransform(
        delimiter=",",
        headers=["id", "name", "email", "created_at"]
    )
    json_object = s3_object.transform(csv_transform)

    # Swap only the trailing extension. str.replace() would also rewrite a
    # ".csv" that appears earlier in the key (e.g. "exports.csv/part.csv").
    source_key = s3_object.key
    if source_key.endswith('.csv'):
        output_key = source_key[:-len('.csv')] + '.json'
    else:
        output_key = source_key

    # Write transformed data to new S3 location
    json_object.write_to(
        destination_bucket=bucket,
        destination_key=f"processed/{output_key}",
        ContentType="application/json"
    )

    # Count records by iterating through original CSV
    line_count = sum(1 for _ in s3_object.iter_lines())
    # Subtract header; an empty object must report 0 records, not -1
    record_count = max(line_count - 1, 0)
    print(f"Converted {record_count} CSV records to JSON")
    return record_count
def process_text_file(s3_object: S3Object, bucket: str) -> int:
    """
    Process a plain-text object chunk by chunk.

    Chunks are decoded incrementally and split on newlines, with the
    unfinished tail of each chunk carried into the next one. A line (or a
    multi-byte character) that straddles a chunk boundary is therefore
    handled once and intact.

    Parameters:
    - s3_object: Source object, read through iter_chunks()
    - bucket: Bucket used for the (not yet written) output object

    Returns:
    Number of non-blank lines for which process_text_line() returned a
    truthy value
    """
    import codecs

    # Incremental decoder keeps partial UTF-8 sequences between chunks;
    # errors="ignore" preserves the original tolerance for invalid bytes
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    record_count = 0
    processed_lines = []
    pending = ""  # text after the last newline seen so far

    def _consume(text_line: str) -> None:
        """Process one complete line and record the result."""
        nonlocal record_count
        if text_line.strip():
            processed_line = process_text_line(text_line)
            if processed_line:
                processed_lines.append(processed_line)
                record_count += 1

    # Process in chunks to handle large files
    for chunk in s3_object.iter_chunks(chunk_size=8192):
        pending += decoder.decode(chunk)
        # Everything before the last "\n" is complete; the rest waits
        *complete_lines, pending = pending.split('\n')
        for line in complete_lines:
            _consume(line)

    # Flush the decoder and the final line without a trailing newline
    pending += decoder.decode(b"", final=True)
    if pending:
        _consume(pending)

    # Write processed data back to S3
    if processed_lines:
        output_data = '\n'.join(processed_lines)
        # Use gzip compression for output
        compressed_object = S3Object(
            bucket=bucket,
            key="temp/processing_output.txt"
        ).transform(GzipTransform(compress=True))
        # NOTE(review): nothing is written yet; output_data and
        # compressed_object are unused until a write capability exists
        # compressed_object.write(output_data.encode('utf-8'))

    print(f"Processed {record_count} text lines")
    return record_count
def process_log_line(line: str) -> None:
    """
    Process individual log line.

    The line is parsed as JSON. ERROR entries go to handle_error_log() and
    WARN/WARNING entries to handle_warning_log(); other levels are ignored.
    Lines that are not JSON objects are reported and skipped.

    Parameters:
    - line: One raw log line
    """
    try:
        # Parse log line (e.g., JSON logs)
        log_entry = json.loads(line)
    except json.JSONDecodeError:
        log_entry = None

    # Valid JSON that is not an object ("123", "[1, 2]") has no .get();
    # treat it like any other unstructured line instead of crashing
    if not isinstance(log_entry, dict):
        print(f"Non-JSON log line: {line[:100]}...")
        return

    # Process based on log level; the handlers run outside the try so an
    # error raised by a handler is not mistaken for a parse failure
    level = log_entry.get("level")
    if level == "ERROR":
        handle_error_log(log_entry)
    elif level in ("WARN", "WARNING"):
        handle_warning_log(log_entry)
def process_text_line(line: str) -> str:
    """
    Process and transform text line.

    Parameters:
    - line: Input text line

    Returns:
    The upper-cased line prefixed with the current UTC time in ISO format,
    e.g. "[2024-01-01T12:00:00.000000] HELLO"
    """
    # Example: uppercase and add timestamp
    import datetime

    # utcnow() is deprecated since Python 3.12. Take an aware UTC "now" and
    # drop tzinfo so the rendered timestamp keeps its offset-free format.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return f"[{now_utc.isoformat()}] {line.upper()}"


from aws_lambda_powertools.utilities.validation import (
    validate,
    validator,
    SchemaValidationError
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
# JSON Schema for user registration.
# Four fields are required; "preferences" is optional. Unknown keys are
# rejected at both the top level and inside "preferences".
USER_REGISTRATION_SCHEMA = {
    "type": "object",
    "properties": {
        "firstName": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "lastName": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "email": {
            "type": "string",
            "format": "email"
        },
        "age": {
            "type": "integer",
            "minimum": 13,
            "maximum": 120
        },
        "preferences": {
            "type": "object",
            "properties": {
                "newsletter": {"type": "boolean"},
                "notifications": {"type": "boolean"}
            },
            "additionalProperties": False
        }
    },
    "required": ["firstName", "lastName", "email", "age"],
    "additionalProperties": False
}
# Order schema.
# An order needs an id, a customer id, at least one item and a total amount.
# Unlike the registration schema, extra keys are not forbidden here.
ORDER_SCHEMA = {
    "type": "object",
    "properties": {
        "orderId": {"type": "string"},
        "customerId": {"type": "string"},
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "productId": {"type": "string"},
                    "quantity": {"type": "integer", "minimum": 1},
                    "price": {"type": "number", "minimum": 0}
                },
                "required": ["productId", "quantity", "price"]
            },
            "minItems": 1
        },
        "totalAmount": {"type": "number", "minimum": 0}
    },
    "required": ["orderId", "customerId", "items", "totalAmount"]
}
@validator(schema=USER_REGISTRATION_SCHEMA, envelope="body")
def register_user_handler(event: dict, context: LambdaContext) -> dict:
    """
    Register a user from an event the decorator has validated.

    Parameters:
    - event: Lambda event; event["body"] holds the registration data
    - context: Lambda context (not read by this example)

    Returns:
    Response dict with statusCode 201 and a JSON body holding the new user id
    """
    # The decorator checks the "body" envelope against the schema before
    # this function runs.
    # NOTE(review): indexing below assumes event["body"] is a dict - confirm
    registrant = event["body"]
    first_name = registrant['firstName']
    last_name = registrant['lastName']
    print(f"Registering user: {first_name} {last_name}")

    # Process validated user data
    new_user_id = create_user_account(registrant)

    payload = {
        "userId": new_user_id,
        "message": "User registered successfully"
    }
    return {"statusCode": 201, "body": json.dumps(payload)}
def manual_validation_handler(event: dict, context: LambdaContext) -> dict:
    """
    Validate an order event by hand, then apply business rules and process it.

    Parameters:
    - event: Lambda event; the order is expected under "body"
    - context: Lambda context (not read by this example)

    Returns:
    Response dict: 200 with the processing result, 400 when schema
    validation fails, 422 when a business rule is violated
    """

    def _reply(status_code: int, payload: dict) -> dict:
        """Build the response envelope shared by all three outcomes."""
        return {"statusCode": status_code, "body": json.dumps(payload)}

    try:
        # Schema check first; the envelope extracts event.body
        validated = validate(
            event=event,
            schema=ORDER_SCHEMA,
            envelope="body",
        )
        order = validated["body"]
        print(f"Processing order: {order['orderId']}")

        # Business rules run only on schema-valid data
        validate_business_rules(order)

        return _reply(200, process_order(order))
    except SchemaValidationError as schema_error:
        print(f"Schema validation failed: {schema_error}")
        return _reply(400, {
            "error": "Invalid request data",
            "details": str(schema_error)
        })
    except BusinessRuleError as rule_error:
        print(f"Business rule validation failed: {rule_error}")
        return _reply(422, {
            "error": "Business rule violation",
            "details": str(rule_error)
        })
def validate_business_rules(order_data: dict):
    """
    Enforce order rules that the JSON schema cannot express.

    Checks, in order: the declared total matches the item prices (within
    0.01), the total does not exceed 10000, and every item is available.

    Parameters:
    - order_data: Order dict with "items" and "totalAmount"

    Raises:
    BusinessRuleError: On the first rule that fails
    """
    line_items = order_data["items"]

    # Rule 1: declared total must agree with quantity * price over all items
    expected_total = 0
    for entry in line_items:
        expected_total = expected_total + entry["quantity"] * entry["price"]
    difference = abs(order_data["totalAmount"] - expected_total)
    if difference > 0.01:
        raise BusinessRuleError("Total amount doesn't match item prices")

    # Rule 2: order limit
    if order_data["totalAmount"] > 10000:
        raise BusinessRuleError("Order exceeds maximum amount limit")

    # Rule 3: item availability
    for entry in order_data["items"]:
        available = is_product_available(entry["productId"], entry["quantity"])
        if not available:
            raise BusinessRuleError(f"Product {entry['productId']} not available")
class BusinessRuleError(Exception):
    """Raised when an order violates a business rule."""
def create_user_account(user_data: dict) -> str:
    """
    Create a user account for already-validated data (mock).

    Parameters:
    - user_data: Validated registration data (not read by this mock)

    Returns:
    Newly generated random UUID as a string
    """
    from uuid import uuid4

    new_id = str(uuid4())
    # Save to database
    print(f"Creating user account: {new_id}")
    return new_id
def process_order(order_data: dict) -> dict:
    """
    Reserve stock for every item, take payment, and confirm the order.

    Parameters:
    - order_data: Validated order with "orderId", "items" and "totalAmount"

    Returns:
    Dict with the order id, the payment id and status "confirmed"
    """
    # Reserve inventory
    for line_item in order_data["items"]:
        reserve_inventory(line_item["productId"], line_item["quantity"])

    # Process payment
    payment_reference = process_payment(order_data["totalAmount"])

    confirmation = {
        "orderId": order_data["orderId"],
        "paymentId": payment_reference,
        "status": "confirmed"
    }
    return confirmation


from aws_lambda_powertools.utilities.jmespath_utils import query, PowertoolsFunctions
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Demonstrate JMESPath operations on a sample nested event.

    Parameters:
    - event: Lambda event (not read; the example queries built-in data)
    - context: Lambda context (not read by this example)

    Returns:
    Response dict with statusCode 200 and a JSON body holding the query
    results
    """
    # Complex nested event data
    event_data = {
        "Records": [
            {
                "eventSource": "aws:s3",
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file1.json", "size": 1024}
                },
                "eventName": "ObjectCreated:Put"
            },
            {
                "eventSource": "aws:s3",
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file2.json", "size": 2048}
                },
                "eventName": "ObjectCreated:Put"
            }
        ],
        "responsePayload": json.dumps({
            "users": [
                {"id": 1, "name": "Alice", "active": True},
                {"id": 2, "name": "Bob", "active": False}
            ]
        })
    }

    # Extract S3 bucket names
    bucket_names = query(
        data=event_data,
        expression="Records[*].s3.bucket.name"
    )
    print(f"Bucket names: {bucket_names}")

    # Extract object keys for created objects only
    created_objects = query(
        data=event_data,
        expression="Records[?eventName == 'ObjectCreated:Put'].s3.object.key"
    )
    print(f"Created objects: {created_objects}")

    # Calculate total size of all objects
    total_size = query(
        data=event_data,
        expression="sum(Records[*].s3.object.size)"
    )
    print(f"Total size: {total_size} bytes")

    # Extract and parse JSON payload using custom functions
    options = {"custom_functions": PowertoolsFunctions()}
    users_data = query(
        data=event_data,
        expression="powertools_json(responsePayload).users",
        options=options
    )
    print(f"Users data: {users_data}")

    # Get active users only
    active_users = query(
        data=event_data,
        expression="powertools_json(responsePayload).users[?active == `true`].name",
        options=options
    )
    print(f"Active users: {active_users}")

    # Complex filtering and transformation.
    # JMESPath has comparison operators but no arithmetic, so the expression
    # projects the raw byte size and the MB conversion happens in Python.
    processed_records = query(
        data=event_data,
        expression="""Records[?eventSource == 'aws:s3'] | [].{
            bucket: s3.bucket.name,
            key: s3.object.key,
            size_bytes: s3.object.size,
            is_large: s3.object.size > `1500`
        }"""
    )
    for record in processed_records or []:
        record["size_mb"] = record.pop("size_bytes") / 1024 / 1024
    print(f"Processed records: {json.dumps(processed_records, indent=2)}")

    return {
        "statusCode": 200,
        "body": json.dumps({
            "bucket_names": bucket_names,
            "created_objects": created_objects,
            "total_size": total_size,
            "active_users": active_users,
            "processed_records": processed_records
        })
    }


from aws_lambda_powertools.utilities.serialization import (
    base64_encode,
    base64_decode,
    base64_from_str,
    base64_from_json
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    Demonstrate the Base64 serialization helpers with round trips.

    Parameters:
    - event: Lambda event; "body" and "isBase64Encoded" are read if present
    - context: Lambda context (not read by this example)

    Returns:
    Response dict with statusCode 200 and the encoded samples plus
    round-trip checks, or statusCode 400 when a Base64 event body cannot be
    decoded
    """
    # Sample inputs for the three encoders
    sample_data = {
        "user_id": "12345",
        "action": "purchase",
        "metadata": {
            "product_id": "prod-789",
            "amount": 99.99,
            "currency": "USD"
        }
    }
    message = "Hello, World!"
    binary_data = b"Binary data content"

    # --- Encoding ---
    json_b64 = base64_from_json(sample_data)
    print(f"Encoded JSON: {json_b64}")

    text_b64 = base64_from_str(message)
    print(f"Encoded string: {text_b64}")

    bytes_b64 = base64_encode(binary_data)
    print(f"Encoded bytes: {bytes_b64}")

    urlsafe_b64 = base64_encode(binary_data, url_safe=True)
    print(f"URL-safe encoded: {urlsafe_b64}")

    # --- Decoding back to the original values ---
    round_trip_json = json.loads(base64_decode(json_b64).decode('utf-8'))
    print(f"Decoded JSON: {round_trip_json}")

    round_trip_text = base64_decode(text_b64).decode('utf-8')
    print(f"Decoded string: {round_trip_text}")

    round_trip_bytes = base64_decode(bytes_b64)
    print(f"Decoded bytes: {round_trip_bytes}")

    # --- API Gateway events with Base64 encoded bodies ---
    raw_body = event.get("body", "")
    body_is_b64 = event.get("isBase64Encoded", False)
    if body_is_b64 and raw_body:
        try:
            body_text = base64_decode(raw_body, validate=True).decode('utf-8')
            # Parse as JSON if possible, otherwise report the plain text
            try:
                parsed_body = json.loads(body_text)
            except json.JSONDecodeError:
                print(f"Decoded API body (text): {body_text}")
            else:
                print(f"Decoded API body: {parsed_body}")
        except ValueError as decode_error:
            print(f"Failed to decode Base64 body: {decode_error}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid Base64 encoding"})
            }

    # --- Response with the encoded data and the round-trip checks ---
    encoded_formats = {
        "json_base64": json_b64,
        "string_base64": text_b64,
        "bytes_base64": bytes_b64,
        "url_safe_base64": urlsafe_b64
    }
    decoded_verification = {
        "json_matches": round_trip_json == sample_data,
        "string_matches": round_trip_text == message,
        "bytes_matches": round_trip_bytes == binary_data
    }
    response_data = {
        "original_data": sample_data,
        "encoded_formats": encoded_formats,
        "decoded_verification": decoded_verification
    }
    return {
        "statusCode": 200,
        "body": json.dumps(response_data, indent=2)
    }


from typing import Dict, Any, List, Union, Optional, Iterator, Callable, Type
import boto3
# Data masking types
class BaseProvider:
    """Base provider interface for data masking operations"""
    # Placeholder declaration: empty body, used by the alias below
    pass


MaskingProvider = BaseProvider  # alias of the placeholder class above
FieldPath = str  # JMESPath-style field path
EncryptionOptions = Dict[str, Any]  # string-keyed option mapping
# Streaming types
class S3Object:
    """S3 object streaming interface"""
    # Placeholder declaration: empty body
    pass


class BaseTransform:
    """Base transformation interface"""
    # Placeholder declaration: empty body
    pass


StreamChunk = bytes
StreamLine = str
ChunkIterator = Iterator[bytes]
LineIterator = Iterator[str]

# Transformation options
CompressionLevel = int  # 0-9 for compression algorithms
CsvOptions = Dict[str, Any]
TransformOptions = Dict[str, Any]
# Serialization types
EncodableData = Union[str, bytes]
# Any value of the JSON data model: object, array, string, number, bool, null
JsonData = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
Base64String = str
EncodingType = str  # e.g., "utf-8", "ascii"
# Validation types
JsonSchema = Dict[str, Any]
ValidationResult = Dict[str, Any]
JMESPathExpression = str


# Exception hierarchy: the three specific errors below all derive from
# ValidationError, so catching ValidationError catches any of them.
class ValidationError(Exception):
    """Base validation error"""
    pass


class SchemaValidationError(ValidationError):
    """Schema validation specific error"""
    pass


class InvalidSchemaFormatError(ValidationError):
    """Invalid schema format error"""
    pass


class InvalidEnvelopeExpressionError(ValidationError):
    """Invalid JMESPath envelope error"""
    pass
# JMESPath types
QueryExpression = str
QueryResult = Any  # result type is not narrowed
QueryOptions = Dict[str, Any]
CustomFunctions = Any  # JMESPath custom function registry
# Kafka types
class ConsumerRecords:
    """Kafka consumer records container"""
    # Placeholder declaration: empty body
    pass


class BaseDeserializer:
    """Kafka message deserializer interface"""
    # Placeholder declaration: empty body
    pass


class SchemaConfig:
    """Kafka schema configuration"""
    # Placeholder declaration: empty body
    pass


KafkaRecord = Dict[str, Any]  # a record is typed as a plain string-keyed dict
DeserializedMessage = Any
SchemaRegistryUrl = str
# Lambda context type
from aws_lambda_powertools.utilities.typing import LambdaContext

# Middleware types
# A decorator takes a callable and returns a callable
MiddlewareDecorator = Callable[[Callable], Callable]
# A handler takes (event dict, LambdaContext) and returns a dict
HandlerFunction = Callable[[Dict[str, Any], LambdaContext], Dict[str, Any]]

# Boto3 session type
Boto3Session = boto3.Session
Boto3Config = Dict[str, Any]

Install with Tessl CLI
npx tessl i tessl/pypi-aws-lambda-powertools
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10