# Utilities

Additional utilities including data masking for sensitive information, streaming of large S3 objects, serialization helpers, JSON Schema validation, JMESPath operations, Kafka consumer helpers, and a middleware factory for building Lambda decorators.

## Capabilities

### Data Masking

Utilities for masking sensitive data in logs, responses, and storage with pluggable providers.

```python { .api }
class DataMasking:
    def __init__(self, provider: BaseProvider):
        """
        Initialize data masking utility.

        Parameters:
        - provider: Data masking provider (e.g., KMS-based masking)
        """

    def erase(
        self,
        data: Any,
        fields: List[str] = None,
    ) -> Any:
        """
        Erase sensitive fields from data structure.

        Parameters:
        - data: Data structure (dict, list, or primitive)
        - fields: List of field paths to erase (uses JMESPath syntax)

        Returns:
        Data with specified fields erased/masked
        """

    def encrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Encrypt sensitive fields in data structure.

        Parameters:
        - data: Data structure to process
        - fields: Field paths to encrypt
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields encrypted
        """

    def decrypt(
        self,
        data: Any,
        fields: List[str] = None,
        **provider_options,
    ) -> Any:
        """
        Decrypt encrypted fields in data structure.

        Parameters:
        - data: Data structure with encrypted fields
        - fields: Field paths to decrypt
        - **provider_options: Provider-specific options

        Returns:
        Data with specified fields decrypted
        """

class BaseProvider:
    """Base data masking provider interface"""

    def encrypt(self, data: str, **kwargs) -> str:
        """Encrypt string data"""
        raise NotImplementedError

    def decrypt(self, data: str, **kwargs) -> str:
        """Decrypt string data"""
        raise NotImplementedError

    def erase(self, data: str, **kwargs) -> str:
        """Erase/mask string data"""
        raise NotImplementedError
```

### Streaming

Utilities for streaming large objects from S3 with transformation support.

```python { .api }
class S3Object:
    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: str = None,
        boto3_session: boto3.Session = None,
        **kwargs,
    ):
        """
        Initialize S3 object for streaming operations.

        Parameters:
        - bucket: S3 bucket name
        - key: S3 object key
        - version_id: Specific object version ID
        - boto3_session: Boto3 session for authentication
        - **kwargs: Additional S3 client parameters
        """

    def transform(self, transform: BaseTransform) -> "S3Object":
        """
        Apply transformation to object during streaming.

        Parameters:
        - transform: Transformation to apply

        Returns:
        New S3Object instance with transformation applied
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[str]:
        """
        Iterate over object lines.

        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding lines from the object
        """

    def iter_chunks(
        self,
        chunk_size: int = 1024,
        **kwargs,
    ) -> Iterator[bytes]:
        """
        Iterate over object chunks.

        Parameters:
        - chunk_size: Size of chunks to read
        - **kwargs: Additional parameters

        Returns:
        Iterator yielding byte chunks from the object
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read object data.

        Parameters:
        - size: Number of bytes to read (-1 for all)

        Returns:
        Object data as bytes
        """

    def readline(self, size: int = -1) -> str:
        """
        Read single line from object.

        Parameters:
        - size: Maximum line length

        Returns:
        Single line as string
        """

    def write_to(
        self,
        destination_bucket: str,
        destination_key: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Write transformed object to destination S3 location.

        Parameters:
        - destination_bucket: Destination S3 bucket
        - destination_key: Destination S3 key
        - **kwargs: Additional S3 put parameters

        Returns:
        S3 put operation response
        """

class BaseTransform:
    """Base transformation interface for streaming objects"""

    def transform(self, data: bytes) -> bytes:
        """
        Transform byte data.

        Parameters:
        - data: Input data bytes

        Returns:
        Transformed data bytes
        """

class GzipTransform(BaseTransform):
    def __init__(self, compress: bool = True):
        """
        Gzip compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress
        """

    def transform(self, data: bytes) -> bytes:
        """Apply gzip compression or decompression"""

class ZipTransform(BaseTransform):
    def __init__(
        self,
        compress: bool = True,
        compression_level: int = 6,
    ):
        """
        ZIP compression/decompression transform.

        Parameters:
        - compress: True to compress, False to decompress
        - compression_level: Compression level (0-9)
        """

    def transform(self, data: bytes) -> bytes:
        """Apply ZIP compression or decompression"""

class CsvTransform(BaseTransform):
    def __init__(
        self,
        delimiter: str = ",",
        quotechar: str = '"',
        headers: List[str] = None,
        **kwargs,
    ):
        """
        CSV parsing and generation transform.

        Parameters:
        - delimiter: Field delimiter
        - quotechar: Quote character
        - headers: Column headers for output
        - **kwargs: Additional CSV parameters
        """

    def transform(self, data: bytes) -> bytes:
        """Transform between CSV and JSON formats"""
```
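
`BaseTransform` can be subclassed to run custom logic while an object streams through. A minimal sketch, assuming only the `transform(data: bytes) -> bytes` interface documented above; the email-redaction transform and the chained usage shown in the comments are illustrative, not part of the library.

```python
import re

from aws_lambda_powertools.utilities.streaming.transformations import BaseTransform

# Rough pattern for things that look like email addresses in a byte stream
EMAIL_PATTERN = re.compile(rb"[\w.+-]+@[\w-]+\.[\w.-]+")


class RedactEmailsTransform(BaseTransform):
    """Hypothetical transform that masks email addresses in streamed text data"""

    def transform(self, data: bytes) -> bytes:
        # Replace anything resembling an email address with a fixed marker
        return EMAIL_PATTERN.sub(b"<redacted>", data)


# Because transform() returns a new S3Object, transforms can be chained, e.g.:
# s3_object = S3Object(bucket="example-bucket", key="logs/app.log.gz")
# redacted = s3_object.transform(GzipTransform(compress=False)).transform(RedactEmailsTransform())
```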

### Serialization

Utilities for common serialization tasks including Base64 encoding/decoding.

```python { .api }
def base64_encode(data: Union[str, bytes], url_safe: bool = False) -> str:
    """
    Encode data as Base64 string.

    Parameters:
    - data: Data to encode (string or bytes)
    - url_safe: Whether to use URL-safe Base64 encoding

    Returns:
    Base64 encoded string
    """

def base64_decode(
    data: str,
    url_safe: bool = False,
    validate: bool = True,
) -> bytes:
    """
    Decode Base64 string to bytes.

    Parameters:
    - data: Base64 encoded string
    - url_safe: Whether string uses URL-safe Base64 encoding
    - validate: Whether to validate Base64 format

    Returns:
    Decoded bytes

    Raises:
    ValueError: If Base64 string is invalid and validate=True
    """

def base64_from_str(
    data: str,
    encoding: str = "utf-8",
    url_safe: bool = False,
) -> str:
    """
    Encode string as Base64.

    Parameters:
    - data: String to encode
    - encoding: String encoding to use
    - url_safe: Whether to use URL-safe Base64

    Returns:
    Base64 encoded string
    """

def base64_from_json(
    data: Any,
    ensure_ascii: bool = True,
    url_safe: bool = False,
) -> str:
    """
    Encode JSON data as Base64 string.

    Parameters:
    - data: Data to serialize as JSON then encode
    - ensure_ascii: Whether to ensure ASCII-only JSON
    - url_safe: Whether to use URL-safe Base64

    Returns:
    Base64 encoded JSON string
    """
```

### Validation

Utilities for validating event payloads against JSON Schema.

```python { .api }
def validate(
    event: Dict[str, Any],
    schema: Dict[str, Any],
    envelope: str = None,
) -> Dict[str, Any]:
    """
    Validate event data against JSON schema.

    Parameters:
    - event: Event data to validate
    - schema: JSON schema for validation
    - envelope: JMESPath expression to extract data from event

    Returns:
    Validated event data

    Raises:
    SchemaValidationError: If validation fails
    InvalidSchemaFormatError: If schema format is invalid
    InvalidEnvelopeExpressionError: If envelope expression is invalid
    """

def validator(
    schema: Dict[str, Any],
    envelope: str = None,
) -> Callable:
    """
    Decorator for validating Lambda event data.

    Parameters:
    - schema: JSON schema for validation
    - envelope: JMESPath expression for data extraction

    Returns:
    Decorator function that validates event before handler execution
    """

class InvalidSchemaFormatError(Exception):
    """Raised when JSON schema format is invalid"""
    pass

class SchemaValidationError(Exception):
    """Raised when data validation against schema fails"""
    pass

class InvalidEnvelopeExpressionError(Exception):
    """Raised when JMESPath envelope expression is invalid"""
    pass
```

### JMESPath Utils

Utilities for JMESPath operations on JSON data with custom functions.

```python { .api }
def query(
    data: Dict[str, Any],
    expression: str,
    options: Dict[str, Any] = None,
) -> Any:
    """
    Execute JMESPath query on data.

    Parameters:
    - data: JSON data to query
    - expression: JMESPath expression
    - options: JMESPath options including custom functions

    Returns:
    Query result
    """

def extract_data_from_envelope(
    data: Dict[str, Any],
    envelope: str,
) -> Any:
    """
    Extract data using JMESPath envelope expression.

    Parameters:
    - data: Source data
    - envelope: JMESPath expression for data extraction

    Returns:
    Extracted data

    Note: This function is deprecated; use query() instead
    """

class PowertoolsFunctions:
    """Built-in JMESPath functions for common operations"""

    @staticmethod
    def powertools_json(value: str) -> Any:
        """Parse JSON string"""

    @staticmethod
    def powertools_base64(value: str) -> str:
        """Decode Base64 string"""

    @staticmethod
    def powertools_base64_gzip(value: str) -> str:
        """Decode Base64 and decompress gzip"""
```
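
The built-in functions compose inside a single expression, which is handy for payloads that are gzip-compressed and Base64-encoded (for example, CloudWatch Logs subscription events). A small sketch using the `query()` signature documented above; the sample event is constructed inline purely for illustration.

```python
import base64
import gzip
import json

from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions, query

# Build a CloudWatch Logs-style payload: JSON, gzip-compressed, then Base64-encoded
log_payload = {"logGroup": "/aws/lambda/orders", "logEvents": [{"message": "order created"}]}
awslogs_data = base64.b64encode(gzip.compress(json.dumps(log_payload).encode("utf-8"))).decode("utf-8")
event = {"awslogs": {"data": awslogs_data}}

# powertools_base64_gzip decodes and decompresses; powertools_json parses the resulting string
log_group = query(
    data=event,
    expression="powertools_json(powertools_base64_gzip(awslogs.data)).logGroup",
    options={"custom_functions": PowertoolsFunctions()},
)
print(log_group)  # "/aws/lambda/orders"
```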

### Kafka Consumer

Utilities for processing Kafka messages with schema support and deserialization.

```python { .api }
def kafka_consumer(
    record_handler: Callable[[Dict], Any],
    deserializer: BaseDeserializer = None,
) -> Callable:
    """
    Decorator for Kafka consumer Lambda functions.

    Parameters:
    - record_handler: Function to process individual Kafka records
    - deserializer: Deserializer for Kafka message values

    Returns:
    Decorated Lambda function that processes Kafka events
    """

class ConsumerRecords:
    """Kafka consumer records container"""

    def __init__(
        self,
        raw_event: Dict[str, Any],
        deserializer: BaseDeserializer = None,
    ):
        """
        Initialize consumer records.

        Parameters:
        - raw_event: Raw Kafka Lambda event
        - deserializer: Message deserializer
        """

    @property
    def records(self) -> List[KafkaRecord]:
        """Get list of Kafka records"""

    def __iter__(self) -> Iterator[KafkaRecord]:
        """Iterate over Kafka records"""

class SchemaConfig:
    def __init__(
        self,
        schema_registry_url: str,
        schema_name: str = None,
        schema_version: int = None,
        **kwargs,
    ):
        """
        Schema configuration for Kafka message deserialization.

        Parameters:
        - schema_registry_url: Confluent Schema Registry URL
        - schema_name: Schema name/subject
        - schema_version: Specific schema version
        - **kwargs: Additional schema registry client options
        """

class BaseDeserializer:
    """Base deserializer interface for Kafka messages"""

    def deserialize(self, data: bytes, **kwargs) -> Any:
        """
        Deserialize Kafka message data.

        Parameters:
        - data: Raw message bytes
        - **kwargs: Additional deserialization options

        Returns:
        Deserialized message data
        """
```
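
A minimal consumer sketch following the `kafka_consumer` signature documented above. The import path, the `JsonDeserializer`, and the record handler are assumptions for illustration; adjust them to your installed version.

```python
import json

from aws_lambda_powertools.utilities.typing import LambdaContext

# Import path assumed for illustration; verify against your installed version
from aws_lambda_powertools.utilities.kafka import BaseDeserializer, kafka_consumer


class JsonDeserializer(BaseDeserializer):
    """Illustrative deserializer that decodes Kafka message values as UTF-8 JSON"""

    def deserialize(self, data: bytes, **kwargs) -> dict:
        return json.loads(data.decode("utf-8"))


def handle_order_record(record: dict) -> None:
    # The record value has already been deserialized to a dict by JsonDeserializer
    print(f"Processing order {record.get('order_id')}")


@kafka_consumer(record_handler=handle_order_record, deserializer=JsonDeserializer())
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return {"statusCode": 200}
```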

### Lambda Context Type

Type definition for Lambda execution context.

```python { .api }
class LambdaContext:
    """AWS Lambda execution context"""

    function_name: str
    function_version: str
    invoked_function_arn: str
    memory_limit_in_mb: int
    aws_request_id: str
    log_group_name: str
    log_stream_name: str

    @property
    def identity(self) -> Any:
        """Cognito identity information (mobile apps)"""

    @property
    def client_context(self) -> Any:
        """Client context information (mobile apps)"""

    def get_remaining_time_in_millis(self) -> int:
        """Get remaining execution time in milliseconds"""
```
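
The context object is useful for deadline-aware work, such as draining a queue until the invocation is close to timing out. A small sketch using the properties listed above; `process_next_item` is a mock introduced for illustration.

```python
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext) -> dict:
    print(f"Function: {context.function_name} (version {context.function_version})")
    print(f"Request ID: {context.aws_request_id}")
    print(f"Memory limit: {context.memory_limit_in_mb} MB")

    # Stop picking up new work when fewer than ~5 seconds of execution time remain
    while context.get_remaining_time_in_millis() > 5_000:
        if not process_next_item(event):
            break

    return {"statusCode": 200}


def process_next_item(event: dict) -> bool:
    """Process one unit of work; return False when nothing is left (mock implementation)"""
    return False
```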

### Middleware Factory

Factory for creating Lambda handler middleware decorators.

```python { .api }
def lambda_handler_decorator(
    trace_execution: bool = False,
    clear_state: bool = False,
) -> Callable:
    """
    Factory for creating Lambda handler decorators.

    Parameters:
    - trace_execution: Whether to trace decorator execution
    - clear_state: Whether to clear state after execution

    Returns:
    Decorator factory function
    """
```
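
A small sketch of a custom middleware built with the factory; the timing logic and the wrapped handler below are illustrative. The middleware function receives the handler plus the original event and context, and must return the handler's response.

```python
import time

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.typing import LambdaContext


@lambda_handler_decorator
def log_execution_time(handler, event, context):
    """Middleware that logs how long the wrapped handler takes to run"""
    start = time.perf_counter()
    response = handler(event, context)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"Handler completed in {elapsed_ms:.1f} ms")
    return response


@log_execution_time
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return {"statusCode": 200}
```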

## Usage Examples

### Data Masking for Sensitive Information

```python
from aws_lambda_powertools.utilities.data_masking import DataMasking, BaseProvider
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

class SimpleErasureProvider(BaseProvider):
    """Simple provider that erases sensitive data"""

    def erase(self, data: str, **kwargs) -> str:
        return "***MASKED***"

    def encrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper encryption
        return f"ENCRYPTED:{data[:3]}***"

    def decrypt(self, data: str, **kwargs) -> str:
        # In real implementation, use proper decryption
        if data.startswith("ENCRYPTED:"):
            return data.replace("ENCRYPTED:", "").replace("***", "")
        return data

# Initialize data masking
masking = DataMasking(provider=SimpleErasureProvider())

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Sample user data with sensitive information
    user_data = {
        "user_id": "12345",
        "name": "John Doe",
        "email": "john@example.com",
        "ssn": "123-45-6789",
        "credit_card": "4111-1111-1111-1111",
        "address": {
            "street": "123 Main St",
            "city": "Anytown",
            "zip": "12345"
        },
        "preferences": {
            "newsletter": True,
            "phone": "555-123-4567"
        }
    }

    # Erase sensitive fields for logging
    safe_logging_data = masking.erase(
        data=user_data,
        fields=[
            "ssn",
            "credit_card",
            "preferences.phone"
        ]
    )

    print(f"Processing user data: {json.dumps(safe_logging_data, indent=2)}")

    # Encrypt sensitive data for storage
    encrypted_data = masking.encrypt(
        data=user_data,
        fields=["ssn", "credit_card"]
    )

    # Store encrypted data
    store_user_data(encrypted_data)

    # Return response with masked sensitive data
    response_data = masking.erase(
        data=user_data,
        fields=["ssn", "credit_card", "preferences.phone"]
    )

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "User data processed",
            "user": response_data
        })
    }

def store_user_data(data: dict):
    """Store user data (mock implementation)"""
    print("Storing encrypted user data to database")
```
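
For production workloads, the toy provider above should be replaced with a provider backed by a real key management service, such as the KMS-based provider mentioned in the `DataMasking` constructor docs. A sketch under that assumption; the import path and `keys` argument shown here may differ in your installed version.

```python
import os

from aws_lambda_powertools.utilities.data_masking import DataMasking

# Assumed import path and constructor for the KMS-backed provider; verify against your installed version
from aws_lambda_powertools.utilities.data_masking.provider.kms.aws_encryption_sdk import (
    AWSEncryptionSDKProvider,
)

KMS_KEY_ARN = os.environ["DATA_MASKING_KMS_KEY_ARN"]

masking = DataMasking(provider=AWSEncryptionSDKProvider(keys=[KMS_KEY_ARN]))

# Values encrypted this way can later be recovered with masking.decrypt() using the same provider
encrypted = masking.encrypt(data={"ssn": "123-45-6789"}, fields=["ssn"])
```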

### S3 Streaming with Transformations

```python
from aws_lambda_powertools.utilities.streaming import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
    GzipTransform,
    CsvTransform
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Get S3 event information
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    print(f"Processing S3 object: s3://{bucket}/{key}")

    # Create S3 streaming object
    s3_object = S3Object(bucket=bucket, key=key)

    processed_records = 0

    if key.endswith('.gz'):
        # Handle compressed file
        processed_records = process_compressed_file(s3_object, bucket)
    elif key.endswith('.csv'):
        # Handle CSV file
        processed_records = process_csv_file(s3_object, bucket)
    else:
        # Handle regular text file
        processed_records = process_text_file(s3_object, bucket)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed_records": processed_records,
            "source_bucket": bucket,
            "source_key": key
        })
    }

def process_compressed_file(s3_object: S3Object, bucket: str) -> int:
    """Process gzip compressed file"""

    # Add decompression transform
    decompressed_object = s3_object.transform(GzipTransform(compress=False))

    record_count = 0

    # Process line by line without loading entire file into memory
    for line in decompressed_object.iter_lines():
        if line.strip():  # Skip empty lines
            # Process individual line
            process_log_line(line)
            record_count += 1

    print(f"Processed {record_count} log entries from compressed file")
    return record_count

def process_csv_file(s3_object: S3Object, bucket: str) -> int:
    """Process CSV file and convert to JSON"""

    # Transform CSV to JSON format
    csv_transform = CsvTransform(
        delimiter=",",
        headers=["id", "name", "email", "created_at"]
    )

    json_object = s3_object.transform(csv_transform)

    # Write transformed data to new S3 location
    output_key = s3_object.key.replace('.csv', '.json')

    json_object.write_to(
        destination_bucket=bucket,
        destination_key=f"processed/{output_key}",
        ContentType="application/json"
    )

    # Count records by iterating through original CSV
    record_count = sum(1 for line in s3_object.iter_lines()) - 1  # Subtract header

    print(f"Converted {record_count} CSV records to JSON")
    return record_count

def process_text_file(s3_object: S3Object, bucket: str) -> int:
    """Process regular text file"""

    record_count = 0
    processed_lines = []
    buffer = ""

    # Process in chunks to handle large files
    for chunk in s3_object.iter_chunks(chunk_size=8192):
        # Buffer chunk data so lines split across chunk boundaries are not lost
        buffer += chunk.decode('utf-8', errors='ignore')
        lines = buffer.split('\n')
        buffer = lines.pop()  # Keep the partial trailing line for the next chunk

        for line in lines:
            if line.strip():
                processed_line = process_text_line(line)
                if processed_line:
                    processed_lines.append(processed_line)
                    record_count += 1

    # Flush any remaining partial line
    if buffer.strip():
        processed_lines.append(process_text_line(buffer))
        record_count += 1

    # Write processed data back to S3
    if processed_lines:
        output_data = '\n'.join(processed_lines)

        # Use gzip compression for output
        compressed_object = S3Object(
            bucket=bucket,
            key="temp/processing_output.txt"
        ).transform(GzipTransform(compress=True))

        # This would require implementing write capability
        # compressed_object.write(output_data.encode('utf-8'))

    print(f"Processed {record_count} text lines")
    return record_count

def process_log_line(line: str) -> None:
    """Process individual log line"""
    try:
        # Parse log line (e.g., JSON logs)
        log_entry = json.loads(line)

        # Extract relevant information
        timestamp = log_entry.get("timestamp")
        level = log_entry.get("level")
        message = log_entry.get("message")

        # Process based on log level
        if level == "ERROR":
            handle_error_log(log_entry)
        elif level == "WARN":
            handle_warning_log(log_entry)

    except json.JSONDecodeError:
        # Handle non-JSON log lines
        print(f"Non-JSON log line: {line[:100]}...")

def process_text_line(line: str) -> str:
    """Process and transform text line"""
    # Example: uppercase and add timestamp
    import datetime
    timestamp = datetime.datetime.utcnow().isoformat()
    return f"[{timestamp}] {line.upper()}"
```
806
807
### Schema Validation
808
809
```python
810
from aws_lambda_powertools.utilities.validation import (
811
validate,
812
validator,
813
SchemaValidationError
814
)
815
from aws_lambda_powertools.utilities.typing import LambdaContext
816
import json
817
818
# JSON Schema for user registration
819
USER_REGISTRATION_SCHEMA = {
820
"type": "object",
821
"properties": {
822
"firstName": {
823
"type": "string",
824
"minLength": 1,
825
"maxLength": 50
826
},
827
"lastName": {
828
"type": "string",
829
"minLength": 1,
830
"maxLength": 50
831
},
832
"email": {
833
"type": "string",
834
"format": "email"
835
},
836
"age": {
837
"type": "integer",
838
"minimum": 13,
839
"maximum": 120
840
},
841
"preferences": {
842
"type": "object",
843
"properties": {
844
"newsletter": {"type": "boolean"},
845
"notifications": {"type": "boolean"}
846
},
847
"additionalProperties": False
848
}
849
},
850
"required": ["firstName", "lastName", "email", "age"],
851
"additionalProperties": False
852
}
853
854
# Order schema
855
ORDER_SCHEMA = {
856
"type": "object",
857
"properties": {
858
"orderId": {"type": "string"},
859
"customerId": {"type": "string"},
860
"items": {
861
"type": "array",
862
"items": {
863
"type": "object",
864
"properties": {
865
"productId": {"type": "string"},
866
"quantity": {"type": "integer", "minimum": 1},
867
"price": {"type": "number", "minimum": 0}
868
},
869
"required": ["productId", "quantity", "price"]
870
},
871
"minItems": 1
872
},
873
"totalAmount": {"type": "number", "minimum": 0}
874
},
875
"required": ["orderId", "customerId", "items", "totalAmount"]
876
}
877
878
@validator(schema=USER_REGISTRATION_SCHEMA, envelope="body")
879
def register_user_handler(event: dict, context: LambdaContext) -> dict:
880
"""Handler with automatic validation"""
881
882
# Event body is automatically validated against schema
883
user_data = event["body"]
884
885
print(f"Registering user: {user_data['firstName']} {user_data['lastName']}")
886
887
# Process validated user data
888
user_id = create_user_account(user_data)
889
890
return {
891
"statusCode": 201,
892
"body": json.dumps({
893
"userId": user_id,
894
"message": "User registered successfully"
895
})
896
}
897
898
def manual_validation_handler(event: dict, context: LambdaContext) -> dict:
899
"""Handler with manual validation"""
900
901
try:
902
# Manually validate event data
903
validated_data = validate(
904
event=event,
905
schema=ORDER_SCHEMA,
906
envelope="body" # Extract from event.body
907
)
908
909
order_data = validated_data["body"]
910
911
print(f"Processing order: {order_data['orderId']}")
912
913
# Validate business rules after schema validation
914
validate_business_rules(order_data)
915
916
# Process order
917
result = process_order(order_data)
918
919
return {
920
"statusCode": 200,
921
"body": json.dumps(result)
922
}
923
924
except SchemaValidationError as e:
925
print(f"Schema validation failed: {e}")
926
return {
927
"statusCode": 400,
928
"body": json.dumps({
929
"error": "Invalid request data",
930
"details": str(e)
931
})
932
}
933
except BusinessRuleError as e:
934
print(f"Business rule validation failed: {e}")
935
return {
936
"statusCode": 422,
937
"body": json.dumps({
938
"error": "Business rule violation",
939
"details": str(e)
940
})
941
}
942
943
def validate_business_rules(order_data: dict):
944
"""Additional business rule validation"""
945
946
# Validate total amount matches item prices
947
calculated_total = sum(
948
item["quantity"] * item["price"]
949
for item in order_data["items"]
950
)
951
952
if abs(order_data["totalAmount"] - calculated_total) > 0.01:
953
raise BusinessRuleError("Total amount doesn't match item prices")
954
955
# Validate order limits
956
if order_data["totalAmount"] > 10000:
957
raise BusinessRuleError("Order exceeds maximum amount limit")
958
959
# Validate item availability
960
for item in order_data["items"]:
961
if not is_product_available(item["productId"], item["quantity"]):
962
raise BusinessRuleError(f"Product {item['productId']} not available")
963
964
class BusinessRuleError(Exception):
965
"""Custom exception for business rule violations"""
966
pass
967
968
def create_user_account(user_data: dict) -> str:
969
"""Create user account with validated data"""
970
import uuid
971
972
user_id = str(uuid.uuid4())
973
974
# Save to database
975
print(f"Creating user account: {user_id}")
976
977
return user_id
978
979
def process_order(order_data: dict) -> dict:
980
"""Process validated order"""
981
982
# Reserve inventory
983
for item in order_data["items"]:
984
reserve_inventory(item["productId"], item["quantity"])
985
986
# Process payment
987
payment_id = process_payment(order_data["totalAmount"])
988
989
return {
990
"orderId": order_data["orderId"],
991
"paymentId": payment_id,
992
"status": "confirmed"
993
}
994
```
995
996
### JMESPath Data Extraction
997
998
```python
999
from aws_lambda_powertools.utilities.jmespath_utils import query, PowertoolsFunctions
1000
from aws_lambda_powertools.utilities.typing import LambdaContext
1001
import json
1002
1003
def lambda_handler(event: dict, context: LambdaContext) -> dict:
1004
"""Demonstrate JMESPath operations"""
1005
1006
# Complex nested event data
1007
event_data = {
1008
"Records": [
1009
{
1010
"eventSource": "aws:s3",
1011
"s3": {
1012
"bucket": {"name": "my-bucket"},
1013
"object": {"key": "data/file1.json", "size": 1024}
1014
},
1015
"eventName": "ObjectCreated:Put"
1016
},
1017
{
1018
"eventSource": "aws:s3",
1019
"s3": {
1020
"bucket": {"name": "my-bucket"},
1021
"object": {"key": "data/file2.json", "size": 2048}
1022
},
1023
"eventName": "ObjectCreated:Put"
1024
}
1025
],
1026
"responsePayload": json.dumps({
1027
"users": [
1028
{"id": 1, "name": "Alice", "active": True},
1029
{"id": 2, "name": "Bob", "active": False}
1030
]
1031
})
1032
}
1033
1034
# Extract S3 bucket names
1035
bucket_names = query(
1036
data=event_data,
1037
expression="Records[*].s3.bucket.name"
1038
)
1039
print(f"Bucket names: {bucket_names}")
1040
1041
# Extract object keys for created objects only
1042
created_objects = query(
1043
data=event_data,
1044
expression="Records[?eventName == 'ObjectCreated:Put'].s3.object.key"
1045
)
1046
print(f"Created objects: {created_objects}")
1047
1048
# Calculate total size of all objects
1049
total_size = query(
1050
data=event_data,
1051
expression="sum(Records[*].s3.object.size)"
1052
)
1053
print(f"Total size: {total_size} bytes")
1054
1055
# Extract and parse JSON payload using custom functions
1056
options = {"custom_functions": PowertoolsFunctions()}
1057
1058
users_data = query(
1059
data=event_data,
1060
expression="powertools_json(responsePayload).users",
1061
options=options
1062
)
1063
print(f"Users data: {users_data}")
1064
1065
# Get active users only
1066
active_users = query(
1067
data=event_data,
1068
expression="powertools_json(responsePayload).users[?active == `true`].name",
1069
options=options
1070
)
1071
print(f"Active users: {active_users}")
1072
1073
# Complex filtering and transformation
1074
processed_records = query(
1075
data=event_data,
1076
expression="""Records[?eventSource == 'aws:s3'] | [].{
1077
bucket: s3.bucket.name,
1078
key: s3.object.key,
1079
size_mb: s3.object.size / `1024` / `1024`,
1080
is_large: s3.object.size > `1500`
1081
}"""
1082
)
1083
print(f"Processed records: {json.dumps(processed_records, indent=2)}")
1084
1085
return {
1086
"statusCode": 200,
1087
"body": json.dumps({
1088
"bucket_names": bucket_names,
1089
"created_objects": created_objects,
1090
"total_size": total_size,
1091
"active_users": active_users,
1092
"processed_records": processed_records
1093
})
1094
}
1095
```

### Serialization Utilities

```python
from aws_lambda_powertools.utilities.serialization import (
    base64_encode,
    base64_decode,
    base64_from_str,
    base64_from_json
)
from aws_lambda_powertools.utilities.typing import LambdaContext
import json

def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Demonstrate serialization utilities"""

    # Sample data to work with
    sample_data = {
        "user_id": "12345",
        "action": "purchase",
        "metadata": {
            "product_id": "prod-789",
            "amount": 99.99,
            "currency": "USD"
        }
    }

    # Encode JSON as Base64
    encoded_json = base64_from_json(sample_data)
    print(f"Encoded JSON: {encoded_json}")

    # Encode string as Base64
    message = "Hello, World!"
    encoded_string = base64_from_str(message)
    print(f"Encoded string: {encoded_string}")

    # Encode bytes as Base64
    binary_data = b"Binary data content"
    encoded_bytes = base64_encode(binary_data)
    print(f"Encoded bytes: {encoded_bytes}")

    # URL-safe Base64 encoding
    url_safe_encoded = base64_encode(binary_data, url_safe=True)
    print(f"URL-safe encoded: {url_safe_encoded}")

    # Decode Base64 back to original data
    decoded_json_bytes = base64_decode(encoded_json)
    decoded_json = json.loads(decoded_json_bytes.decode('utf-8'))
    print(f"Decoded JSON: {decoded_json}")

    decoded_string_bytes = base64_decode(encoded_string)
    decoded_string = decoded_string_bytes.decode('utf-8')
    print(f"Decoded string: {decoded_string}")

    decoded_bytes = base64_decode(encoded_bytes)
    print(f"Decoded bytes: {decoded_bytes}")

    # Handle API Gateway events with Base64 encoded bodies
    api_event_body = event.get("body", "")
    is_base64_encoded = event.get("isBase64Encoded", False)

    if is_base64_encoded and api_event_body:
        try:
            # Decode Base64 body
            decoded_body_bytes = base64_decode(api_event_body, validate=True)
            decoded_body = decoded_body_bytes.decode('utf-8')

            # Parse as JSON if possible
            try:
                body_data = json.loads(decoded_body)
                print(f"Decoded API body: {body_data}")
            except json.JSONDecodeError:
                print(f"Decoded API body (text): {decoded_body}")

        except ValueError as e:
            print(f"Failed to decode Base64 body: {e}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid Base64 encoding"})
            }

    # Prepare response with Base64 encoded data
    response_data = {
        "original_data": sample_data,
        "encoded_formats": {
            "json_base64": encoded_json,
            "string_base64": encoded_string,
            "bytes_base64": encoded_bytes,
            "url_safe_base64": url_safe_encoded
        },
        "decoded_verification": {
            "json_matches": decoded_json == sample_data,
            "string_matches": decoded_string == message,
            "bytes_matches": decoded_bytes == binary_data
        }
    }

    return {
        "statusCode": 200,
        "body": json.dumps(response_data, indent=2)
    }
```
1198
1199
## Types
1200
1201
```python { .api }
1202
from typing import Dict, Any, List, Union, Optional, Iterator, Callable, Type
1203
import boto3
1204
1205
# Data masking types
1206
class BaseProvider:
1207
"""Base provider interface for data masking operations"""
1208
pass
1209
1210
MaskingProvider = BaseProvider
1211
FieldPath = str # JMESPath-style field path
1212
EncryptionOptions = Dict[str, Any]
1213
1214
# Streaming types
1215
class S3Object:
1216
"""S3 object streaming interface"""
1217
pass
1218
1219
class BaseTransform:
1220
"""Base transformation interface"""
1221
pass
1222
1223
StreamChunk = bytes
1224
StreamLine = str
1225
ChunkIterator = Iterator[bytes]
1226
LineIterator = Iterator[str]
1227
1228
# Transformation options
1229
CompressionLevel = int # 0-9 for compression algorithms
1230
CsvOptions = Dict[str, Any]
1231
TransformOptions = Dict[str, Any]
1232
1233
# Serialization types
1234
EncodableData = Union[str, bytes]
1235
JsonData = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
1236
Base64String = str
1237
EncodingType = str # e.g., "utf-8", "ascii"
1238
1239
# Validation types
1240
JsonSchema = Dict[str, Any]
1241
ValidationResult = Dict[str, Any]
1242
JMESPathExpression = str
1243
1244
class ValidationError(Exception):
1245
"""Base validation error"""
1246
pass
1247
1248
class SchemaValidationError(ValidationError):
1249
"""Schema validation specific error"""
1250
pass
1251
1252
class InvalidSchemaFormatError(ValidationError):
1253
"""Invalid schema format error"""
1254
pass
1255
1256
class InvalidEnvelopeExpressionError(ValidationError):
1257
"""Invalid JMESPath envelope error"""
1258
pass
1259
1260
# JMESPath types
1261
QueryExpression = str
1262
QueryResult = Any
1263
QueryOptions = Dict[str, Any]
1264
CustomFunctions = Any # JMESPath custom function registry
1265
1266
# Kafka types
1267
class ConsumerRecords:
1268
"""Kafka consumer records container"""
1269
pass
1270
1271
class BaseDeserializer:
1272
"""Kafka message deserializer interface"""
1273
pass
1274
1275
class SchemaConfig:
1276
"""Kafka schema configuration"""
1277
pass
1278
1279
KafkaRecord = Dict[str, Any]
1280
DeserializedMessage = Any
1281
SchemaRegistryUrl = str
1282
1283
# Lambda context type
1284
from aws_lambda_powertools.utilities.typing import LambdaContext
1285
1286
# Middleware types
1287
MiddlewareDecorator = Callable[[Callable], Callable]
1288
HandlerFunction = Callable[[Dict[str, Any], LambdaContext], Dict[str, Any]]
1289
1290
# Boto3 session type
1291
Boto3Session = boto3.Session
1292
Boto3Config = Dict[str, Any]
1293
```