Comprehensive developer toolkit implementing serverless best practices for AWS Lambda functions in Python.
Type-safe data classes representing AWS event structures for Lambda triggers, including API Gateway, S3, DynamoDB, Kinesis, SQS, SNS, CloudWatch events, and many other AWS services. These classes provide convenient property access and automatic parsing of complex event structures.
Decorator for automatically parsing Lambda events into data class instances.
def event_source(data_class: type) -> Callable:
    """
    Decorator to automatically parse Lambda event into data class.

    Parameters:
    - data_class: Data class type to parse event into

    Returns:
    Decorated function that receives parsed event as first parameter
    """
# Data classes for API Gateway proxy integration events.
class APIGatewayProxyEvent:
"""API Gateway Lambda Proxy Integration event"""
@property
def body(self) -> str | None:
"""Raw request body as string"""
@property
def json_body(self) -> Any:
"""Parse JSON request body (caches result)"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers (case-insensitive access)"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query string parameters"""
@property
def multi_value_query_string_parameters(self) -> Dict[str, List[str]] | None:
"""Multi-value query string parameters"""
@property
def path_parameters(self) -> Dict[str, str] | None:
"""Path parameters from route"""
@property
def stage_variables(self) -> Dict[str, str] | None:
"""API Gateway stage variables"""
@property
def request_context(self) -> Dict[str, Any]:
"""API Gateway request context"""
@property
def is_base64_encoded(self) -> bool:
"""Whether body is base64 encoded"""
def get_query_string_value(
self,
name: str,
default_value: str = None
) -> str | None:
"""
Get query string parameter value.
Parameters:
- name: Parameter name
- default_value: Default if not found
Returns:
Parameter value or default
"""
def get_header_value(
self,
name: str,
default_value: str = None,
case_sensitive: bool = False
) -> str | None:
"""
Get header value with optional case sensitivity.
Parameters:
- name: Header name
- default_value: Default if not found
- case_sensitive: Whether to match case exactly
Returns:
Header value or default
"""
class APIGatewayProxyEventV2:
"""API Gateway HTTP API (v2.0) event"""
@property
def body(self) -> str | None:
"""Raw request body"""
@property
def json_body(self) -> Any:
"""Parse JSON request body"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query string parameters"""
@property
def path_parameters(self) -> Dict[str, str] | None:
"""Path parameters"""
@property
def stage_variables(self) -> Dict[str, str] | None:
"""Stage variables"""
@property
def request_context(self) -> Dict[str, Any]:
"""HTTP API request context"""
@property
def cookies(self) -> List[str] | None:
"""Request cookies"""
class APIGatewayWebSocketEvent:
"""API Gateway WebSocket event"""
@property
def body(self) -> str | None:
"""WebSocket message body"""
@property
def request_context(self) -> Dict[str, Any]:
"""WebSocket request context"""
@property
def connection_id(self) -> str:
"""WebSocket connection ID"""
@property
def route_key(self) -> str:
"""WebSocket route key"""Data class for ALB target group Lambda integration.
class ALBEvent:
"""Application Load Balancer event"""
@property
def body(self) -> str | None:
"""Request body"""
@property
def json_body(self) -> Any:
"""Parse JSON request body"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query parameters"""
@property
def multi_value_headers(self) -> Dict[str, List[str]] | None:
"""Multi-value headers"""
@property
def multi_value_query_string_parameters(self) -> Dict[str, List[str]] | None:
"""Multi-value query parameters"""
@property
def request_context(self) -> Dict[str, Any]:
"""ALB request context"""
@property
def is_base64_encoded(self) -> bool:
"""Whether body is base64 encoded"""Data class for Lambda Function URLs.
class LambdaFunctionUrlEvent:
"""Lambda Function URL event"""
@property
def body(self) -> str | None:
"""Request body"""
@property
def json_body(self) -> Any:
"""Parse JSON request body"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query parameters"""
@property
def request_context(self) -> Dict[str, Any]:
"""Function URL request context"""
@property
def cookies(self) -> List[str] | None:
"""Request cookies"""
@property
def is_base64_encoded(self) -> bool:
"""Whether body is base64 encoded"""Data classes for AWS AppSync GraphQL events.
class AppSyncResolverEvent:
    """AppSync direct Lambda resolver event"""

    @property
    def arguments(self) -> Dict[str, Any]:
        """GraphQL field arguments"""

    @property
    def identity(self) -> Dict[str, Any]:
        """AppSync identity information"""

    @property
    def source(self) -> Dict[str, Any]:
        """Parent object for field resolution"""

    @property
    def request(self) -> Dict[str, Any]:
        """AppSync request information"""

    @property
    def prev(self) -> Dict[str, Any]:
        """Previous resolver result in pipeline"""

    @property
    def info(self) -> Dict[str, Any]:
        """GraphQL query information"""

    @property
    def stash(self) -> Dict[str, Any]:
        """Pipeline resolver stash"""


class AppSyncResolverEventsEvent:
    """AppSync Events resolver event"""

    @property
    def arguments(self) -> Dict[str, Any]:
        """Event arguments"""

    @property
    def identity(self) -> Dict[str, Any]:
        """Identity information"""

    @property
    def request(self) -> Dict[str, Any]:
        """Request information"""
# Data classes for Amazon SQS events.
class SQSEvent:
    """SQS Lambda trigger event"""

    @property
    def records(self) -> List["SQSRecord"]:
        """List of SQS records"""


class SQSRecord:
    """Individual SQS record"""

    @property
    def body(self) -> str:
        """Message body"""

    @property
    def json_body(self) -> Any:
        """Parse JSON message body"""

    @property
    def receipt_handle(self) -> str:
        """SQS receipt handle"""

    @property
    def message_id(self) -> str:
        """SQS message ID"""

    @property
    def md5_of_body(self) -> str:
        """MD5 hash of message body"""

    @property
    def event_source(self) -> str:
        """Event source (aws:sqs)"""

    @property
    def event_source_arn(self) -> str:
        """SQS queue ARN"""

    @property
    def aws_region(self) -> str:
        """AWS region"""

    @property
    def attributes(self) -> Dict[str, Any]:
        """SQS message attributes"""

    @property
    def message_attributes(self) -> Dict[str, Any]:
        """SQS message attributes"""
# Data classes for Amazon SNS events.
class SNSEvent:
"""SNS Lambda trigger event"""
@property
def records(self) -> List[SNSRecord]:
"""List of SNS records"""
class SNSRecord:
"""Individual SNS record"""
@property
def sns(self) -> Dict[str, Any]:
"""SNS message data"""
@property
def subject(self) -> str | None:
"""SNS message subject"""
@property
def message(self) -> str:
"""SNS message body"""
@property
def json_message(self) -> Any:
"""Parse JSON SNS message"""
@property
def message_id(self) -> str:
"""SNS message ID"""
@property
def topic_arn(self) -> str:
"""SNS topic ARN"""
@property
def timestamp(self) -> str:
"""Message timestamp"""
@property
def message_attributes(self) -> Dict[str, Any]:
"""SNS message attributes"""Data classes for Amazon S3 events.
class S3Event:
"""S3 Lambda trigger event"""
@property
def records(self) -> List[S3Record]:
"""List of S3 records"""
class S3Record:
"""Individual S3 record"""
@property
def s3(self) -> Dict[str, Any]:
"""S3 event data"""
@property
def bucket_name(self) -> str:
"""S3 bucket name"""
@property
def object_key(self) -> str:
"""S3 object key (URL decoded)"""
@property
def object_size(self) -> int | None:
"""S3 object size in bytes"""
@property
def object_etag(self) -> str | None:
"""S3 object ETag"""
@property
def object_version_id(self) -> str | None:
"""S3 object version ID"""
@property
def event_name(self) -> str:
"""S3 event name (e.g., ObjectCreated:Put)"""
@property
def event_source(self) -> str:
"""Event source (aws:s3)"""
@property
def aws_region(self) -> str:
"""AWS region"""
@property
def user_identity(self) -> Dict[str, Any]:
"""User identity information"""
class S3EventBridgeNotificationEvent:
"""S3 EventBridge notification event"""
@property
def detail(self) -> Dict[str, Any]:
"""Event detail"""
@property
def bucket_name(self) -> str:
"""S3 bucket name"""
@property
def object_key(self) -> str:
"""S3 object key"""
@property
def object_size(self) -> int | None:
"""Object size in bytes"""
class S3BatchOperationEvent:
"""S3 Batch Operations event"""
@property
def invocation_id(self) -> str:
"""Batch operation invocation ID"""
@property
def invocation_schema_version(self) -> str:
"""Schema version"""
@property
def tasks(self) -> List[Dict[str, Any]]:
"""List of batch operation tasks"""
class S3BatchOperationResponse:
"""S3 Batch Operations response"""
def __init__(
self,
invocation_id: str,
treat_missing_keys_as: str = "PermanentFailure",
invocation_schema_version: str = "1.0",
results: List[S3BatchOperationResponseRecord] = None,
):
"""
Create S3 Batch Operations response.
Parameters:
- invocation_id: Batch operation invocation ID
- treat_missing_keys_as: How to handle missing keys
- invocation_schema_version: Schema version
- results: List of operation results
"""
class S3BatchOperationResponseRecord:
"""Individual S3 Batch Operations result"""
def __init__(
self,
task_id: str,
result_code: str,
result_string: str = None,
):
"""
Create batch operation result record.
Parameters:
- task_id: Task identifier
- result_code: Result code (Succeeded, PermanentFailure, etc.)
- result_string: Optional result description
"""Data classes for DynamoDB Streams events.
class DynamoDBStreamEvent:
"""DynamoDB Streams Lambda trigger event"""
@property
def records(self) -> List[DynamoDBRecord]:
"""List of DynamoDB Stream records"""
class DynamoDBRecord:
"""Individual DynamoDB Stream record"""
@property
def aws_region(self) -> str:
"""AWS region"""
@property
def dynamodb(self) -> Dict[str, Any]:
"""DynamoDB stream record data"""
@property
def event_id(self) -> str:
"""Event ID"""
@property
def event_name(self) -> str:
"""Event name (INSERT, MODIFY, REMOVE)"""
@property
def event_source(self) -> str:
"""Event source (aws:dynamodb)"""
@property
def event_source_arn(self) -> str:
"""DynamoDB table stream ARN"""
@property
def event_version(self) -> str:
"""Event version"""
@property
def user_identity(self) -> Dict[str, Any] | None:
"""User identity information"""Data classes for Amazon Kinesis events.
class KinesisStreamEvent:
"""Kinesis Data Streams Lambda trigger event"""
@property
def records(self) -> List[KinesisStreamRecord]:
"""List of Kinesis records"""
class KinesisStreamRecord:
"""Individual Kinesis Stream record"""
@property
def kinesis(self) -> Dict[str, Any]:
"""Kinesis record data"""
@property
def data(self) -> str:
"""Base64 encoded record data"""
@property
def data_as_bytes(self) -> bytes:
"""Record data as bytes"""
@property
def data_as_text(self) -> str:
"""Record data as UTF-8 text"""
@property
def data_as_json(self) -> Any:
"""Parse record data as JSON"""
@property
def partition_key(self) -> str:
"""Kinesis partition key"""
@property
def sequence_number(self) -> str:
"""Kinesis sequence number"""
@property
def approximate_arrival_timestamp(self) -> int:
"""Approximate arrival timestamp"""
@property
def event_source(self) -> str:
"""Event source (aws:kinesis)"""
@property
def event_source_arn(self) -> str:
"""Kinesis stream ARN"""
@property
def aws_region(self) -> str:
"""AWS region"""
class KinesisFirehoseEvent:
"""Kinesis Data Firehose transformation event"""
@property
def invocation_id(self) -> str:
"""Firehose invocation ID"""
@property
def delivery_stream_arn(self) -> str:
"""Firehose delivery stream ARN"""
@property
def region(self) -> str:
"""AWS region"""
@property
def records(self) -> List[KinesisFirehoseDataTransformationRecord]:
"""List of records to transform"""
class KinesisFirehoseDataTransformationRecord:
"""Kinesis Firehose transformation record"""
@property
def record_id(self) -> str:
"""Record ID"""
@property
def approximate_arrival_timestamp(self) -> int:
"""Approximate arrival timestamp"""
@property
def data(self) -> str:
"""Base64 encoded record data"""
@property
def data_as_bytes(self) -> bytes:
"""Record data as bytes"""
@property
def data_as_text(self) -> str:
"""Record data as UTF-8 text"""
@property
def data_as_json(self) -> Any:
"""Parse record data as JSON"""
@property
def kms_key_id(self) -> str | None:
"""KMS key ID for encryption"""
@property
def metadata(self) -> KinesisFirehoseDataTransformationRecordMetadata | None:
"""Record metadata"""
class KinesisFirehoseDataTransformationRecordMetadata:
"""Kinesis Firehose record metadata"""
@property
def partition_keys(self) -> Dict[str, str]:
"""Partition keys for S3 delivery"""
class KinesisFirehoseDataTransformationResponse:
"""Kinesis Firehose transformation response"""
def __init__(self, records: List[Dict[str, Any]]):
"""
Create Firehose transformation response.
Parameters:
- records: List of transformed records with recordId, result, and data
"""Data classes for Amazon EventBridge events.
class EventBridgeEvent:
    """EventBridge (CloudWatch Events) event"""

    @property
    def version(self) -> str:
        """Event version"""

    @property
    def id(self) -> str:
        """Event ID"""

    @property
    def detail_type(self) -> str:
        """Event detail type"""

    @property
    def source(self) -> str:
        """Event source"""

    @property
    def account(self) -> str:
        """AWS account ID"""

    @property
    def time(self) -> str:
        """Event time (ISO 8601)"""

    @property
    def region(self) -> str:
        """AWS region"""

    @property
    def detail(self) -> Dict[str, Any]:
        """Event detail"""

    @property
    def resources(self) -> List[str]:
        """Event resources"""
# Data classes for CloudWatch-specific events.
class CloudWatchLogsEvent:
"""CloudWatch Logs subscription filter event"""
@property
def message_type(self) -> str:
"""Message type"""
@property
def owner(self) -> str:
"""AWS account ID"""
@property
def log_group(self) -> str:
"""CloudWatch log group name"""
@property
def log_stream(self) -> str:
"""CloudWatch log stream name"""
@property
def subscription_filters(self) -> List[str]:
"""Subscription filter names"""
@property
def log_events(self) -> List[Dict[str, Any]]:
"""List of log events"""
def parse_logs_data(self) -> Dict[str, Any]:
"""Parse and decompress CloudWatch Logs data"""
class CloudWatchAlarmEvent:
"""CloudWatch Alarm state change event"""
@property
def alarm_data(self) -> CloudWatchAlarmData:
"""Alarm data"""
class CloudWatchAlarmData:
"""CloudWatch Alarm data"""
@property
def alarm_name(self) -> str:
"""Alarm name"""
@property
def state(self) -> CloudWatchAlarmState:
"""Current alarm state"""
@property
def previous_state(self) -> CloudWatchAlarmState:
"""Previous alarm state"""
@property
def configuration(self) -> CloudWatchAlarmConfiguration:
"""Alarm configuration"""
class CloudWatchAlarmState:
"""CloudWatch Alarm state information"""
@property
def value(self) -> str:
"""State value (OK, ALARM, INSUFFICIENT_DATA)"""
@property
def reason(self) -> str:
"""State change reason"""
@property
def timestamp(self) -> str:
"""State timestamp"""
class CloudWatchAlarmConfiguration:
"""CloudWatch Alarm configuration"""
@property
def description(self) -> str | None:
"""Alarm description"""
@property
def metrics(self) -> List[CloudWatchAlarmMetric]:
"""Alarm metrics"""
class CloudWatchAlarmMetric:
"""CloudWatch Alarm metric"""
@property
def id(self) -> str:
"""Metric ID"""
@property
def metric_stat(self) -> CloudWatchAlarmMetricStat | None:
"""Metric statistics"""
class CloudWatchAlarmMetricStat:
"""CloudWatch Alarm metric statistics"""
@property
def metric(self) -> Dict[str, Any]:
"""Metric definition"""
@property
def period(self) -> int:
"""Metric period in seconds"""
@property
def stat(self) -> str:
"""Statistic (Average, Sum, Maximum, etc.)"""
class CloudWatchDashboardCustomWidgetEvent:
"""CloudWatch Dashboard custom widget event"""
@property
def describe(self) -> bool:
"""Whether this is a describe request"""
@property
def widget_context(self) -> Dict[str, Any]:
"""Widget context"""Data classes for AWS Config rule events.
class AWSConfigRuleEvent:
    """AWS Config rule evaluation event"""

    @property
    def version(self) -> str:
        """Event version"""

    @property
    def invocation_event(self) -> Dict[str, Any]:
        """Config rule invocation event"""

    @property
    def rule_parameters(self) -> Dict[str, Any]:
        """Config rule parameters"""

    @property
    def result_token(self) -> str:
        """Result token for compliance evaluation"""

    @property
    def event_left_scope(self) -> bool:
        """Whether event left scope"""

    @property
    def executing_rule_name(self) -> str:
        """Config rule name"""

    @property
    def config_rule_arn(self) -> str:
        """Config rule ARN"""

    @property
    def config_rule_name(self) -> str:
        """Config rule name"""

    @property
    def account_id(self) -> str:
        """AWS account ID"""
# Data classes for AWS Secrets Manager rotation events.
class SecretsManagerEvent:
    """Secrets Manager rotation event"""

    @property
    def secret_id(self) -> str:
        """Secret ARN or name"""

    @property
    def client_request_token(self) -> str:
        """Client request token"""

    @property
    def step(self) -> str:
        """Rotation step (createSecret, setSecret, testSecret, finishSecret)"""
# Data classes for Amazon Connect events.
class ConnectContactFlowEvent:
    """Amazon Connect Contact Flow event"""

    @property
    def details(self) -> Dict[str, Any]:
        """Contact flow details"""

    @property
    def contact_data(self) -> Dict[str, Any]:
        """Contact data"""

    @property
    def parameters(self) -> Dict[str, Any]:
        """Contact flow parameters"""
# Data classes for AWS CodePipeline events.
class CodePipelineJobEvent:
    """CodePipeline job event"""

    @property
    def job_details(self) -> Dict[str, Any]:
        """CodePipeline job details"""

    @property
    def job_id(self) -> str:
        """CodePipeline job ID"""

    @property
    def job_data(self) -> Dict[str, Any]:
        """Job data"""

    @property
    def input_artifacts(self) -> List[Dict[str, Any]]:
        """Input artifacts"""

    @property
    def output_artifacts(self) -> List[Dict[str, Any]]:
        """Output artifacts"""

    @property
    def action_configuration(self) -> Dict[str, Any]:
        """Action configuration"""
# Data classes for AWS CodeDeploy events.
class CodeDeployLifecycleHookEvent:
    """CodeDeploy lifecycle hook event"""

    @property
    def account_id(self) -> str:
        """AWS account ID"""

    @property
    def region(self) -> str:
        """AWS region"""

    @property
    def deployment_id(self) -> str:
        """CodeDeploy deployment ID"""

    @property
    def application_name(self) -> str:
        """CodeDeploy application name"""

    @property
    def deployment_group_name(self) -> str:
        """Deployment group name"""

    @property
    def lifecycle_event_hook_execution_id(self) -> str:
        """Lifecycle hook execution ID"""
# Data classes for Amazon VPC Lattice events.
class VPCLatticeEvent:
"""VPC Lattice service request event"""
@property
def body(self) -> str | None:
"""Request body"""
@property
def json_body(self) -> Any:
"""Parse JSON request body"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query parameters"""
@property
def path_parameters(self) -> Dict[str, str] | None:
"""Path parameters"""
@property
def request_context(self) -> Dict[str, Any]:
"""VPC Lattice request context"""
@property
def is_base64_encoded(self) -> bool:
"""Whether body is base64 encoded"""
class VPCLatticeEventV2:
"""VPC Lattice v2 service request event"""
@property
def body(self) -> str | None:
"""Request body"""
@property
def json_body(self) -> Any:
"""Parse JSON request body"""
@property
def headers(self) -> Dict[str, str]:
"""Request headers"""
@property
def query_string_parameters(self) -> Dict[str, str] | None:
"""Query parameters"""
@property
def path_parameters(self) -> Dict[str, str] | None:
"""Path parameters"""
@property
def request_context(self) -> Dict[str, Any]:
"""VPC Lattice v2 request context"""
@property
def is_base64_encoded(self) -> bool:
"""Whether body is base64 encoded"""Data classes for Amazon Bedrock events.
class BedrockAgentEvent:
"""Bedrock Agent invocation event"""
@property
def message_version(self) -> str:
"""Message version"""
@property
def input_text(self) -> str:
"""Agent input text"""
@property
def session_id(self) -> str:
"""Agent session ID"""
@property
def action_group(self) -> str:
"""Action group name"""
@property
def api_path(self) -> str:
"""API path"""
@property
def http_method(self) -> str:
"""HTTP method"""
@property
def parameters(self) -> Dict[str, Any]:
"""Action parameters"""
@property
def request_body(self) -> Dict[str, Any] | None:
"""Request body"""
@property
def session_attributes(self) -> Dict[str, str]:
"""Session attributes"""
@property
def prompt_session_attributes(self) -> Dict[str, str]:
"""Prompt session attributes"""
class BedrockAgentFunctionEvent:
"""Bedrock Agent function invocation event"""
@property
def message_version(self) -> str:
"""Message version"""
@property
def input_text(self) -> str:
"""Function input text"""
@property
def session_id(self) -> str:
"""Agent session ID"""
@property
def function(self) -> str:
"""Function name"""
@property
def parameters(self) -> Dict[str, Any]:
"""Function parameters"""
@property
def session_attributes(self) -> Dict[str, str]:
"""Session attributes"""
@property
def prompt_session_attributes(self) -> Dict[str, str]:
"""Prompt session attributes"""Data classes for Amazon Simple Email Service events.
class SESEvent:
    """Simple Email Service event"""

    @property
    def records(self) -> List["SESRecord"]:
        """List of SES records"""


class SESRecord:
    """Individual SES record"""

    @property
    def ses(self) -> Dict[str, Any]:
        """SES event data"""

    @property
    def mail(self) -> Dict[str, Any]:
        """Email message data"""

    @property
    def receipt(self) -> Dict[str, Any]:
        """Email receipt data"""
# Data classes for managed Kafka events.
class KafkaEvent:
"""Managed Kafka event (MSK/Self-managed)"""
@property
def event_source(self) -> str:
"""Event source"""
@property
def event_source_arn(self) -> str:
"""Kafka cluster ARN"""
@property
def records(self) -> Dict[str, List[KafkaRecord]]:
"""Records grouped by topic-partition"""
@property
def bootstrap_servers(self) -> str:
"""Kafka bootstrap servers"""
class KafkaRecord:
"""Individual Kafka record"""
@property
def topic(self) -> str:
"""Kafka topic"""
@property
def partition(self) -> int:
"""Kafka partition"""
@property
def offset(self) -> int:
"""Kafka offset"""
@property
def timestamp(self) -> int:
"""Record timestamp"""
@property
def timestamp_type(self) -> str:
"""Timestamp type"""
@property
def key(self) -> str | None:
"""Record key (base64 encoded)"""
@property
def value(self) -> str:
"""Record value (base64 encoded)"""
@property
def headers(self) -> Dict[str, str]:
"""Record headers"""
@property
def decoded_key(self) -> bytes | None:
"""Decode record key from base64"""
@property
def decoded_value(self) -> bytes:
"""Decode record value from base64"""
@property
def json_value(self) -> Any:
"""Parse record value as JSON"""Data classes for AWS CloudFormation events.
class CloudFormationCustomResourceEvent:
"""CloudFormation custom resource event"""
@property
def request_type(self) -> str:
"""Request type (Create, Update, Delete)"""
@property
def response_url(self) -> str:
"""CloudFormation response URL"""
@property
def stack_id(self) -> str:
"""CloudFormation stack ID"""
@property
def request_id(self) -> str:
"""Request ID"""
@property
def resource_type(self) -> str:
"""Custom resource type"""
@property
def logical_resource_id(self) -> str:
"""Logical resource ID"""
@property
def resource_properties(self) -> Dict[str, Any]:
"""Resource properties"""
@property
def old_resource_properties(self) -> Dict[str, Any] | None:
"""Old resource properties (Update only)"""Data classes for AWS Transfer Family events.
class TransferFamilyAuthorizer:
    """Transfer Family custom authorizer event"""

    @property
    def username(self) -> str:
        """Username attempting authentication"""

    @property
    def password(self) -> str:
        """Password for authentication"""

    @property
    def protocol(self) -> str:
        """Transfer protocol (SFTP, FTPS, FTP)"""

    @property
    def server_id(self) -> str:
        """Transfer Family server ID"""

    @property
    def source_ip(self) -> str:
        """Source IP address"""


class TransferFamilyAuthorizerResponse:
    """Transfer Family authorizer response"""

    def __init__(
        self,
        role: str | None = None,
        policy: str | None = None,
        home_directory: str | None = None,
        home_directory_type: str = "PATH",
        home_directory_details: "List[Dict[str, str]] | None" = None,
        posix_profile: "Dict[str, int] | None" = None,
        public_key_id: str | None = None,
    ):
        """
        Create Transfer Family authorizer response.

        Parameters:
        - role: IAM role ARN for the user
        - policy: Session policy JSON
        - home_directory: User's home directory
        - home_directory_type: Type of home directory (PATH or LOGICAL)
        - home_directory_details: Logical directory mappings
        - posix_profile: POSIX profile (Uid, Gid, SecondaryGids)
        - public_key_id: Public key ID
        """
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context: LambdaContext) -> dict:
    """Route API Gateway proxy requests to simple user CRUD handlers."""
    # Access request properties
    method = event.request_context["httpMethod"]
    path = event.request_context["path"]
    # Get headers (case-insensitive)
    content_type = event.get_header_value("content-type", "application/json")
    auth_header = event.get_header_value("Authorization")
    # Get query parameters
    page = event.get_query_string_value("page", "1")
    limit = event.get_query_string_value("limit", "10")
    # Parse JSON body if present
    request_data = None
    if event.body:
        request_data = event.json_body
    # Process based on method and path
    if method == "GET" and path == "/users":
        users = get_users(page=int(page), limit=int(limit))
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"users": users, "page": page, "limit": limit})
        }
    elif method == "POST" and path == "/users":
        if not request_data:
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Request body required"})
            }
        user = create_user(request_data)
        return {
            "statusCode": 201,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(user)
        }
    elif method == "GET" and event.path_parameters:
        user_id = event.path_parameters.get("id")
        if user_id:
            user = get_user(user_id)
            if user:
                return {
                    "statusCode": 200,
                    "headers": {"Content-Type": "application/json"},
                    "body": json.dumps(user)
                }
            else:
                return {"statusCode": 404, "body": json.dumps({"error": "User not found"})}
    return {"statusCode": 404, "body": json.dumps({"error": "Route not found"})}
from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context: LambdaContext) -> dict:
    """Process an SQS batch, dispatching each record by its MessageType attribute."""
    success_count = 0
    error_count = 0
    # Process each SQS record
    for record in event.records:
        try:
            # Access record properties
            message_id = record.message_id
            receipt_handle = record.receipt_handle
            # Parse message body
            message_data = record.json_body
            # Check message attributes
            message_type = None
            if record.message_attributes:
                message_type_attr = record.message_attributes.get("MessageType")
                if message_type_attr:
                    message_type = message_type_attr.get("stringValue")
            # Process based on message type
            if message_type == "order":
                process_order(message_data)
            elif message_type == "payment":
                process_payment(message_data)
            else:
                # Default processing
                process_generic_message(message_data)
            success_count += 1
        except Exception as e:
            error_count += 1
            print(f"Failed to process message {record.message_id}: {str(e)}")
            # In batch processing, you might want to raise to trigger partial batch failure
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": success_count,
            "errors": error_count
        })
    }
def process_order(order_data: dict) -> None:
    """Process order message.

    Parameters:
    - order_data: Parsed message body; reads the "orderId" and "customerId" keys.
    """
    order_id = order_data.get("orderId")
    customer_id = order_data.get("customerId")
    # Order processing logic
    print(f"Processing order {order_id} for customer {customer_id}")
def process_payment(payment_data: dict) -> None:
    """Process payment message.

    Parameters:
    - payment_data: Parsed message body; reads the "paymentId" and "amount" keys.
    """
    payment_id = payment_data.get("paymentId")
    amount = payment_data.get("amount")
    # Payment processing logic
    print(f"Processing payment {payment_id} for amount ${amount}")
from aws_lambda_powertools.utilities.data_classes import S3Event, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3
from urllib.parse import unquote_plus
s3_client = boto3.client("s3")
@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context: LambdaContext) -> dict:
    """Dispatch S3 notification records to created/removed handlers and summarize results."""
    processed_objects = []
    for record in event.records:
        # Get S3 object information
        bucket_name = record.bucket_name
        object_key = record.object_key  # Already URL decoded
        event_name = record.event_name
        object_size = record.object_size
        print(f"Processing {event_name} event for {bucket_name}/{object_key}")
        print(f"Object size: {object_size} bytes")
        # Handle different S3 events
        if event_name.startswith("ObjectCreated"):
            result = handle_object_created(bucket_name, object_key, record)
        elif event_name.startswith("ObjectRemoved"):
            result = handle_object_removed(bucket_name, object_key, record)
        else:
            result = {"status": "ignored", "event": event_name}
        processed_objects.append({
            "bucket": bucket_name,
            "key": object_key,
            "event": event_name,
            "result": result
        })
    return {
        "statusCode": 200,
        "processedObjects": processed_objects
    }
def handle_object_created(bucket: str, key: str, record) -> dict:
    """Handle S3 object creation by dispatching on file extension.

    Returns a status dict; unsupported extensions are skipped.
    """
    # Check file extension (case-insensitive)
    if key.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
        # Process image file
        return process_image_file(bucket, key)
    elif key.lower().endswith('.json'):
        # Process JSON data file
        return process_json_file(bucket, key)
    elif key.lower().endswith('.csv'):
        # Process CSV file
        return process_csv_file(bucket, key)
    else:
        return {"status": "skipped", "reason": "unsupported_file_type"}
def handle_object_removed(bucket: str, key: str, record) -> dict:
    """Handle S3 object removal by cleaning up resources tied to the object."""
    # Cleanup related resources
    cleanup_related_data(bucket, key)
    return {"status": "cleaned_up"}
def process_image_file(bucket: str, key: str) -> dict:
    """Process an uploaded image file (thumbnail generation placeholder).

    Returns a status dict; any failure is reported rather than raised so one
    bad object does not fail the whole batch.
    """
    try:
        # Example: Generate thumbnail
        response = s3_client.get_object(Bucket=bucket, Key=key)
        image_data = response['Body'].read()
        # Image processing logic here
        # thumbnail = create_thumbnail(image_data)
        # Save thumbnail back to S3
        thumbnail_key = f"thumbnails/{key}"
        # s3_client.put_object(Bucket=bucket, Key=thumbnail_key, Body=thumbnail)
        return {"status": "processed", "thumbnail_key": thumbnail_key}
    except Exception as e:
        return {"status": "error", "error": str(e)}
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3
import json
# Initialize AWS services
ses_client = boto3.client("ses")
sns_client = boto3.client("sns")
@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext) -> dict:
    """Dispatch DynamoDB Stream records to INSERT/MODIFY/REMOVE handlers."""
    processed_records = []
    for record in event.records:
        event_name = record.event_name  # INSERT, MODIFY, REMOVE
        try:
            if event_name == "INSERT":
                result = handle_insert(record)
            elif event_name == "MODIFY":
                result = handle_modify(record)
            elif event_name == "REMOVE":
                result = handle_remove(record)
            else:
                result = {"status": "ignored"}
            processed_records.append({
                "eventName": event_name,
                "eventId": record.event_id,
                "result": result
            })
        except Exception as e:
            processed_records.append({
                "eventName": event_name,
                "eventId": record.event_id,
                "error": str(e)
            })
    return {"processedRecords": processed_records}
def handle_insert(record) -> dict:
    """Dispatch a newly inserted item to a handler based on its 'type'."""
    new_image = record.dynamodb.get("NewImage", {})
    # Attribute values arrive in DynamoDB wire format, e.g. {"S": "user"}
    item_type = new_image.get("type", {}).get("S", "")
    if item_type == "user":
        return handle_new_user(new_image)
    if item_type == "order":
        return handle_new_order(new_image)
    return {"status": "unknown_type", "type": item_type}
def handle_modify(record) -> dict:
    """React to item updates, acting only when the 'status' field changed."""
    images = record.dynamodb
    old_image = images.get("OldImage", {})
    new_image = images.get("NewImage", {})
    before = old_image.get("status", {}).get("S", "")
    after = new_image.get("status", {}).get("S", "")
    if before == after:
        return {"status": "no_significant_changes"}
    return handle_status_change(old_image, new_image, before, after)
def handle_remove(record) -> dict:
    """Tear down resources associated with a deleted item."""
    old_image = record.dynamodb.get("OldImage", {})
    # Identify what was removed via the DynamoDB-typed attributes.
    item_id = old_image.get("id", {}).get("S", "")
    item_type = old_image.get("type", {}).get("S", "")
    cleanup_item(item_id, item_type)
    return {"status": "cleaned_up", "item_id": item_id}
def handle_new_user(user_data: dict) -> dict:
    """Send a welcome email to a newly created user, when contact data exists."""
    email = user_data.get("email", {}).get("S", "")
    name = user_data.get("name", {}).get("S", "")
    # Guard clause: both fields are required before we can greet the user.
    if not (email and name):
        return {"status": "insufficient_data"}
    send_welcome_email(email, name)
    return {"status": "welcome_email_sent", "email": email}
def handle_new_order(order_data: dict) -> dict:
    """Fire the order-created notification for a freshly written order."""
    order_id = order_data.get("orderId", {}).get("S", "")
    customer_id = order_data.get("customerId", {}).get("S", "")
    # Confirmation is sent even if fields are empty; downstream validates.
    notify_order_created(order_id, customer_id)
    return {"status": "order_notification_sent", "order_id": order_id}
def handle_status_change(old_image: dict, new_image: dict, old_status: str, new_status: str) -> dict:
    """Handle a status transition on an item.

    Parameters:
    - old_image / new_image: DynamoDB attribute-format images of the item
    - old_status / new_status: plain status strings extracted by the caller

    Returns:
    A summary dict describing the processed transition.
    """
    item_id = new_image.get("id", {}).get("S", "")
    if new_status == "completed":
        # Completion side effects (fulfilment, receipts, ...)
        handle_completion(item_id, new_image)
    elif new_status == "cancelled":
        # Cancellation needs the pre-change image for refunds/cleanup
        handle_cancellation(item_id, old_image)
    return {
        "status": "status_change_processed",
        "old_status": old_status,
        "new_status": new_status,
        "item_id": item_id
    }
from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
import base64
@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context: LambdaContext) -> dict:
    """Consume a Kinesis batch, handling both JSON and plain-text payloads.

    Records that fail JSON parsing are re-read as text; any other failure
    is logged and the batch continues, so one bad record never stops it.
    """
    ok_count = 0
    byte_count = 0
    for record in event.records:
        try:
            # Access Kinesis record properties
            partition_key = record.partition_key
            sequence_number = record.sequence_number
            # data_as_json decodes base64 and parses JSON in one step
            payload = record.data_as_json
            process_kinesis_record(payload, partition_key, sequence_number)
            ok_count += 1
            byte_count += len(record.data_as_bytes)
        except json.JSONDecodeError as e:
            print(f"Failed to parse JSON for record {record.sequence_number}: {str(e)}")
            # Fall back to treating the payload as plain text
            process_text_record(record.data_as_text, record.partition_key)
        except Exception as e:
            print(f"Failed to process record {record.sequence_number}: {str(e)}")
    return {
        "statusCode": 200,
        "processedRecords": ok_count,
        "totalBytes": byte_count
    }
def process_kinesis_record(data: dict, partition_key: str, sequence_number: str):
    """Route one decoded Kinesis record to its type-specific processor."""
    record_type = data.get("type", "unknown")
    timestamp = data.get("timestamp")  # currently unused; kept for parity
    print(f"Processing {record_type} record from partition {partition_key}")
    if record_type == "click_event":
        process_click_event(data)
        return
    if record_type == "page_view":
        process_page_view(data)
        return
    if record_type == "purchase":
        process_purchase_event(data)
        return
    print(f"Unknown record type: {record_type}")
def process_click_event(event_data: dict):
    """Record a click-tracking event (analytics placeholder)."""
    user_id, element_id, page = (
        event_data.get(k) for k in ("user_id", "element_id", "page")
    )
    # Analytics processing
    print(f"Click: user {user_id} clicked {element_id} on {page}")
def process_page_view(event_data: dict):
    """Log a page-view analytics event.

    Parameters:
    - event_data: decoded record payload; expected keys are user_id,
      page and referrer (missing keys are printed as None)
    """
    user_id = event_data.get("user_id")
    page = event_data.get("page")
    referrer = event_data.get("referrer")
    # Page view analytics
    print(f"Page view: user {user_id} viewed {page} from {referrer}")
from typing import Dict, Any, List, Optional, Union, Callable
from aws_lambda_powertools.utilities.typing import LambdaContext
# Event source decorator signature
EventSourceDecorator = Callable[[Callable], Callable]
# Common property types used across data classes
Headers = Dict[str, str]
QueryParameters = Optional[Dict[str, str]]
PathParameters = Optional[Dict[str, str]]
MultiValueHeaders = Optional[Dict[str, List[str]]]
MultiValueQueryParameters = Optional[Dict[str, List[str]]]
# Request context types (vary by service)
APIGatewayRequestContext = Dict[str, Any]
ALBRequestContext = Dict[str, Any]
LambdaFunctionUrlRequestContext = Dict[str, Any]
VPCLatticeRequestContext = Dict[str, Any]
# Record types for batch processing
SQSRecords = List[SQSRecord]
SNSRecords = List[SNSRecord]
S3Records = List[S3Record]
DynamoDBRecords = List[DynamoDBRecord]
KinesisRecords = List[KinesisStreamRecord]Install with Tessl CLI
npx tessl i tessl/pypi-aws-lambda-powertools
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10