# Utilities

Additional utilities including data masking for sensitive information, streaming for large S3 objects, serialization helpers, schema validation, JMESPath operations, Kafka consumer helpers, and a middleware factory for building Lambda decorators.

## Capabilities

### Data Masking

Utilities for masking sensitive data in logs, responses, and storage with pluggable providers.

```python { .api }
class DataMasking:
    def __init__(self, provider: BaseProvider):
        """
        Initialize data masking utility.

        Parameters:
        - provider: Data masking provider (e.g., KMS-based masking)
        """

    def erase(
        self,
        data: Any,
        fields: List[str] = None,
    ) -> Any:
        """
        Erase sensitive fields from data structure.

        Parameters:
        - data: Data structure (dict, list, or primitive)
        - fields: List of field paths to erase (uses JMESPath syntax)

        Returns:
        Data with specified fields erased/masked
        """

    def encrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Encrypt sensitive fields in data structure.

        Parameters:
        - data: Data structure to process
        - fields: Field paths to encrypt
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields encrypted
        """

    def decrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Decrypt encrypted fields in data structure.

        Parameters:
        - data: Data structure with encrypted fields
        - fields: Field paths to decrypt
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields decrypted
        """

class BaseProvider:
    """Base data masking provider interface"""

    def encrypt(self, data: str, **kwargs) -> str:
        """Encrypt string data"""
        raise NotImplementedError

    def decrypt(self, data: str, **kwargs) -> str:
        """Decrypt string data"""
        raise NotImplementedError

    def erase(self, data: str, **kwargs) -> str:
        """Erase/mask string data"""
        raise NotImplementedError
```

### Streaming

Utilities for streaming large objects from S3 with transformation support.

```python { .api }
class S3Object:
    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: str = None,
        boto3_session: boto3.Session = None,
        **kwargs,
    ):
        """
        Initialize S3 object for streaming operations.

        Parameters:
        - bucket: S3 bucket name
        - key: S3 object key
        - version_id: Specific object version ID
        - boto3_session: Boto3 session for authentication
        - **kwargs: Additional S3 client parameters
        """

    def transform(self, transform: BaseTransform) -> "S3Object":
        """
        Apply transformation to object during streaming.

        Parameters:
        - transform: Transformation to apply

        Returns:
        New S3Object instance with transformation applied
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[str]:
        """
        Iterate over object lines.

        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding lines from the object
        """

    def iter_chunks(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[bytes]:
        """
        Iterate over object chunks.

        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding byte chunks from the object
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read object data.

        Parameters:
        - size: Number of bytes to read (-1 for all)

        Returns:
        Object data as bytes
        """

    def readline(self, size: int = -1) -> str:
        """
        Read single line from object.

        Parameters:
        - size: Maximum line length

        Returns:
        Single line as string
        """

    def write_to(
        self,
        destination_bucket: str,
        destination_key: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Write transformed object to destination S3 location.

        Parameters:
        - destination_bucket: Destination S3 bucket
        - destination_key: Destination S3 key
        - **kwargs: Additional S3 put parameters

        Returns:
        S3 put operation response
        """

class BaseTransform:
    """Base transformation interface for streaming objects"""

    def transform(self, data: bytes) -> bytes:
        """
        Transform byte data.

        Parameters:
        - data: Input data bytes

        Returns:
        Transformed data bytes
        """

class GzipTransform(BaseTransform):
    def __init__(self, compress: bool = True):
        """
        Gzip compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress
        """

    def transform(self, data: bytes) -> bytes:
        """Apply gzip compression or decompression"""

class ZipTransform(BaseTransform):
    def __init__(
        self,
        compress: bool = True,
        compression_level: int = 6,
    ):
        """
        ZIP compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress
        - compression_level: Compression level (0-9)
        """

    def transform(self, data: bytes) -> bytes:
        """Apply ZIP compression or decompression"""

class CsvTransform(BaseTransform):
    def __init__(
        self,
        delimiter: str = ",",
        quotechar: str = '"',
        headers: List[str] = None,
        **kwargs,
    ):
        """
        CSV parsing and generation transform.

        Parameters:
        - delimiter: Field delimiter
        - quotechar: Quote character
        - headers: Column headers for output
        - **kwargs: Additional CSV parameters
        """

    def transform(self, data: bytes) -> bytes:
        """Transform between CSV and JSON formats"""
```

### Serialization

Utilities for common serialization tasks including Base64 encoding/decoding.

```python { .api }
def base64_encode(data: Union[str, bytes], url_safe: bool = False) -> str:
    """
    Encode data as Base64 string.

    Parameters:
    - data: Data to encode (string or bytes)
    - url_safe: Whether to use URL-safe Base64 encoding

    Returns:
    Base64 encoded string
    """

def base64_decode(
    data: str,
    url_safe: bool = False,
    validate: bool = True,
) -> bytes:
    """
    Decode Base64 string to bytes.

    Parameters:
    - data: Base64 encoded string
    - url_safe: Whether string uses URL-safe Base64 encoding
    - validate: Whether to validate Base64 format

    Returns:
    Decoded bytes

    Raises:
    ValueError: If Base64 string is invalid and validate=True
    """

def base64_from_str(
    data: str,
    encoding: str = "utf-8",
    url_safe: bool = False,
) -> str:
    """
    Encode string as Base64.

    Parameters:
    - data: String to encode
    - encoding: String encoding to use
    - url_safe: Whether to use URL-safe Base64

    Returns:
    Base64 encoded string
    """

def base64_from_json(
    data: Any,
    ensure_ascii: bool = True,
    url_safe: bool = False,
) -> str:
    """
    Encode JSON data as Base64 string.

    Parameters:
    - data: Data to serialize as JSON then encode
    - ensure_ascii: Whether to ensure ASCII-only JSON
    - url_safe: Whether to use URL-safe Base64

    Returns:
    Base64 encoded JSON string
    """
```

### Validation

Schema validation utilities for JSON data validation.

```python { .api }
def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: str = None,
) -> Dict[str, Any]:
    """
    Validate event data against JSON schema.

    Parameters:
    - event: Event data to validate
    - schema: JSON schema for validation
    - envelope: JMESPath expression to extract data from event

    Returns:
    Validated event data

    Raises:
    SchemaValidationError: If validation fails
    InvalidSchemaFormatError: If schema format is invalid
    InvalidEnvelopeExpressionError: If envelope expression is invalid
    """

def validator(
    schema: Dict[str, Any],
    envelope: str = None,
) -> Callable:
    """
    Decorator for validating Lambda event data.

    Parameters:
    - schema: JSON schema for validation
    - envelope: JMESPath expression for data extraction

    Returns:
    Decorator function that validates event before handler execution
    """

class InvalidSchemaFormatError(Exception):
    """Raised when JSON schema format is invalid"""
    pass

class SchemaValidationError(Exception):
    """Raised when data validation against schema fails"""
    pass

class InvalidEnvelopeExpressionError(Exception):
    """Raised when JMESPath envelope expression is invalid"""
    pass
```

### JMESPath Utils

Utilities for JMESPath operations on JSON data with custom functions.

```python { .api }
def query(
    data: Dict[str, Any],
    expression: str,
    options: Dict[str, Any] = None,
) -> Any:
    """
    Execute JMESPath query on data.

    Parameters:
    - data: JSON data to query
    - expression: JMESPath expression
    - options: JMESPath options including custom functions

    Returns:
    Query result
    """

def extract_data_from_envelope(
    data: Dict[str, Any],
    envelope: str,
) -> Any:
    """
    Extract data using JMESPath envelope expression.

    Parameters:
    - data: Source data
    - envelope: JMESPath expression for data extraction

    Returns:
    Extracted data

    Note: This function is deprecated, use query() instead
    """

class PowertoolsFunctions:
    """Built-in JMESPath functions for common operations"""

    @staticmethod
    def powertools_json(value: str) -> Any:
        """Parse JSON string"""

    @staticmethod
    def powertools_base64(value: str) -> str:
        """Decode Base64 string"""

    @staticmethod
    def powertools_base64_gzip(value: str) -> str:
        """Decode Base64 and decompress gzip"""
```

### Kafka Consumer

Utilities for processing Kafka messages with schema support and deserialization.

```python { .api }
def kafka_consumer(
    record_handler: Callable[[Dict], Any],
    deserializer: BaseDeserializer = None,
) -> Callable:
    """
    Decorator for Kafka consumer Lambda functions.

    Parameters:
    - record_handler: Function to process individual Kafka records
    - deserializer: Deserializer for Kafka message values

    Returns:
    Decorated Lambda function that processes Kafka events
    """

class ConsumerRecords:
    """Kafka consumer records container"""

    def __init__(
        self,
        raw_event: Dict[str, Any],
        deserializer: BaseDeserializer = None,
    ):
        """
        Initialize consumer records.

        Parameters:
        - raw_event: Raw Kafka Lambda event
        - deserializer: Message deserializer
        """

    @property
    def records(self) -> List[KafkaRecord]:
        """Get list of Kafka records"""

    def __iter__(self) -> Iterator[KafkaRecord]:
        """Iterate over Kafka records"""

class SchemaConfig:
    def __init__(
        self,
        schema_registry_url: str,
        schema_name: str = None,
        schema_version: int = None,
        **kwargs,
    ):
        """
        Schema configuration for Kafka message deserialization.

        Parameters:
        - schema_registry_url: Confluent Schema Registry URL
        - schema_name: Schema name/subject
        - schema_version: Specific schema version
        - **kwargs: Additional schema registry client options
        """

class BaseDeserializer:
    """Base deserializer interface for Kafka messages"""

    def deserialize(self, data: bytes, **kwargs) -> Any:
        """
        Deserialize Kafka message data.

        Parameters:
        - data: Raw message bytes
        - **kwargs: Additional deserialization options

        Returns:
        Deserialized message data
        """
```
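
A minimal sketch of a consumer wired to this decorator, assuming a JSON deserializer built on the `BaseDeserializer` interface above. The import path and the decorator's exact dispatch behavior (invoking `record_handler` once per record before the handler body runs) are assumptions, not confirmed by this reference:

```python
import json
from typing import Any

# Import path assumed; adjust to wherever kafka_consumer and
# BaseDeserializer are exposed in your Powertools version
from aws_lambda_powertools.utilities.kafka import kafka_consumer, BaseDeserializer
from aws_lambda_powertools.utilities.typing import LambdaContext

class JsonDeserializer(BaseDeserializer):
    """Hypothetical deserializer that decodes message values as JSON"""

    def deserialize(self, data: bytes, **kwargs) -> Any:
        return json.loads(data.decode("utf-8"))

def handle_record(record: dict) -> None:
    # Called once per Kafka record with the deserialized value
    print(f"Received message: {record}")

@kafka_consumer(record_handler=handle_record, deserializer=JsonDeserializer())
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # By the time this body runs, each record has been deserialized
    # and dispatched to handle_record
    return {"statusCode": 200}
```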

### Lambda Context Type

Type definition for Lambda execution context.

```python { .api }
class LambdaContext:
    """AWS Lambda execution context"""

    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    remaining_time_in_millis: int
    request_id: str
    log_group_name: str
    log_stream_name: str

    @property
    def identity(self) -> Any:
        """Cognito identity information (mobile apps)"""

    @property
    def client_context(self) -> Any:
        """Client context information (mobile apps)"""

    def get_remaining_time_in_millis(self) -> int:
        """Get remaining execution time in milliseconds"""
```
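
As an illustration, a handler can use the context to stop work before the function times out; `process_item` here is a hypothetical per-item task:

```python
from aws_lambda_powertools.utilities.typing import LambdaContext

def process_item(item: dict) -> None:
    """Hypothetical per-item work"""
    print(f"Processing {item}")

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    print(f"Running {context.function_name} v{context.function_version}")

    processed = 0
    for item in event.get("items", []):
        # Stop early if fewer than five seconds of execution time remain
        if context.get_remaining_time_in_millis() < 5_000:
            break
        process_item(item)
        processed += 1

    return {"statusCode": 200, "processed": processed}
```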

### Middleware Factory

Factory for creating Lambda handler middleware decorators.

```python { .api }
def lambda_handler_decorator(
    trace_execution: bool = False,
    clear_state: bool = False,
) -> Callable:
    """
    Factory for creating Lambda handler decorators.

    Parameters:
    - trace_execution: Whether to trace decorator execution
    - clear_state: Whether to clear state after execution

    Returns:
    Decorator factory function
    """
```
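
For example, a sketch of a timing middleware built with the factory; following the factory's convention, the decorated function receives the wrapped handler plus the Lambda event and context:

```python
import time

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.typing import LambdaContext

@lambda_handler_decorator
def log_duration(handler, event, context):
    # Code before the handler runs
    start = time.perf_counter()
    response = handler(event, context)
    # Code after the handler runs
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"Handler completed in {elapsed_ms:.1f} ms")
    return response

@log_duration
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return {"statusCode": 200}
```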

## Usage Examples

### Data Masking for Sensitive Information

```python
from aws_lambda_powertools.utilities.data_masking import DataMasking, BaseProvider
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

class SimpleErasureProvider(BaseProvider):
    """Simple provider that erases sensitive data"""

    def erase(self, data: str, **kwargs) -> str:
        return "***MASKED***"

    def encrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper encryption
        return f"ENCRYPTED:{data[:3]}***"

    def decrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper decryption
        if data.startswith("ENCRYPTED:"):
            return data.replace("ENCRYPTED:", "").replace("***", "")
        return data

# Initialize data masking
masking = DataMasking(provider=SimpleErasureProvider())

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Sample user data with sensitive information
    user_data = {
        "user_id": "12345",
        "name": "John Doe",
        "email": "john@example.com",
        "ssn": "123-45-6789",
        "credit_card": "4111-1111-1111-1111",
        "address": {
            "street": "123 Main St",
            "city": "Anytown",
            "zip": "12345"
        },
        "preferences": {
            "newsletter": True,
            "phone": "555-123-4567"
        }
    }

    # Erase sensitive fields for logging
    safe_logging_data = masking.erase(
        data=user_data,
        fields=[
            "ssn",
            "credit_card",
            "preferences.phone"
        ]
    )

    print(f"Processing user data: {json.dumps(safe_logging_data, indent=2)}")

    # Encrypt sensitive data for storage
    encrypted_data = masking.encrypt(
        data=user_data,
        fields=["ssn", "credit_card"]
    )

    # Store encrypted data
    store_user_data(encrypted_data)

    # Return response with masked sensitive data
    response_data = masking.erase(
        data=user_data,
        fields=["ssn", "credit_card", "preferences.phone"]
    )

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "User data processed",
            "user": response_data
        })
    }

def store_user_data(data: dict):
    """Store user data (mock implementation)"""
    print("Storing encrypted user data to database")
```

### S3 Streaming with Transformations

```python
from aws_lambda_powertools.utilities.streaming import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
    GzipTransform,
    CsvTransform
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import datetime
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Get S3 event information
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    print(f"Processing S3 object: s3://{bucket}/{key}")

    # Create S3 streaming object
    s3_object = S3Object(bucket=bucket, key=key)

    processed_records = 0

    if key.endswith('.gz'):
        # Handle compressed file
        processed_records = process_compressed_file(s3_object, bucket)
    elif key.endswith('.csv'):
        # Handle CSV file
        processed_records = process_csv_file(s3_object, bucket, key)
    else:
        # Handle regular text file
        processed_records = process_text_file(s3_object, bucket)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed_records": processed_records,
            "source_bucket": bucket,
            "source_key": key
        })
    }

def process_compressed_file(s3_object: S3Object, bucket: str) -> int:
    """Process gzip compressed file"""

    # Add decompression transform
    decompressed_object = s3_object.transform(GzipTransform(compress=False))

    record_count = 0

    # Process line by line without loading entire file into memory
    for line in decompressed_object.iter_lines():
        if line.strip():  # Skip empty lines
            # Process individual line
            process_log_line(line)
            record_count += 1

    print(f"Processed {record_count} log entries from compressed file")
    return record_count

def process_csv_file(s3_object: S3Object, bucket: str, key: str) -> int:
    """Process CSV file and convert to JSON"""

    # Transform CSV to JSON format
    csv_transform = CsvTransform(
        delimiter=",",
        headers=["id", "name", "email", "created_at"]
    )

    json_object = s3_object.transform(csv_transform)

    # Write transformed data to new S3 location
    output_key = key.replace('.csv', '.json')

    json_object.write_to(
        destination_bucket=bucket,
        destination_key=f"processed/{output_key}",
        ContentType="application/json"
    )

    # Count records by iterating through original CSV
    record_count = sum(1 for line in s3_object.iter_lines()) - 1  # Subtract header

    print(f"Converted {record_count} CSV records to JSON")
    return record_count

def process_text_file(s3_object: S3Object, bucket: str) -> int:
    """Process regular text file"""

    record_count = 0
    processed_lines = []

    # Process in chunks to handle large files
    for chunk in s3_object.iter_chunks(chunk_size=8192):
        # Process chunk data
        chunk_str = chunk.decode('utf-8', errors='ignore')
        lines = chunk_str.split('\n')

        for line in lines:
            if line.strip():
                processed_line = process_text_line(line)
                if processed_line:
                    processed_lines.append(processed_line)
                    record_count += 1

    # Write processed data back to S3
    if processed_lines:
        output_data = '\n'.join(processed_lines)

        # Use gzip compression for output
        compressed_object = S3Object(
            bucket=bucket,
            key="temp/processing_output.txt"
        ).transform(GzipTransform(compress=True))

        # This would require implementing write capability
        # compressed_object.write(output_data.encode('utf-8'))

    print(f"Processed {record_count} text lines")
    return record_count

def process_log_line(line: str) -> None:
    """Process individual log line"""
    try:
        # Parse log line (e.g., JSON logs)
        log_entry = json.loads(line)

        # Extract relevant information
        level = log_entry.get("level")

        # Process based on log level
        if level == "ERROR":
            handle_error_log(log_entry)
        elif level == "WARN":
            handle_warning_log(log_entry)

    except json.JSONDecodeError:
        # Handle non-JSON log lines
        print(f"Non-JSON log line: {line[:100]}...")

def handle_error_log(log_entry: dict) -> None:
    """Handle ERROR-level log entry (mock implementation)"""
    print(f"Error log: {log_entry.get('message')}")

def handle_warning_log(log_entry: dict) -> None:
    """Handle WARN-level log entry (mock implementation)"""
    print(f"Warning log: {log_entry.get('message')}")

def process_text_line(line: str) -> str:
    """Process and transform text line"""
    # Example: uppercase and add timestamp
    timestamp = datetime.datetime.utcnow().isoformat()
    return f"[{timestamp}] {line.upper()}"
```

### Schema Validation

```python
from aws_lambda_powertools.utilities.validation import (
    validate,
    validator,
    SchemaValidationError
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

# JSON Schema for user registration
USER_REGISTRATION_SCHEMA = {
    "type": "object",
    "properties": {
        "firstName": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "lastName": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "email": {
            "type": "string",
            "format": "email"
        },
        "age": {
            "type": "integer",
            "minimum": 13,
            "maximum": 120
        },
        "preferences": {
            "type": "object",
            "properties": {
                "newsletter": {"type": "boolean"},
                "notifications": {"type": "boolean"}
            },
            "additionalProperties": False
        }
    },
    "required": ["firstName", "lastName", "email", "age"],
    "additionalProperties": False
}

# Order schema
ORDER_SCHEMA = {
    "type": "object",
    "properties": {
        "orderId": {"type": "string"},
        "customerId": {"type": "string"},
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "productId": {"type": "string"},
                    "quantity": {"type": "integer", "minimum": 1},
                    "price": {"type": "number", "minimum": 0}
                },
                "required": ["productId", "quantity", "price"]
            },
            "minItems": 1
        },
        "totalAmount": {"type": "number", "minimum": 0}
    },
    "required": ["orderId", "customerId", "items", "totalAmount"]
}

@validator(schema=USER_REGISTRATION_SCHEMA, envelope="body")
def register_user_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with automatic validation"""

    # Event body is automatically validated against the schema
    user_data = event["body"]

    print(f"Registering user: {user_data['firstName']} {user_data['lastName']}")

    # Process validated user data
    user_id = create_user_account(user_data)

    return {
        "statusCode": 201,
        "body": json.dumps({
            "userId": user_id,
            "message": "User registered successfully"
        })
    }

def manual_validation_handler(event: dict, context: LambdaContext) -> dict:
    """Handler with manual validation"""

    try:
        # Manually validate event data; the envelope expression
        # extracts and returns the validated event body
        order_data = validate(
            event=event,
            schema=ORDER_SCHEMA,
            envelope="body"
        )

        print(f"Processing order: {order_data['orderId']}")

        # Validate business rules after schema validation
        validate_business_rules(order_data)

        # Process order
        result = process_order(order_data)

        return {
            "statusCode": 200,
            "body": json.dumps(result)
        }

    except SchemaValidationError as e:
        print(f"Schema validation failed: {e}")
        return {
            "statusCode": 400,
            "body": json.dumps({
                "error": "Invalid request data",
                "details": str(e)
            })
        }
    except BusinessRuleError as e:
        print(f"Business rule validation failed: {e}")
        return {
            "statusCode": 422,
            "body": json.dumps({
                "error": "Business rule violation",
                "details": str(e)
            })
        }

def validate_business_rules(order_data: dict):
    """Additional business rule validation"""

    # Validate total amount matches item prices
    calculated_total = sum(
        item["quantity"] * item["price"]
        for item in order_data["items"]
    )

    if abs(order_data["totalAmount"] - calculated_total) > 0.01:
        raise BusinessRuleError("Total amount doesn't match item prices")

    # Validate order limits
    if order_data["totalAmount"] > 10000:
        raise BusinessRuleError("Order exceeds maximum amount limit")

    # Validate item availability
    for item in order_data["items"]:
        if not is_product_available(item["productId"], item["quantity"]):
            raise BusinessRuleError(f"Product {item['productId']} not available")

class BusinessRuleError(Exception):
    """Custom exception for business rule violations"""
    pass

def create_user_account(user_data: dict) -> str:
    """Create user account with validated data"""
    import uuid

    user_id = str(uuid.uuid4())

    # Save to database
    print(f"Creating user account: {user_id}")

    return user_id

def process_order(order_data: dict) -> dict:
    """Process validated order"""

    # Reserve inventory
    for item in order_data["items"]:
        reserve_inventory(item["productId"], item["quantity"])

    # Process payment
    payment_id = process_payment(order_data["totalAmount"])

    return {
        "orderId": order_data["orderId"],
        "paymentId": payment_id,
        "status": "confirmed"
    }

def is_product_available(product_id: str, quantity: int) -> bool:
    """Check product availability (mock implementation)"""
    return True

def reserve_inventory(product_id: str, quantity: int) -> None:
    """Reserve inventory (mock implementation)"""
    print(f"Reserving {quantity} x {product_id}")

def process_payment(amount: float) -> str:
    """Process payment (mock implementation)"""
    return "payment-123"
```

### JMESPath Data Extraction

```python
from aws_lambda_powertools.utilities.jmespath_utils import query, PowertoolsFunctions
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate JMESPath operations"""

    # Complex nested event data
    event_data = {
        "Records": [
            {
                "eventSource": "aws:s3",
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file1.json", "size": 1024}
                },
                "eventName": "ObjectCreated:Put"
            },
            {
                "eventSource": "aws:s3",
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "data/file2.json", "size": 2048}
                },
                "eventName": "ObjectCreated:Put"
            }
        ],
        "responsePayload": json.dumps({
            "users": [
                {"id": 1, "name": "Alice", "active": True},
                {"id": 2, "name": "Bob", "active": False}
            ]
        })
    }

    # Extract S3 bucket names
    bucket_names = query(
        data=event_data,
        expression="Records[*].s3.bucket.name"
    )
    print(f"Bucket names: {bucket_names}")

    # Extract object keys for created objects only
    created_objects = query(
        data=event_data,
        expression="Records[?eventName == 'ObjectCreated:Put'].s3.object.key"
    )
    print(f"Created objects: {created_objects}")

    # Calculate total size of all objects
    total_size = query(
        data=event_data,
        expression="sum(Records[*].s3.object.size)"
    )
    print(f"Total size: {total_size} bytes")

    # Extract and parse JSON payload using custom functions
    options = {"custom_functions": PowertoolsFunctions()}

    users_data = query(
        data=event_data,
        expression="powertools_json(responsePayload).users",
        options=options
    )
    print(f"Users data: {users_data}")

    # Get active users only
    active_users = query(
        data=event_data,
        expression="powertools_json(responsePayload).users[?active == `true`].name",
        options=options
    )
    print(f"Active users: {active_users}")

    # Filter and reshape records (standard JMESPath has no arithmetic
    # operators, so derived values such as sizes in MB must be computed
    # in Python after extraction)
    processed_records = query(
        data=event_data,
        expression="""Records[?eventSource == 'aws:s3'].{
            bucket: s3.bucket.name,
            key: s3.object.key,
            size: s3.object.size
        }"""
    )
    print(f"Processed records: {json.dumps(processed_records, indent=2)}")

    return {
        "statusCode": 200,
        "body": json.dumps({
            "bucket_names": bucket_names,
            "created_objects": created_objects,
            "total_size": total_size,
            "active_users": active_users,
            "processed_records": processed_records
        })
    }
```

### Serialization Utilities

```python
from aws_lambda_powertools.utilities.serialization import (
    base64_encode,
    base64_decode,
    base64_from_str,
    base64_from_json
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate serialization utilities"""

    # Sample data to work with
    sample_data = {
        "user_id": "12345",
        "action": "purchase",
        "metadata": {
            "product_id": "prod-789",
            "amount": 99.99,
            "currency": "USD"
        }
    }

    # Encode JSON as Base64
    encoded_json = base64_from_json(sample_data)
    print(f"Encoded JSON: {encoded_json}")

    # Encode string as Base64
    message = "Hello, World!"
    encoded_string = base64_from_str(message)
    print(f"Encoded string: {encoded_string}")

    # Encode bytes as Base64
    binary_data = b"Binary data content"
    encoded_bytes = base64_encode(binary_data)
    print(f"Encoded bytes: {encoded_bytes}")

    # URL-safe Base64 encoding
    url_safe_encoded = base64_encode(binary_data, url_safe=True)
    print(f"URL-safe encoded: {url_safe_encoded}")

    # Decode Base64 back to original data
    decoded_json_bytes = base64_decode(encoded_json)
    decoded_json = json.loads(decoded_json_bytes.decode('utf-8'))
    print(f"Decoded JSON: {decoded_json}")

    decoded_string_bytes = base64_decode(encoded_string)
    decoded_string = decoded_string_bytes.decode('utf-8')
    print(f"Decoded string: {decoded_string}")

    decoded_bytes = base64_decode(encoded_bytes)
    print(f"Decoded bytes: {decoded_bytes}")

    # Handle API Gateway events with Base64 encoded bodies
    api_event_body = event.get("body", "")
    is_base64_encoded = event.get("isBase64Encoded", False)

    if is_base64_encoded and api_event_body:
        try:
            # Decode Base64 body
            decoded_body_bytes = base64_decode(api_event_body, validate=True)
            decoded_body = decoded_body_bytes.decode('utf-8')

            # Parse as JSON if possible
            try:
                body_data = json.loads(decoded_body)
                print(f"Decoded API body: {body_data}")
            except json.JSONDecodeError:
                print(f"Decoded API body (text): {decoded_body}")

        except ValueError as e:
            print(f"Failed to decode Base64 body: {e}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid Base64 encoding"})
            }

    # Prepare response with Base64 encoded data
    response_data = {
        "original_data": sample_data,
        "encoded_formats": {
            "json_base64": encoded_json,
            "string_base64": encoded_string,
            "bytes_base64": encoded_bytes,
            "url_safe_base64": url_safe_encoded
        },
        "decoded_verification": {
            "json_matches": decoded_json == sample_data,
            "string_matches": decoded_string == message,
            "bytes_matches": decoded_bytes == binary_data
        }
    }

    return {
        "statusCode": 200,
        "body": json.dumps(response_data, indent=2)
    }
```

## Types

```python { .api }
from typing import Dict, Any, List, Union, Optional, Iterator, Callable, Type
import boto3

# Data masking types
class BaseProvider:
    """Base provider interface for data masking operations"""
    pass

MaskingProvider = BaseProvider
FieldPath = str  # JMESPath-style field path
EncryptionOptions = Dict[str, Any]

# Streaming types
class S3Object:
    """S3 object streaming interface"""
    pass

class BaseTransform:
    """Base transformation interface"""
    pass

StreamChunk = bytes
StreamLine = str
ChunkIterator = Iterator[bytes]
LineIterator = Iterator[str]

# Transformation options
CompressionLevel = int  # 0-9 for compression algorithms
CsvOptions = Dict[str, Any]
TransformOptions = Dict[str, Any]

# Serialization types
EncodableData = Union[str, bytes]
JsonData = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
Base64String = str
EncodingType = str  # e.g., "utf-8", "ascii"

# Validation types
JsonSchema = Dict[str, Any]
ValidationResult = Dict[str, Any]
JMESPathExpression = str

class ValidationError(Exception):
    """Base validation error"""
    pass

class SchemaValidationError(ValidationError):
    """Schema validation specific error"""
    pass

class InvalidSchemaFormatError(ValidationError):
    """Invalid schema format error"""
    pass

class InvalidEnvelopeExpressionError(ValidationError):
    """Invalid JMESPath envelope error"""
    pass

# JMESPath types
QueryExpression = str
QueryResult = Any
QueryOptions = Dict[str, Any]
CustomFunctions = Any  # JMESPath custom function registry

# Kafka types
class ConsumerRecords:
    """Kafka consumer records container"""
    pass

class BaseDeserializer:
    """Kafka message deserializer interface"""
    pass

class SchemaConfig:
    """Kafka schema configuration"""
    pass

KafkaRecord = Dict[str, Any]
DeserializedMessage = Any
SchemaRegistryUrl = str

# Lambda context type
from aws_lambda_powertools.utilities.typing import LambdaContext

# Middleware types
MiddlewareDecorator = Callable[[Callable], Callable]
HandlerFunction = Callable[[Dict[str, Any], LambdaContext], Dict[str, Any]]

# Boto3 session type
Boto3Session = boto3.Session
Boto3Config = Dict[str, Any]
```