0
# Parser
1
2
Event parsing and validation using Pydantic models with built-in envelopes for extracting business logic from AWS event sources. Enables type-safe event processing and automatic validation of incoming Lambda events.
3
4
## Capabilities
5
6
### Event Parser Functions
7
8
High-level functions for parsing and validating Lambda events using Pydantic models.
9
10
```python { .api }
11
def event_parser(
12
model: BaseModel,
13
envelope: BaseEnvelope = None,
14
) -> Callable:
15
"""
16
Decorator for parsing Lambda events into Pydantic models.
17
18
Parameters:
19
- model: Pydantic model class to parse event into
20
- envelope: Envelope to extract data from event structure
21
22
Returns:
23
Decorated function that receives parsed model instance as parameter
24
25
Raises:
26
ValidationError: If event validation fails
27
"""
28
29
def parse(
30
event: Dict[str, Any],
31
model: BaseModel,
32
envelope: BaseEnvelope = None,
33
) -> Any:
34
"""
35
Parse event data into Pydantic model instance.
36
37
Parameters:
38
- event: Raw Lambda event dictionary
39
- model: Pydantic model class for parsing
40
- envelope: Optional envelope for data extraction
41
42
Returns:
43
Parsed model instance
44
45
Raises:
46
ValidationError: If parsing/validation fails
47
"""
48
```
49
50
### Base Model and Validation
51
52
Pydantic base model and validation utilities re-exported for convenience.
53
54
```python { .api }
55
class BaseModel:
56
"""
57
Pydantic BaseModel for defining data schemas.
58
Re-exported from pydantic for parser functionality.
59
"""
60
61
def __init__(self, **data: Any): ...
62
63
def dict(self, **kwargs) -> Dict[str, Any]:
64
"""Convert model to dictionary"""
65
66
def json(self, **kwargs) -> str:
67
"""Convert model to JSON string"""
68
69
@classmethod
70
def parse_obj(cls, obj: Dict[str, Any]) -> "BaseModel":
71
"""Parse dictionary into model instance"""
72
73
@classmethod
74
def parse_raw(cls, data: str, **kwargs) -> "BaseModel":
75
"""Parse raw string data into model instance"""
76
77
@classmethod
78
def schema(cls, **kwargs) -> Dict[str, Any]:
79
"""Get JSON schema for model"""
80
81
def Field(
82
default: Any = ...,
83
alias: str = None,
84
title: str = None,
85
description: str = None,
86
gt: float = None,
87
ge: float = None,
88
lt: float = None,
89
le: float = None,
90
min_length: int = None,
91
max_length: int = None,
92
regex: str = None,
93
**kwargs,
94
) -> Any:
95
"""
96
Pydantic Field function for model field configuration.
97
98
Parameters:
99
- default: Default field value
100
- alias: Field alias for serialization
101
- title: Field title for schema
102
- description: Field description for schema
103
- gt: Numeric greater than validation
104
- ge: Numeric greater than or equal validation
105
- lt: Numeric less than validation
106
- le: Numeric less than or equal validation
107
- min_length: Minimum string/list length
108
- max_length: Maximum string/list length
109
- regex: Regular expression pattern validation
110
- **kwargs: Additional field options
111
112
Returns:
113
Field configuration object
114
"""
115
116
def field_validator(field: str, **kwargs) -> Callable:
117
"""
118
Decorator for field-level validation.
119
120
Parameters:
121
- field: Field name to validate
122
- **kwargs: Validator options
123
124
Returns:
125
Field validator decorator
126
"""
127
128
def model_validator(mode: str = "before", **kwargs) -> Callable:
129
"""
130
Decorator for model-level validation.
131
132
Parameters:
133
- mode: Validation mode ("before" or "after")
134
- **kwargs: Validator options
135
136
Returns:
137
Model validator decorator
138
"""
139
140
class ValidationError(Exception):
141
"""
142
Pydantic validation error.
143
Raised when model validation fails.
144
"""
145
146
def __init__(self, errors: List[Dict[str, Any]]): ...
147
148
@property
149
def errors(self) -> List[Dict[str, Any]]:
150
"""Get validation error details"""
151
```
152
153
### Envelope Classes
154
155
Envelopes for extracting business data from AWS event structures.
156
157
```python { .api }
158
class BaseEnvelope:
159
"""Base envelope for event data extraction"""
160
161
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
162
"""
163
Extract and parse data from event structure.
164
165
Parameters:
166
- data: Raw event data
167
- model: Pydantic model for parsing
168
169
Returns:
170
Parsed model instance or list of instances
171
"""
172
173
class ApiGatewayEnvelope(BaseEnvelope):
174
"""Envelope for API Gateway REST API events"""
175
176
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
177
"""Extract body from API Gateway event and parse with model"""
178
179
class ApiGatewayV2Envelope(BaseEnvelope):
180
"""Envelope for API Gateway HTTP API (v2.0) events"""
181
182
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
183
"""Extract body from HTTP API event and parse with model"""
184
185
class ApiGatewayWebSocketEnvelope(BaseEnvelope):
186
"""Envelope for API Gateway WebSocket events"""
187
188
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
189
"""Extract body from WebSocket event and parse with model"""
190
191
class LambdaFunctionUrlEnvelope(BaseEnvelope):
192
"""Envelope for Lambda Function URL events"""
193
194
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
195
"""Extract body from Function URL event and parse with model"""
196
197
class ALBEnvelope(BaseEnvelope):
198
"""Envelope for Application Load Balancer events"""
199
200
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
201
"""Extract body from ALB event and parse with model"""
202
203
class SqsEnvelope(BaseEnvelope):
204
"""Envelope for SQS events"""
205
206
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
207
"""Extract and parse each SQS message body with model"""
208
209
class SnsEnvelope(BaseEnvelope):
210
"""Envelope for SNS events"""
211
212
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
213
"""Extract and parse each SNS message with model"""
214
215
class SnsSqsEnvelope(BaseEnvelope):
216
"""Envelope for SNS messages delivered via SQS"""
217
218
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
219
"""Extract SNS messages from SQS records and parse with model"""
220
221
class EventBridgeEnvelope(BaseEnvelope):
222
"""Envelope for EventBridge events"""
223
224
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
225
"""Extract detail from EventBridge event and parse with model"""
226
227
class CloudWatchLogsEnvelope(BaseEnvelope):
228
"""Envelope for CloudWatch Logs events"""
229
230
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
231
"""Extract and parse CloudWatch log events with model"""
232
233
class KinesisDataStreamEnvelope(BaseEnvelope):
234
"""Envelope for Kinesis Data Streams events"""
235
236
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
237
"""Extract and parse Kinesis records with model"""
238
239
class KinesisFirehoseEnvelope(BaseEnvelope):
240
"""Envelope for Kinesis Data Firehose transformation events"""
241
242
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
243
"""Extract and parse Firehose records with model"""
244
245
class DynamoDBStreamEnvelope(BaseEnvelope):
246
"""Envelope for DynamoDB Streams events"""
247
248
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
249
"""Extract and parse DynamoDB Stream records with model"""
250
251
class KafkaEnvelope(BaseEnvelope):
252
"""Envelope for Kafka events (MSK/Self-managed)"""
253
254
def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:
255
"""Extract and parse Kafka records with model"""
256
257
class VpcLatticeEnvelope(BaseEnvelope):
258
"""Envelope for VPC Lattice events"""
259
260
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
261
"""Extract body from VPC Lattice event and parse with model"""
262
263
class VpcLatticeV2Envelope(BaseEnvelope):
264
"""Envelope for VPC Lattice V2 events"""
265
266
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
267
"""Extract body from VPC Lattice V2 event and parse with model"""
268
269
class BedrockAgentEnvelope(BaseEnvelope):
270
"""Envelope for Bedrock Agent events"""
271
272
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
273
"""Extract parameters from Bedrock Agent event and parse with model"""
274
275
class BedrockAgentFunctionEnvelope(BaseEnvelope):
276
"""Envelope for Bedrock Agent Function events"""
277
278
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
279
"""Extract parameters from Bedrock Agent Function event and parse with model"""
280
```
281
282
### Parser Models
283
284
Pre-built Pydantic models for common AWS event types and data structures.
285
286
```python { .api }
287
# Note: The models module contains 100+ pre-built Pydantic models
288
# Here are key examples - see full documentation for complete list
289
290
class S3Model(BaseModel):
291
"""S3 object information model"""
292
bucket: str = Field(description="S3 bucket name")
293
key: str = Field(description="S3 object key")
294
size: Optional[int] = Field(description="Object size in bytes")
295
etag: Optional[str] = Field(description="Object ETag")
296
297
class SqsModel(BaseModel):
298
"""SQS message model"""
299
message_id: str = Field(description="SQS message ID")
300
receipt_handle: str = Field(description="SQS receipt handle")
301
body: str = Field(description="Message body")
302
attributes: Optional[Dict[str, str]] = Field(description="Message attributes")
303
304
class DynamoDbModel(BaseModel):
305
"""DynamoDB item model"""
306
keys: Dict[str, Any] = Field(description="Primary key values")
307
new_image: Optional[Dict[str, Any]] = Field(description="New item image")
308
old_image: Optional[Dict[str, Any]] = Field(description="Old item image")
309
event_name: str = Field(description="Event type (INSERT, MODIFY, REMOVE)")
310
311
class KinesisModel(BaseModel):
312
"""Kinesis record model"""
313
partition_key: str = Field(description="Partition key")
314
sequence_number: str = Field(description="Sequence number")
315
data: bytes = Field(description="Record data")
316
approximate_arrival_timestamp: Optional[datetime] = Field(description="Arrival timestamp")
317
318
class EventBridgeModel(BaseModel):
319
"""EventBridge event model"""
320
source: str = Field(description="Event source")
321
detail_type: str = Field(description="Event detail type")
322
detail: Dict[str, Any] = Field(description="Event detail")
323
resources: Optional[List[str]] = Field(description="Event resources")
324
```
325
326
### Type Definitions
327
328
Type aliases and utilities for parser functionality.
329
330
```python { .api }
331
from typing import Any, Dict, List, Union, Type
332
333
# JSON type alias for any JSON-serializable value
334
Json = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
335
336
# Literal type alias (re-exported from typing_extensions for compatibility)
337
Literal = Any # Actual implementation uses typing_extensions.Literal
338
```
339
340
## Usage Examples
341
342
### Basic Event Parsing
343
344
```python
345
from aws_lambda_powertools.utilities.parser import event_parser, BaseModel, Field
346
from aws_lambda_powertools.utilities.typing import LambdaContext
347
from typing import List, Optional
348
349
# Define custom data models
350
class OrderItem(BaseModel):
351
product_id: str = Field(description="Product identifier")
352
quantity: int = Field(gt=0, description="Item quantity")
353
price: float = Field(gt=0, description="Item price")
354
355
class Order(BaseModel):
356
order_id: str = Field(description="Order identifier")
357
customer_id: str = Field(description="Customer identifier")
358
items: List[OrderItem] = Field(description="Order items")
359
total: Optional[float] = Field(ge=0, description="Order total")
360
status: str = Field(default="pending", description="Order status")
361
362
# Parse API Gateway event automatically
363
@event_parser(model=Order)
364
def lambda_handler(event: Order, context: LambdaContext) -> dict:
365
# Event is automatically parsed and validated
366
print(f"Processing order {event.order_id} for customer {event.customer_id}")
367
368
# Access validated data
369
total_items = sum(item.quantity for item in event.items)
370
calculated_total = sum(item.quantity * item.price for item in event.items)
371
372
# Validate total if provided
373
if event.total and abs(event.total - calculated_total) > 0.01:
374
return {
375
"statusCode": 400,
376
"body": {"error": "Order total mismatch"}
377
}
378
379
# Process order
380
result = process_order(event)
381
382
return {
383
"statusCode": 200,
384
"body": {
385
"order_id": event.order_id,
386
"total_items": total_items,
387
"calculated_total": calculated_total,
388
"result": result
389
}
390
}
391
392
def process_order(order: Order) -> dict:
393
"""Process validated order"""
394
# Business logic with type-safe access
395
return {
396
"processed": True,
397
"items_count": len(order.items),
398
"customer_id": order.customer_id
399
}
400
```
401
402
### SQS Batch Processing with Parser
403
404
```python
405
from aws_lambda_powertools.utilities.parser import event_parser, BaseModel, Field
406
from aws_lambda_powertools.utilities.parser.envelopes import SqsEnvelope
407
from aws_lambda_powertools.utilities.typing import LambdaContext
408
from typing import List
409
from datetime import datetime
410
411
class UserEvent(BaseModel):
412
user_id: str = Field(description="User identifier")
413
event_type: str = Field(description="Event type")
414
timestamp: datetime = Field(description="Event timestamp")
415
properties: dict = Field(default_factory=dict, description="Event properties")
416
417
# Parse SQS messages into UserEvent models
418
@event_parser(model=UserEvent, envelope=SqsEnvelope)
419
def lambda_handler(events: List[UserEvent], context: LambdaContext) -> dict:
420
processed_events = []
421
422
for event in events:
423
# Each SQS message is parsed and validated
424
print(f"Processing {event.event_type} event for user {event.user_id}")
425
426
# Type-safe access to event data
427
result = process_user_event(event)
428
processed_events.append(result)
429
430
return {
431
"statusCode": 200,
432
"processed_count": len(processed_events),
433
"results": processed_events
434
}
435
436
def process_user_event(event: UserEvent) -> dict:
437
"""Process individual user event"""
438
if event.event_type == "login":
439
return handle_login_event(event)
440
elif event.event_type == "purchase":
441
return handle_purchase_event(event)
442
else:
443
return {"status": "unknown_event_type"}
444
445
def handle_login_event(event: UserEvent) -> dict:
446
"""Handle user login event"""
447
ip_address = event.properties.get("ip_address")
448
device = event.properties.get("device")
449
450
return {
451
"event_type": "login",
452
"user_id": event.user_id,
453
"ip_address": ip_address,
454
"device": device,
455
"processed_at": datetime.utcnow().isoformat()
456
}
457
458
def handle_purchase_event(event: UserEvent) -> dict:
459
"""Handle purchase event"""
460
amount = event.properties.get("amount", 0)
461
currency = event.properties.get("currency", "USD")
462
463
return {
464
"event_type": "purchase",
465
"user_id": event.user_id,
466
"amount": amount,
467
"currency": currency,
468
"processed_at": datetime.utcnow().isoformat()
469
}
470
```
471
472
### Custom Validation with Pydantic
473
474
```python
475
from aws_lambda_powertools.utilities.parser import (
476
event_parser,
477
BaseModel,
478
Field,
479
field_validator,
480
model_validator,
481
ValidationError
482
)
483
from aws_lambda_powertools.utilities.typing import LambdaContext
484
from typing import List, Optional
485
import re
486
487
class EmailAddress(BaseModel):
488
email: str = Field(description="Email address")
489
490
@field_validator('email')
491
@classmethod
492
def validate_email(cls, v: str) -> str:
493
"""Validate email format"""
494
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
495
if not re.match(email_regex, v):
496
raise ValueError("Invalid email format")
497
return v.lower()
498
499
class CustomerRegistration(BaseModel):
500
first_name: str = Field(min_length=1, max_length=50, description="First name")
501
last_name: str = Field(min_length=1, max_length=50, description="Last name")
502
email: str = Field(description="Email address")
503
phone: Optional[str] = Field(default=None, description="Phone number")
504
age: int = Field(ge=13, le=120, description="Customer age")
505
marketing_consent: bool = Field(default=False, description="Marketing consent")
506
507
@field_validator('email')
508
@classmethod
509
def validate_email(cls, v: str) -> str:
510
"""Validate email format"""
511
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
512
if not re.match(email_regex, v):
513
raise ValueError("Invalid email format")
514
return v.lower()
515
516
@field_validator('phone')
517
@classmethod
518
def validate_phone(cls, v: Optional[str]) -> Optional[str]:
519
"""Validate phone number format"""
520
if v is None:
521
return v
522
523
# Remove common separators
524
phone_clean = re.sub(r'[\s\-\(\)]+', '', v)
525
526
# Check if it's a valid phone number (simple validation)
527
if not re.match(r'^\+?[\d]{10,15}$', phone_clean):
528
raise ValueError("Invalid phone number format")
529
530
return phone_clean
531
532
@model_validator(mode='after')
533
def validate_marketing_consent(self) -> 'CustomerRegistration':
534
"""Validate marketing consent rules"""
535
# European customers under 16 cannot consent to marketing
536
if self.age < 16 and self.marketing_consent:
537
raise ValueError("Customers under 16 cannot consent to marketing")
538
539
return self
540
541
@event_parser(model=CustomerRegistration)
542
def lambda_handler(event: CustomerRegistration, context: LambdaContext) -> dict:
543
try:
544
# Event is automatically validated
545
print(f"Registering customer: {event.first_name} {event.last_name}")
546
547
# Process registration
548
customer_id = register_customer(event)
549
550
return {
551
"statusCode": 200,
552
"body": {
553
"customer_id": customer_id,
554
"message": "Registration successful"
555
}
556
}
557
558
except ValidationError as e:
559
# Handle validation errors
560
print(f"Validation failed: {e.errors()}")
561
return {
562
"statusCode": 400,
563
"body": {
564
"error": "Invalid registration data",
565
"details": [
566
{"field": err["loc"][-1], "message": err["msg"]}
567
for err in e.errors()
568
]
569
}
570
}
571
572
def register_customer(registration: CustomerRegistration) -> str:
573
"""Register new customer with validated data"""
574
import uuid
575
576
customer_id = str(uuid.uuid4())
577
578
# Save to database (mock)
579
customer_data = {
580
"id": customer_id,
581
"first_name": registration.first_name,
582
"last_name": registration.last_name,
583
"email": registration.email,
584
"phone": registration.phone,
585
"age": registration.age,
586
"marketing_consent": registration.marketing_consent,
587
"created_at": datetime.utcnow().isoformat()
588
}
589
590
print(f"Saving customer: {customer_data}")
591
592
return customer_id
593
```
594
595
### Multiple Event Types with Custom Envelopes
596
597
```python
598
from aws_lambda_powertools.utilities.parser import (
599
parse,
600
BaseModel,
601
Field,
602
BaseEnvelope
603
)
604
from aws_lambda_powertools.utilities.parser.envelopes import (
605
SqsEnvelope,
606
EventBridgeEnvelope,
607
KinesisDataStreamEnvelope
608
)
609
from aws_lambda_powertools.utilities.typing import LambdaContext
610
from typing import List, Union, Any, Dict
611
612
# Define different event models
613
class OrderCreated(BaseModel):
614
order_id: str = Field(description="Order ID")
615
customer_id: str = Field(description="Customer ID")
616
total_amount: float = Field(ge=0, description="Order total")
617
currency: str = Field(default="USD", description="Currency code")
618
619
class PaymentProcessed(BaseModel):
620
payment_id: str = Field(description="Payment ID")
621
order_id: str = Field(description="Related order ID")
622
amount: float = Field(ge=0, description="Payment amount")
623
status: str = Field(description="Payment status")
624
625
class InventoryUpdate(BaseModel):
626
product_id: str = Field(description="Product ID")
627
quantity_change: int = Field(description="Quantity change (+/-)")
628
warehouse_id: str = Field(description="Warehouse ID")
629
630
def lambda_handler(event: dict, context: LambdaContext) -> dict:
631
"""Handle multiple event sources and types"""
632
633
# Determine event source and parse accordingly
634
if "Records" in event:
635
if event["Records"][0].get("eventSource") == "aws:sqs":
636
return handle_sqs_events(event, context)
637
elif event["Records"][0].get("eventSource") == "aws:kinesis":
638
return handle_kinesis_events(event, context)
639
elif "source" in event:
640
return handle_eventbridge_event(event, context)
641
else:
642
return {"statusCode": 400, "body": "Unknown event type"}
643
644
def handle_sqs_events(event: dict, context: LambdaContext) -> dict:
645
"""Handle SQS events with different message types"""
646
results = []
647
648
# Parse SQS envelope to get message bodies
649
for record in event["Records"]:
650
message_body = record["body"]
651
652
# Parse message to determine type
653
try:
654
import json
655
message_data = json.loads(message_body)
656
event_type = message_data.get("type")
657
658
if event_type == "order_created":
659
parsed_event = parse(message_data["data"], OrderCreated)
660
result = process_order_created(parsed_event)
661
elif event_type == "payment_processed":
662
parsed_event = parse(message_data["data"], PaymentProcessed)
663
result = process_payment(parsed_event)
664
else:
665
result = {"error": f"Unknown event type: {event_type}"}
666
667
results.append(result)
668
669
except Exception as e:
670
results.append({"error": str(e)})
671
672
return {
673
"statusCode": 200,
674
"processed": len(results),
675
"results": results
676
}
677
678
def handle_eventbridge_event(event: dict, context: LambdaContext) -> dict:
679
"""Handle EventBridge events"""
680
681
detail_type = event.get("detail-type")
682
683
try:
684
if detail_type == "Order Created":
685
parsed_event = parse(event, OrderCreated, envelope=EventBridgeEnvelope)
686
result = process_order_created(parsed_event)
687
elif detail_type == "Payment Processed":
688
parsed_event = parse(event, PaymentProcessed, envelope=EventBridgeEnvelope)
689
result = process_payment(parsed_event)
690
else:
691
result = {"error": f"Unknown detail type: {detail_type}"}
692
693
return {
694
"statusCode": 200,
695
"result": result
696
}
697
698
except Exception as e:
699
return {
700
"statusCode": 500,
701
"error": str(e)
702
}
703
704
def handle_kinesis_events(event: dict, context: LambdaContext) -> dict:
705
"""Handle Kinesis stream events"""
706
707
try:
708
# Parse all Kinesis records as inventory updates
709
inventory_updates = parse(event, InventoryUpdate, envelope=KinesisDataStreamEnvelope)
710
711
results = []
712
for update in inventory_updates:
713
result = process_inventory_update(update)
714
results.append(result)
715
716
return {
717
"statusCode": 200,
718
"processed": len(results),
719
"results": results
720
}
721
722
except Exception as e:
723
return {
724
"statusCode": 500,
725
"error": str(e)
726
}
727
728
def process_order_created(order: OrderCreated) -> dict:
729
"""Process order creation event"""
730
return {
731
"type": "order_processed",
732
"order_id": order.order_id,
733
"customer_id": order.customer_id,
734
"amount": order.total_amount,
735
"currency": order.currency
736
}
737
738
def process_payment(payment: PaymentProcessed) -> dict:
739
"""Process payment event"""
740
return {
741
"type": "payment_processed",
742
"payment_id": payment.payment_id,
743
"order_id": payment.order_id,
744
"amount": payment.amount,
745
"status": payment.status
746
}
747
748
def process_inventory_update(update: InventoryUpdate) -> dict:
749
"""Process inventory update event"""
750
return {
751
"type": "inventory_updated",
752
"product_id": update.product_id,
753
"quantity_change": update.quantity_change,
754
"warehouse_id": update.warehouse_id
755
}
756
```
757
758
### Advanced Parser Features
759
760
```python
761
from aws_lambda_powertools.utilities.parser import (
762
event_parser,
763
BaseModel,
764
Field,
765
field_validator,
766
model_validator,
767
BaseEnvelope
768
)
769
from aws_lambda_powertools.utilities.typing import LambdaContext
770
from typing import List, Optional, Union, Any
771
from datetime import datetime, timezone
772
from decimal import Decimal
773
import json
774
775
class CustomEnvelope(BaseEnvelope):
776
"""Custom envelope for proprietary event format"""
777
778
def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:
779
# Extract nested payload from custom format
780
payload = data.get("payload", {})
781
metadata = data.get("metadata", {})
782
783
# Merge metadata into payload for parsing
784
if metadata:
785
payload["_metadata"] = metadata
786
787
return model.parse_obj(payload)
788
789
class Money(BaseModel):
790
"""Money value with currency"""
791
amount: Decimal = Field(description="Monetary amount")
792
currency: str = Field(default="USD", description="Currency code")
793
794
@field_validator('currency')
795
@classmethod
796
def validate_currency(cls, v: str) -> str:
797
"""Validate currency code"""
798
valid_currencies = ["USD", "EUR", "GBP", "JPY", "CAD", "AUD"]
799
if v.upper() not in valid_currencies:
800
raise ValueError(f"Invalid currency: {v}")
801
return v.upper()
802
803
class Address(BaseModel):
804
"""Address information"""
805
street: str = Field(description="Street address")
806
city: str = Field(description="City")
807
state: Optional[str] = Field(default=None, description="State/Province")
808
postal_code: str = Field(description="Postal/ZIP code")
809
country: str = Field(description="Country code")
810
811
class ComplexOrder(BaseModel):
812
"""Complex order with nested models and validation"""
813
order_id: str = Field(description="Order identifier")
814
customer_id: str = Field(description="Customer identifier")
815
order_date: datetime = Field(description="Order date")
816
817
# Nested models
818
billing_address: Address = Field(description="Billing address")
819
shipping_address: Optional[Address] = Field(default=None, description="Shipping address")
820
821
# Complex fields
822
total: Money = Field(description="Order total")
823
tax: Money = Field(description="Tax amount")
824
shipping_cost: Money = Field(description="Shipping cost")
825
826
# Optional metadata
827
_metadata: Optional[dict] = Field(default=None, alias="metadata", description="Event metadata")
828
829
@field_validator('order_date')
830
@classmethod
831
def validate_order_date(cls, v: datetime) -> datetime:
832
"""Ensure order date is timezone-aware"""
833
if v.tzinfo is None:
834
v = v.replace(tzinfo=timezone.utc)
835
return v
836
837
@model_validator(mode='after')
838
def validate_order_totals(self) -> 'ComplexOrder':
839
"""Validate order financial calculations"""
840
# Ensure all amounts use same currency
841
currencies = {self.total.currency, self.tax.currency, self.shipping_cost.currency}
842
if len(currencies) > 1:
843
raise ValueError("All monetary amounts must use the same currency")
844
845
# Validate total calculation (simplified)
846
calculated_total = self.tax.amount + self.shipping_cost.amount
847
if abs(self.total.amount - calculated_total) > Decimal('0.01'):
848
# In a real scenario, you'd validate against line items too
849
pass # Skip validation for this example
850
851
return self
852
853
@model_validator(mode='after')
854
def set_default_shipping_address(self) -> 'ComplexOrder':
855
"""Set shipping address to billing if not provided"""
856
if self.shipping_address is None:
857
self.shipping_address = self.billing_address
858
859
return self
860
861
@event_parser(model=ComplexOrder, envelope=CustomEnvelope)
862
def lambda_handler(event: ComplexOrder, context: LambdaContext) -> dict:
863
"""Process complex order with full validation"""
864
865
print(f"Processing order {event.order_id} for customer {event.customer_id}")
866
867
# Access nested model data
868
billing_city = event.billing_address.city
869
shipping_city = event.shipping_address.city
870
871
# Work with validated monetary amounts
872
total_amount = event.total.amount
873
currency = event.total.currency
874
875
# Access metadata if provided
876
source_system = None
877
if event._metadata:
878
source_system = event._metadata.get("source_system")
879
880
# Process order
881
result = {
882
"order_id": event.order_id,
883
"customer_id": event.customer_id,
884
"total_amount": str(total_amount),
885
"currency": currency,
886
"billing_city": billing_city,
887
"shipping_city": shipping_city,
888
"source_system": source_system,
889
"processed_at": datetime.utcnow().isoformat()
890
}
891
892
# Perform business logic
893
process_complex_order(event)
894
895
return {
896
"statusCode": 200,
897
"body": result
898
}
899
900
def process_complex_order(order: ComplexOrder):
901
"""Process order with type-safe access to all fields"""
902
903
# Calculate shipping distance (mock)
904
if order.billing_address.city != order.shipping_address.city:
905
print(f"Cross-city shipping: {order.billing_address.city} -> {order.shipping_address.city}")
906
907
# Handle international orders
908
if order.billing_address.country != order.shipping_address.country:
909
print(f"International order: {order.billing_address.country} -> {order.shipping_address.country}")
910
# Apply international shipping rules
911
912
# Process payment
913
print(f"Processing payment: {order.total.amount} {order.total.currency}")
914
915
# Log order details
916
print(f"Order details: {order.dict()}")
917
```
918
919
## Types
920
921
```python { .api }
922
from typing import Dict, Any, List, Union, Type, Optional, Callable
923
from pydantic import BaseModel as PydanticBaseModel, ValidationError as PydanticValidationError
924
925
# Parser function signatures
926
EventParserDecorator = Callable[[Callable], Callable]
927
ParseFunction = Callable[[Dict[str, Any], Type[BaseModel], Optional[BaseEnvelope]], Any]
928
929
# Model types
930
ModelClass = Type[BaseModel]
931
ModelInstance = BaseModel
932
933
# Validation types
934
ValidationError = PydanticValidationError
935
ValidationErrors = List[Dict[str, Any]]
936
937
# Envelope types
938
EnvelopeClass = Type[BaseEnvelope]
939
EnvelopeInstance = BaseEnvelope
940
941
# Event data types
942
EventData = Dict[str, Any]
943
ParsedData = Union[Any, List[Any]]
944
945
# Field validation decorator
946
FieldValidator = Callable[[Any], Any]
947
ModelValidator = Callable[[BaseModel], BaseModel]
948
949
# JSON types
950
JsonValue = Union[str, int, float, bool, None, Dict[str, Any], List[Any]]
951
Json = JsonValue
952
953
# Literal type for type hints
954
from typing_extensions import Literal
955
```