or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.mdcore-observability.mddata-classes.mdevent-handlers.mdfeature-flags.mdindex.mdparameters.mdparser.mdutilities.md

parser.mddocs/

0

# Parser

1

2

Event parsing and validation using Pydantic models with built-in envelopes for extracting business logic from AWS event sources. Enables type-safe event processing and automatic validation of incoming Lambda events.

3

4

## Capabilities

5

6

### Event Parser Functions

7

8

High-level functions for parsing and validating Lambda events using Pydantic models.

9

10

```python { .api }

11

def event_parser(

12

model: BaseModel,

13

envelope: BaseEnvelope = None,

14

) -> Callable:

15

"""

16

Decorator for parsing Lambda events into Pydantic models.

17

18

Parameters:

19

- model: Pydantic model class to parse event into

20

- envelope: Envelope to extract data from event structure

21

22

Returns:

23

Decorated function that receives parsed model instance as parameter

24

25

Raises:

26

ValidationError: If event validation fails

27

"""

28

29

def parse(

30

event: Dict[str, Any],

31

model: BaseModel,

32

envelope: BaseEnvelope = None,

33

) -> Any:

34

"""

35

Parse event data into Pydantic model instance.

36

37

Parameters:

38

- event: Raw Lambda event dictionary

39

- model: Pydantic model class for parsing

40

- envelope: Optional envelope for data extraction

41

42

Returns:

43

Parsed model instance

44

45

Raises:

46

ValidationError: If parsing/validation fails

47

"""

48

```

49

50

### Base Model and Validation

51

52

Pydantic base model and validation utilities re-exported for convenience.

53

54

```python { .api }

55

class BaseModel:

56

"""

57

Pydantic BaseModel for defining data schemas.

58

Re-exported from pydantic for parser functionality.

59

"""

60

61

def __init__(self, **data: Any): ...

62

63

def dict(self, **kwargs) -> Dict[str, Any]:

64

"""Convert model to dictionary"""

65

66

def json(self, **kwargs) -> str:

67

"""Convert model to JSON string"""

68

69

@classmethod

70

def parse_obj(cls, obj: Dict[str, Any]) -> "BaseModel":

71

"""Parse dictionary into model instance"""

72

73

@classmethod

74

def parse_raw(cls, data: str, **kwargs) -> "BaseModel":

75

"""Parse raw string data into model instance"""

76

77

@classmethod

78

def schema(cls, **kwargs) -> Dict[str, Any]:

79

"""Get JSON schema for model"""

80

81

def Field(

82

default: Any = ...,

83

alias: str = None,

84

title: str = None,

85

description: str = None,

86

gt: float = None,

87

ge: float = None,

88

lt: float = None,

89

le: float = None,

90

min_length: int = None,

91

max_length: int = None,

92

regex: str = None,

93

**kwargs,

94

) -> Any:

95

"""

96

Pydantic Field function for model field configuration.

97

98

Parameters:

99

- default: Default field value

100

- alias: Field alias for serialization

101

- title: Field title for schema

102

- description: Field description for schema

103

- gt: Numeric greater than validation

104

- ge: Numeric greater than or equal validation

105

- lt: Numeric less than validation

106

- le: Numeric less than or equal validation

107

- min_length: Minimum string/list length

108

- max_length: Maximum string/list length

109

- regex: Regular expression pattern validation

110

- **kwargs: Additional field options

111

112

Returns:

113

Field configuration object

114

"""

115

116

def field_validator(field: str, **kwargs) -> Callable:

117

"""

118

Decorator for field-level validation.

119

120

Parameters:

121

- field: Field name to validate

122

- **kwargs: Validator options

123

124

Returns:

125

Field validator decorator

126

"""

127

128

def model_validator(mode: str = "before", **kwargs) -> Callable:

129

"""

130

Decorator for model-level validation.

131

132

Parameters:

133

- mode: Validation mode ("before" or "after")

134

- **kwargs: Validator options

135

136

Returns:

137

Model validator decorator

138

"""

139

140

class ValidationError(Exception):

141

"""

142

Pydantic validation error.

143

Raised when model validation fails.

144

"""

145

146

def __init__(self, errors: List[Dict[str, Any]]): ...

147

148

@property

149

def errors(self) -> List[Dict[str, Any]]:

150

"""Get validation error details"""

151

```

152

153

### Envelope Classes

154

155

Envelopes for extracting business data from AWS event structures.

156

157

```python { .api }

158

class BaseEnvelope:

159

"""Base envelope for event data extraction"""

160

161

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

162

"""

163

Extract and parse data from event structure.

164

165

Parameters:

166

- data: Raw event data

167

- model: Pydantic model for parsing

168

169

Returns:

170

Parsed model instance or list of instances

171

"""

172

173

class ApiGatewayEnvelope(BaseEnvelope):

174

"""Envelope for API Gateway REST API events"""

175

176

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

177

"""Extract body from API Gateway event and parse with model"""

178

179

class ApiGatewayV2Envelope(BaseEnvelope):

180

"""Envelope for API Gateway HTTP API (v2.0) events"""

181

182

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

183

"""Extract body from HTTP API event and parse with model"""

184

185

class ApiGatewayWebSocketEnvelope(BaseEnvelope):

186

"""Envelope for API Gateway WebSocket events"""

187

188

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

189

"""Extract body from WebSocket event and parse with model"""

190

191

class LambdaFunctionUrlEnvelope(BaseEnvelope):

192

"""Envelope for Lambda Function URL events"""

193

194

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

195

"""Extract body from Function URL event and parse with model"""

196

197

class ALBEnvelope(BaseEnvelope):

198

"""Envelope for Application Load Balancer events"""

199

200

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

201

"""Extract body from ALB event and parse with model"""

202

203

class SqsEnvelope(BaseEnvelope):

204

"""Envelope for SQS events"""

205

206

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

207

"""Extract and parse each SQS message body with model"""

208

209

class SnsEnvelope(BaseEnvelope):

210

"""Envelope for SNS events"""

211

212

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

213

"""Extract and parse each SNS message with model"""

214

215

class SnsSqsEnvelope(BaseEnvelope):

216

"""Envelope for SNS messages delivered via SQS"""

217

218

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

219

"""Extract SNS messages from SQS records and parse with model"""

220

221

class EventBridgeEnvelope(BaseEnvelope):

222

"""Envelope for EventBridge events"""

223

224

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

225

"""Extract detail from EventBridge event and parse with model"""

226

227

class CloudWatchLogsEnvelope(BaseEnvelope):

228

"""Envelope for CloudWatch Logs events"""

229

230

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

231

"""Extract and parse CloudWatch log events with model"""

232

233

class KinesisDataStreamEnvelope(BaseEnvelope):

234

"""Envelope for Kinesis Data Streams events"""

235

236

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

237

"""Extract and parse Kinesis records with model"""

238

239

class KinesisFirehoseEnvelope(BaseEnvelope):

240

"""Envelope for Kinesis Data Firehose transformation events"""

241

242

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

243

"""Extract and parse Firehose records with model"""

244

245

class DynamoDBStreamEnvelope(BaseEnvelope):

246

"""Envelope for DynamoDB Streams events"""

247

248

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

249

"""Extract and parse DynamoDB Stream records with model"""

250

251

class KafkaEnvelope(BaseEnvelope):

252

"""Envelope for Kafka events (MSK/Self-managed)"""

253

254

def parse(self, data: Dict[str, Any], model: BaseModel) -> List[Any]:

255

"""Extract and parse Kafka records with model"""

256

257

class VpcLatticeEnvelope(BaseEnvelope):

258

"""Envelope for VPC Lattice events"""

259

260

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

261

"""Extract body from VPC Lattice event and parse with model"""

262

263

class VpcLatticeV2Envelope(BaseEnvelope):

264

"""Envelope for VPC Lattice V2 events"""

265

266

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

267

"""Extract body from VPC Lattice V2 event and parse with model"""

268

269

class BedrockAgentEnvelope(BaseEnvelope):

270

"""Envelope for Bedrock Agent events"""

271

272

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

273

"""Extract parameters from Bedrock Agent event and parse with model"""

274

275

class BedrockAgentFunctionEnvelope(BaseEnvelope):

276

"""Envelope for Bedrock Agent Function events"""

277

278

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

279

"""Extract parameters from Bedrock Agent Function event and parse with model"""

280

```

281

282

### Parser Models

283

284

Pre-built Pydantic models for common AWS event types and data structures.

285

286

```python { .api }

287

# Note: The models module contains 100+ pre-built Pydantic models

288

# Here are key examples - see full documentation for complete list

289

290

class S3Model(BaseModel):

291

"""S3 object information model"""

292

bucket: str = Field(description="S3 bucket name")

293

key: str = Field(description="S3 object key")

294

size: Optional[int] = Field(description="Object size in bytes")

295

etag: Optional[str] = Field(description="Object ETag")

296

297

class SqsModel(BaseModel):

298

"""SQS message model"""

299

message_id: str = Field(description="SQS message ID")

300

receipt_handle: str = Field(description="SQS receipt handle")

301

body: str = Field(description="Message body")

302

attributes: Optional[Dict[str, str]] = Field(description="Message attributes")

303

304

class DynamoDbModel(BaseModel):

305

"""DynamoDB item model"""

306

keys: Dict[str, Any] = Field(description="Primary key values")

307

new_image: Optional[Dict[str, Any]] = Field(description="New item image")

308

old_image: Optional[Dict[str, Any]] = Field(description="Old item image")

309

event_name: str = Field(description="Event type (INSERT, MODIFY, REMOVE)")

310

311

class KinesisModel(BaseModel):

312

"""Kinesis record model"""

313

partition_key: str = Field(description="Partition key")

314

sequence_number: str = Field(description="Sequence number")

315

data: bytes = Field(description="Record data")

316

approximate_arrival_timestamp: Optional[datetime] = Field(description="Arrival timestamp")

317

318

class EventBridgeModel(BaseModel):

319

"""EventBridge event model"""

320

source: str = Field(description="Event source")

321

detail_type: str = Field(description="Event detail type")

322

detail: Dict[str, Any] = Field(description="Event detail")

323

resources: Optional[List[str]] = Field(description="Event resources")

324

```

325

326

### Type Definitions

327

328

Type aliases and utilities for parser functionality.

329

330

```python { .api }

331

from typing import Any, Dict, List, Union, Type

332

333

# JSON type alias for any JSON-serializable value

334

Json = Union[Dict[str, Any], List[Any], str, int, float, bool, None]

335

336

# Literal type alias (re-exported from typing_extensions for compatibility)

337

Literal = Any # Actual implementation uses typing_extensions.Literal

338

```

339

340

## Usage Examples

341

342

### Basic Event Parsing

343

344

```python

345

from aws_lambda_powertools.utilities.parser import event_parser, BaseModel, Field

346

from aws_lambda_powertools.utilities.typing import LambdaContext

347

from typing import List, Optional

348

349

# Define custom data models

350

class OrderItem(BaseModel):

351

product_id: str = Field(description="Product identifier")

352

quantity: int = Field(gt=0, description="Item quantity")

353

price: float = Field(gt=0, description="Item price")

354

355

class Order(BaseModel):

356

order_id: str = Field(description="Order identifier")

357

customer_id: str = Field(description="Customer identifier")

358

items: List[OrderItem] = Field(description="Order items")

359

total: Optional[float] = Field(ge=0, description="Order total")

360

status: str = Field(default="pending", description="Order status")

361

362

# Parse API Gateway event automatically

363

@event_parser(model=Order)

364

def lambda_handler(event: Order, context: LambdaContext) -> dict:

365

# Event is automatically parsed and validated

366

print(f"Processing order {event.order_id} for customer {event.customer_id}")

367

368

# Access validated data

369

total_items = sum(item.quantity for item in event.items)

370

calculated_total = sum(item.quantity * item.price for item in event.items)

371

372

# Validate total if provided

373

if event.total and abs(event.total - calculated_total) > 0.01:

374

return {

375

"statusCode": 400,

376

"body": {"error": "Order total mismatch"}

377

}

378

379

# Process order

380

result = process_order(event)

381

382

return {

383

"statusCode": 200,

384

"body": {

385

"order_id": event.order_id,

386

"total_items": total_items,

387

"calculated_total": calculated_total,

388

"result": result

389

}

390

}

391

392

def process_order(order: Order) -> dict:

393

"""Process validated order"""

394

# Business logic with type-safe access

395

return {

396

"processed": True,

397

"items_count": len(order.items),

398

"customer_id": order.customer_id

399

}

400

```

401

402

### SQS Batch Processing with Parser

403

404

```python

405

from aws_lambda_powertools.utilities.parser import event_parser, BaseModel, Field

406

from aws_lambda_powertools.utilities.parser.envelopes import SqsEnvelope

407

from aws_lambda_powertools.utilities.typing import LambdaContext

408

from typing import List

409

from datetime import datetime

410

411

class UserEvent(BaseModel):

412

user_id: str = Field(description="User identifier")

413

event_type: str = Field(description="Event type")

414

timestamp: datetime = Field(description="Event timestamp")

415

properties: dict = Field(default_factory=dict, description="Event properties")

416

417

# Parse SQS messages into UserEvent models

418

@event_parser(model=UserEvent, envelope=SqsEnvelope)

419

def lambda_handler(events: List[UserEvent], context: LambdaContext) -> dict:

420

processed_events = []

421

422

for event in events:

423

# Each SQS message is parsed and validated

424

print(f"Processing {event.event_type} event for user {event.user_id}")

425

426

# Type-safe access to event data

427

result = process_user_event(event)

428

processed_events.append(result)

429

430

return {

431

"statusCode": 200,

432

"processed_count": len(processed_events),

433

"results": processed_events

434

}

435

436

def process_user_event(event: UserEvent) -> dict:

437

"""Process individual user event"""

438

if event.event_type == "login":

439

return handle_login_event(event)

440

elif event.event_type == "purchase":

441

return handle_purchase_event(event)

442

else:

443

return {"status": "unknown_event_type"}

444

445

def handle_login_event(event: UserEvent) -> dict:

446

"""Handle user login event"""

447

ip_address = event.properties.get("ip_address")

448

device = event.properties.get("device")

449

450

return {

451

"event_type": "login",

452

"user_id": event.user_id,

453

"ip_address": ip_address,

454

"device": device,

455

"processed_at": datetime.utcnow().isoformat()

456

}

457

458

def handle_purchase_event(event: UserEvent) -> dict:

459

"""Handle purchase event"""

460

amount = event.properties.get("amount", 0)

461

currency = event.properties.get("currency", "USD")

462

463

return {

464

"event_type": "purchase",

465

"user_id": event.user_id,

466

"amount": amount,

467

"currency": currency,

468

"processed_at": datetime.utcnow().isoformat()

469

}

470

```

471

472

### Custom Validation with Pydantic

473

474

```python

475

from aws_lambda_powertools.utilities.parser import (

476

event_parser,

477

BaseModel,

478

Field,

479

field_validator,

480

model_validator,

481

ValidationError

482

)

483

from aws_lambda_powertools.utilities.typing import LambdaContext

484

from typing import List, Optional

485

import re

486

487

class EmailAddress(BaseModel):

488

email: str = Field(description="Email address")

489

490

@field_validator('email')

491

@classmethod

492

def validate_email(cls, v: str) -> str:

493

"""Validate email format"""

494

email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

495

if not re.match(email_regex, v):

496

raise ValueError("Invalid email format")

497

return v.lower()

498

499

class CustomerRegistration(BaseModel):

500

first_name: str = Field(min_length=1, max_length=50, description="First name")

501

last_name: str = Field(min_length=1, max_length=50, description="Last name")

502

email: str = Field(description="Email address")

503

phone: Optional[str] = Field(default=None, description="Phone number")

504

age: int = Field(ge=13, le=120, description="Customer age")

505

marketing_consent: bool = Field(default=False, description="Marketing consent")

506

507

@field_validator('email')

508

@classmethod

509

def validate_email(cls, v: str) -> str:

510

"""Validate email format"""

511

email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

512

if not re.match(email_regex, v):

513

raise ValueError("Invalid email format")

514

return v.lower()

515

516

@field_validator('phone')

517

@classmethod

518

def validate_phone(cls, v: Optional[str]) -> Optional[str]:

519

"""Validate phone number format"""

520

if v is None:

521

return v

522

523

# Remove common separators

524

phone_clean = re.sub(r'[\s\-\(\)]+', '', v)

525

526

# Check if it's a valid phone number (simple validation)

527

if not re.match(r'^\+?[\d]{10,15}$', phone_clean):

528

raise ValueError("Invalid phone number format")

529

530

return phone_clean

531

532

@model_validator(mode='after')

533

def validate_marketing_consent(self) -> 'CustomerRegistration':

534

"""Validate marketing consent rules"""

535

# European customers under 16 cannot consent to marketing

536

if self.age < 16 and self.marketing_consent:

537

raise ValueError("Customers under 16 cannot consent to marketing")

538

539

return self

540

541

@event_parser(model=CustomerRegistration)

542

def lambda_handler(event: CustomerRegistration, context: LambdaContext) -> dict:

543

try:

544

# Event is automatically validated

545

print(f"Registering customer: {event.first_name} {event.last_name}")

546

547

# Process registration

548

customer_id = register_customer(event)

549

550

return {

551

"statusCode": 200,

552

"body": {

553

"customer_id": customer_id,

554

"message": "Registration successful"

555

}

556

}

557

558

except ValidationError as e:

559

# Handle validation errors

560

print(f"Validation failed: {e.errors()}")

561

return {

562

"statusCode": 400,

563

"body": {

564

"error": "Invalid registration data",

565

"details": [

566

{"field": err["loc"][-1], "message": err["msg"]}

567

for err in e.errors()

568

]

569

}

570

}

571

572

def register_customer(registration: CustomerRegistration) -> str:

573

"""Register new customer with validated data"""

574

import uuid

575

576

customer_id = str(uuid.uuid4())

577

578

# Save to database (mock)

579

customer_data = {

580

"id": customer_id,

581

"first_name": registration.first_name,

582

"last_name": registration.last_name,

583

"email": registration.email,

584

"phone": registration.phone,

585

"age": registration.age,

586

"marketing_consent": registration.marketing_consent,

587

"created_at": datetime.utcnow().isoformat()

588

}

589

590

print(f"Saving customer: {customer_data}")

591

592

return customer_id

593

```

594

595

### Multiple Event Types with Custom Envelopes

596

597

```python

598

from aws_lambda_powertools.utilities.parser import (

599

parse,

600

BaseModel,

601

Field,

602

BaseEnvelope

603

)

604

from aws_lambda_powertools.utilities.parser.envelopes import (

605

SqsEnvelope,

606

EventBridgeEnvelope,

607

KinesisDataStreamEnvelope

608

)

609

from aws_lambda_powertools.utilities.typing import LambdaContext

610

from typing import List, Union, Any, Dict

611

612

# Define different event models

613

class OrderCreated(BaseModel):

614

order_id: str = Field(description="Order ID")

615

customer_id: str = Field(description="Customer ID")

616

total_amount: float = Field(ge=0, description="Order total")

617

currency: str = Field(default="USD", description="Currency code")

618

619

class PaymentProcessed(BaseModel):

620

payment_id: str = Field(description="Payment ID")

621

order_id: str = Field(description="Related order ID")

622

amount: float = Field(ge=0, description="Payment amount")

623

status: str = Field(description="Payment status")

624

625

class InventoryUpdate(BaseModel):

626

product_id: str = Field(description="Product ID")

627

quantity_change: int = Field(description="Quantity change (+/-)")

628

warehouse_id: str = Field(description="Warehouse ID")

629

630

def lambda_handler(event: dict, context: LambdaContext) -> dict:

631

"""Handle multiple event sources and types"""

632

633

# Determine event source and parse accordingly

634

if "Records" in event:

635

if event["Records"][0].get("eventSource") == "aws:sqs":

636

return handle_sqs_events(event, context)

637

elif event["Records"][0].get("eventSource") == "aws:kinesis":

638

return handle_kinesis_events(event, context)

639

elif "source" in event:

640

return handle_eventbridge_event(event, context)

641

else:

642

return {"statusCode": 400, "body": "Unknown event type"}

643

644

def handle_sqs_events(event: dict, context: LambdaContext) -> dict:

645

"""Handle SQS events with different message types"""

646

results = []

647

648

# Parse SQS envelope to get message bodies

649

for record in event["Records"]:

650

message_body = record["body"]

651

652

# Parse message to determine type

653

try:

654

import json

655

message_data = json.loads(message_body)

656

event_type = message_data.get("type")

657

658

if event_type == "order_created":

659

parsed_event = parse(message_data["data"], OrderCreated)

660

result = process_order_created(parsed_event)

661

elif event_type == "payment_processed":

662

parsed_event = parse(message_data["data"], PaymentProcessed)

663

result = process_payment(parsed_event)

664

else:

665

result = {"error": f"Unknown event type: {event_type}"}

666

667

results.append(result)

668

669

except Exception as e:

670

results.append({"error": str(e)})

671

672

return {

673

"statusCode": 200,

674

"processed": len(results),

675

"results": results

676

}

677

678

def handle_eventbridge_event(event: dict, context: LambdaContext) -> dict:

679

"""Handle EventBridge events"""

680

681

detail_type = event.get("detail-type")

682

683

try:

684

if detail_type == "Order Created":

685

parsed_event = parse(event, OrderCreated, envelope=EventBridgeEnvelope)

686

result = process_order_created(parsed_event)

687

elif detail_type == "Payment Processed":

688

parsed_event = parse(event, PaymentProcessed, envelope=EventBridgeEnvelope)

689

result = process_payment(parsed_event)

690

else:

691

result = {"error": f"Unknown detail type: {detail_type}"}

692

693

return {

694

"statusCode": 200,

695

"result": result

696

}

697

698

except Exception as e:

699

return {

700

"statusCode": 500,

701

"error": str(e)

702

}

703

704

def handle_kinesis_events(event: dict, context: LambdaContext) -> dict:

705

"""Handle Kinesis stream events"""

706

707

try:

708

# Parse all Kinesis records as inventory updates

709

inventory_updates = parse(event, InventoryUpdate, envelope=KinesisDataStreamEnvelope)

710

711

results = []

712

for update in inventory_updates:

713

result = process_inventory_update(update)

714

results.append(result)

715

716

return {

717

"statusCode": 200,

718

"processed": len(results),

719

"results": results

720

}

721

722

except Exception as e:

723

return {

724

"statusCode": 500,

725

"error": str(e)

726

}

727

728

def process_order_created(order: OrderCreated) -> dict:

729

"""Process order creation event"""

730

return {

731

"type": "order_processed",

732

"order_id": order.order_id,

733

"customer_id": order.customer_id,

734

"amount": order.total_amount,

735

"currency": order.currency

736

}

737

738

def process_payment(payment: PaymentProcessed) -> dict:

739

"""Process payment event"""

740

return {

741

"type": "payment_processed",

742

"payment_id": payment.payment_id,

743

"order_id": payment.order_id,

744

"amount": payment.amount,

745

"status": payment.status

746

}

747

748

def process_inventory_update(update: InventoryUpdate) -> dict:

749

"""Process inventory update event"""

750

return {

751

"type": "inventory_updated",

752

"product_id": update.product_id,

753

"quantity_change": update.quantity_change,

754

"warehouse_id": update.warehouse_id

755

}

756

```

757

758

### Advanced Parser Features

759

760

```python

761

from aws_lambda_powertools.utilities.parser import (

762

event_parser,

763

BaseModel,

764

Field,

765

field_validator,

766

model_validator,

767

BaseEnvelope

768

)

769

from aws_lambda_powertools.utilities.typing import LambdaContext

770

from typing import List, Optional, Union, Any

771

from datetime import datetime, timezone

772

from decimal import Decimal

773

import json

774

775

class CustomEnvelope(BaseEnvelope):

776

"""Custom envelope for proprietary event format"""

777

778

def parse(self, data: Dict[str, Any], model: BaseModel) -> Any:

779

# Extract nested payload from custom format

780

payload = data.get("payload", {})

781

metadata = data.get("metadata", {})

782

783

# Merge metadata into payload for parsing

784

if metadata:

785

payload["_metadata"] = metadata

786

787

return model.parse_obj(payload)

788

789

class Money(BaseModel):

790

"""Money value with currency"""

791

amount: Decimal = Field(description="Monetary amount")

792

currency: str = Field(default="USD", description="Currency code")

793

794

@field_validator('currency')

795

@classmethod

796

def validate_currency(cls, v: str) -> str:

797

"""Validate currency code"""

798

valid_currencies = ["USD", "EUR", "GBP", "JPY", "CAD", "AUD"]

799

if v.upper() not in valid_currencies:

800

raise ValueError(f"Invalid currency: {v}")

801

return v.upper()

802

803

class Address(BaseModel):

804

"""Address information"""

805

street: str = Field(description="Street address")

806

city: str = Field(description="City")

807

state: Optional[str] = Field(default=None, description="State/Province")

808

postal_code: str = Field(description="Postal/ZIP code")

809

country: str = Field(description="Country code")

810

811

class ComplexOrder(BaseModel):

812

"""Complex order with nested models and validation"""

813

order_id: str = Field(description="Order identifier")

814

customer_id: str = Field(description="Customer identifier")

815

order_date: datetime = Field(description="Order date")

816

817

# Nested models

818

billing_address: Address = Field(description="Billing address")

819

shipping_address: Optional[Address] = Field(default=None, description="Shipping address")

820

821

# Complex fields

822

total: Money = Field(description="Order total")

823

tax: Money = Field(description="Tax amount")

824

shipping_cost: Money = Field(description="Shipping cost")

825

826

# Optional metadata

827

_metadata: Optional[dict] = Field(default=None, alias="metadata", description="Event metadata")

828

829

@field_validator('order_date')

830

@classmethod

831

def validate_order_date(cls, v: datetime) -> datetime:

832

"""Ensure order date is timezone-aware"""

833

if v.tzinfo is None:

834

v = v.replace(tzinfo=timezone.utc)

835

return v

836

837

@model_validator(mode='after')

838

def validate_order_totals(self) -> 'ComplexOrder':

839

"""Validate order financial calculations"""

840

# Ensure all amounts use same currency

841

currencies = {self.total.currency, self.tax.currency, self.shipping_cost.currency}

842

if len(currencies) > 1:

843

raise ValueError("All monetary amounts must use the same currency")

844

845

# Validate total calculation (simplified)

846

calculated_total = self.tax.amount + self.shipping_cost.amount

847

if abs(self.total.amount - calculated_total) > Decimal('0.01'):

848

# In a real scenario, you'd validate against line items too

849

pass # Skip validation for this example

850

851

return self

852

853

@model_validator(mode='after')

854

def set_default_shipping_address(self) -> 'ComplexOrder':

855

"""Set shipping address to billing if not provided"""

856

if self.shipping_address is None:

857

self.shipping_address = self.billing_address

858

859

return self

860

861

@event_parser(model=ComplexOrder, envelope=CustomEnvelope)

862

def lambda_handler(event: ComplexOrder, context: LambdaContext) -> dict:

863

"""Process complex order with full validation"""

864

865

print(f"Processing order {event.order_id} for customer {event.customer_id}")

866

867

# Access nested model data

868

billing_city = event.billing_address.city

869

shipping_city = event.shipping_address.city

870

871

# Work with validated monetary amounts

872

total_amount = event.total.amount

873

currency = event.total.currency

874

875

# Access metadata if provided

876

source_system = None

877

if event._metadata:

878

source_system = event._metadata.get("source_system")

879

880

# Process order

881

result = {

882

"order_id": event.order_id,

883

"customer_id": event.customer_id,

884

"total_amount": str(total_amount),

885

"currency": currency,

886

"billing_city": billing_city,

887

"shipping_city": shipping_city,

888

"source_system": source_system,

889

"processed_at": datetime.utcnow().isoformat()

890

}

891

892

# Perform business logic

893

process_complex_order(event)

894

895

return {

896

"statusCode": 200,

897

"body": result

898

}

899

900

def process_complex_order(order: ComplexOrder):

901

"""Process order with type-safe access to all fields"""

902

903

# Calculate shipping distance (mock)

904

if order.billing_address.city != order.shipping_address.city:

905

print(f"Cross-city shipping: {order.billing_address.city} -> {order.shipping_address.city}")

906

907

# Handle international orders

908

if order.billing_address.country != order.shipping_address.country:

909

print(f"International order: {order.billing_address.country} -> {order.shipping_address.country}")

910

# Apply international shipping rules

911

912

# Process payment

913

print(f"Processing payment: {order.total.amount} {order.total.currency}")

914

915

# Log order details

916

print(f"Order details: {order.dict()}")

917

```

918

919

## Types

920

921

```python { .api }

922

from typing import Dict, Any, List, Union, Type, Optional, Callable

923

from pydantic import BaseModel as PydanticBaseModel, ValidationError as PydanticValidationError

924

925

# Parser function signatures

926

EventParserDecorator = Callable[[Callable], Callable]

927

ParseFunction = Callable[[Dict[str, Any], Type[BaseModel], Optional[BaseEnvelope]], Any]

928

929

# Model types

930

ModelClass = Type[BaseModel]

931

ModelInstance = BaseModel

932

933

# Validation types

934

ValidationError = PydanticValidationError

935

ValidationErrors = List[Dict[str, Any]]

936

937

# Envelope types

938

EnvelopeClass = Type[BaseEnvelope]

939

EnvelopeInstance = BaseEnvelope

940

941

# Event data types

942

EventData = Dict[str, Any]

943

ParsedData = Union[Any, List[Any]]

944

945

# Field validation decorator

946

FieldValidator = Callable[[Any], Any]

947

ModelValidator = Callable[[BaseModel], BaseModel]

948

949

# JSON types

950

JsonValue = Union[str, int, float, bool, None, Dict[str, Any], List[Any]]

951

Json = JsonValue

952

953

# Literal type for type hints

954

from typing_extensions import Literal

955

```