# Core Observability

Essential observability utilities for AWS Lambda functions: structured logging with Lambda context enrichment, custom metrics via the CloudWatch Embedded Metric Format (EMF), and distributed tracing through AWS X-Ray integration.

## Capabilities

### Logger

Structured logging utility that automatically enriches log entries with Lambda context information and provides JSON formatting optimized for CloudWatch Logs.

```python { .api }
class Logger:
    def __init__(
        self,
        service: str = None,
        level: str = "INFO",
        child: bool = False,
        sampling_rate: float = 0.0,
        stream: TextIO = None,
        logger_formatter: PowertoolsFormatter = None,
        logger_handler: logging.Handler = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ):
        """
        Initialize Logger with configuration options.

        Parameters:
        - service: Service name to identify the origin of logs
        - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        - child: Whether this is a child logger that inherits configuration from a parent Logger
        - sampling_rate: Debug log sampling rate (0.0-1.0)
        - stream: Output stream for logs
        - logger_formatter: Custom log formatter
        - logger_handler: Custom log handler
        - log_uncaught_exceptions: Whether to log uncaught exceptions
        - json_serializer: Custom JSON serializer function
        - json_deserializer: Custom JSON deserializer function
        - json_default: Default function for non-serializable objects
        - datefmt: Date format string
        - use_datetime_directive: Whether datefmt uses datetime directives
        - log_record_order: Order of log record fields
        - utc: Whether to use UTC timezone
        - use_rfc3339: Whether to use RFC 3339 datetime format
        """

    def debug(self, msg: Any, *args, **kwargs) -> None:
        """Log debug message"""

    def info(self, msg: Any, *args, **kwargs) -> None:
        """Log info message"""

    def warning(self, msg: Any, *args, **kwargs) -> None:
        """Log warning message"""

    def error(self, msg: Any, *args, **kwargs) -> None:
        """Log error message"""

    def critical(self, msg: Any, *args, **kwargs) -> None:
        """Log critical message"""

    def exception(self, msg: Any, *args, **kwargs) -> None:
        """Log exception with traceback"""

    def setLevel(self, level: Union[str, int]) -> None:
        """Set logging level"""

    def inject_lambda_context(
        self,
        correlation_id_path: str = None,
        log_event: bool = False,
        correlation_id: str = None,
        clear_state: bool = False,
    ) -> Callable:
        """
        Decorator to inject Lambda context into logs.

        Parameters:
        - correlation_id_path: JMESPath expression to extract a correlation ID from the event
        - log_event: Whether to log the incoming event
        - correlation_id: Static correlation ID to use
        - clear_state: Whether to clear previously appended keys on each invocation

        Returns:
        Decorated handler with Lambda context injection
        """

    def append_keys(self, **kwargs) -> None:
        """Append additional keys to all subsequent log entries"""

    def remove_keys(self, keys: List[str]) -> None:
        """Remove keys from subsequent log entries"""

    def structure_logs(
        self,
        append: bool = False,
        **kwargs,
    ) -> None:
        """
        Replace or append the structured keys included in all subsequent log entries.

        Parameters:
        - append: Whether to append to (rather than replace) the existing structured keys
        - **kwargs: Key-value pairs to add to the log structure
        """

    def set_correlation_id(self, value: str) -> None:
        """Set correlation ID for request tracing"""

    def get_correlation_id(self) -> str:
        """Get current correlation ID"""
```
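
The decorator-driven flow appears in the usage examples further down; keys and correlation IDs can also be managed directly. A minimal sketch using only the methods documented above — the `checkout-service` name, event fields, and handler body are illustrative:

```python
from aws_lambda_powertools import Logger

logger = Logger(service="checkout-service", level="DEBUG")

def lambda_handler(event: dict, context) -> dict:
    # Attach request-scoped keys to every subsequent log entry
    logger.append_keys(cart_id=event.get("cart_id"), tenant=event.get("tenant"))
    logger.set_correlation_id(event.get("request_id", "unknown"))

    logger.info("Cart received")  # emitted with cart_id, tenant, and correlation_id
    logger.debug("Raw event", extra={"event": event})

    # Drop keys that should not leak into later entries
    logger.remove_keys(["tenant"])
    logger.info("Checkout complete")

    return {"statusCode": 200, "correlation_id": logger.get_correlation_id()}

# A child logger with the same service name reuses the parent configuration
child_logger = Logger(service="checkout-service", child=True)
```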

### Logger Buffer Configuration

Configuration for logger buffering, which batches log entries to reduce CloudWatch Logs API calls.

```python { .api }
class LoggerBufferConfig:
    def __init__(
        self,
        buffer_size: int = 100,
        flush_on_exit: bool = True,
        max_buffer_time: int = 5,
    ):
        """
        Configure logger buffering behavior.

        Parameters:
        - buffer_size: Number of log entries to buffer before flushing
        - flush_on_exit: Whether to flush the buffer when the Lambda invocation ends
        - max_buffer_time: Maximum time (seconds) to buffer logs before flushing
        """
```
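
A sketch of how the buffer configuration might be wired up. Both the import path and the `buffer_config` keyword argument on `Logger` are assumptions — neither appears in the signatures documented above:

```python
from aws_lambda_powertools import Logger
# Assumed import path; the document does not specify where LoggerBufferConfig lives
from aws_lambda_powertools.logging.buffer import LoggerBufferConfig

# Buffer up to 200 entries for at most 10 seconds, flushing any remainder on exit
buffer_config = LoggerBufferConfig(buffer_size=200, flush_on_exit=True, max_buffer_time=10)

# Assumed wiring: a `buffer_config` parameter is not part of the Logger signature above
logger = Logger(service="payment-service", buffer_config=buffer_config)
```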

### Metrics

CloudWatch Embedded Metric Format (EMF) utility for publishing custom metrics as structured log entries, with support for dimensions and high-cardinality metadata.

```python { .api }
class Metrics:
    def __init__(
        self,
        service: str = None,
        namespace: str = None,
        metadata: Dict[str, Any] = None,
        default_dimensions: Dict[str, str] = None,
    ):
        """
        Initialize Metrics with configuration.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        - metadata: Default metadata to include
        - default_dimensions: Default dimensions for all metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
        resolution: MetricResolution = 60,
    ) -> None:
        """
        Add a metric to be published.

        Parameters:
        - name: Metric name
        - unit: Unit of measurement
        - value: Metric value
        - resolution: Metric resolution in seconds (1 for high resolution, 60 for standard)
        """

    def add_dimension(self, name: str, value: str) -> None:
        """Add a dimension to the current metric context"""

    def add_metadata(self, key: str, value: Any) -> None:
        """Add metadata to the current metric context"""

    def set_default_dimensions(self, **dimensions) -> None:
        """Set default dimensions for all metrics"""

    def clear_metrics(self) -> None:
        """Clear all metrics and dimensions"""

    def clear_dimensions(self) -> None:
        """Clear all dimensions"""

    def clear_metadata(self) -> None:
        """Clear all metadata"""

    def log_metrics(
        self,
        lambda_handler: Callable = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_dimensions: Dict[str, str] = None,
    ) -> Callable:
        """
        Decorator to automatically publish metrics after Lambda execution.

        Parameters:
        - lambda_handler: Lambda handler function to decorate
        - capture_cold_start_metric: Whether to capture a cold start metric
        - raise_on_empty_metrics: Whether to raise an error (SchemaValidationError) if no metrics were added
        - default_dimensions: Default dimensions to apply

        Returns:
        Decorated handler with automatic metric publishing
        """

class EphemeralMetrics:
    def __init__(
        self,
        service: str = None,
        namespace: str = None,
    ):
        """
        Isolated metrics container whose metrics, dimensions, and metadata
        are not shared with other Metrics instances.

        Parameters:
        - service: Service name for default dimensions
        - namespace: CloudWatch namespace for metrics
        """

    def add_metric(
        self,
        name: str,
        unit: MetricUnit,
        value: Union[float, int],
    ) -> None:
        """Add a metric to this isolated container"""

def single_metric(
    name: str,
    unit: MetricUnit,
    value: Union[float, int],
    resolution: MetricResolution = 60,
    default_dimensions: Dict[str, str] = None,
    namespace: str = None,
) -> ContextManager:
    """
    Context manager for publishing a single metric.

    Parameters:
    - name: Metric name
    - unit: Unit of measurement
    - value: Metric value
    - resolution: Metric resolution in seconds (1 or 60)
    - default_dimensions: Dimensions to apply
    - namespace: CloudWatch namespace

    Returns:
    Context manager that publishes the metric on exit
    """
```
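
The usage examples below focus on the decorator flow; `single_metric` and `EphemeralMetrics` are sketched here using the API documented above. The metric names and dimensions are illustrative, and the example assumes these names are importable from `aws_lambda_powertools.metrics` and that the `single_metric` context manager yields an object exposing `add_dimension`, following Powertools conventions rather than anything stated in this document:

```python
from aws_lambda_powertools.metrics import EphemeralMetrics, MetricUnit, single_metric

def record_fallback(event: dict) -> None:
    # One-off metric with its own dimension set, flushed when the block exits
    with single_metric(
        name="LegacyFallbackInvoked",
        unit=MetricUnit.Count,
        value=1,
        namespace="ECommerce",
    ) as metric:
        metric.add_dimension(name="function_version", value=event.get("version", "unknown"))

# Isolated container: metrics recorded here do not share dimensions or
# namespace defaults with a module-level Metrics instance
tenant_metrics = EphemeralMetrics(service="order-service", namespace="ECommerce")

def record_tenant_usage(items_ordered: int) -> None:
    tenant_metrics.add_metric(name="ItemsOrdered", unit=MetricUnit.Count, value=items_ordered)
```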

### Tracer

AWS X-Ray tracing utility for distributed tracing across AWS services.

```python { .api }
class Tracer:
    def __init__(
        self,
        service: str = None,
        disabled: bool = False,
        auto_patch: bool = True,
        patch_modules: List[str] = None,
        provider: BaseProvider = None,
    ):
        """
        Initialize Tracer with configuration.

        Parameters:
        - service: Service name for tracing
        - disabled: Whether tracing is disabled
        - auto_patch: Whether to auto-patch supported libraries
        - patch_modules: Specific modules to patch for tracing
        - provider: Custom tracing provider
        """

    def capture_lambda_handler(
        self,
        lambda_handler: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture Lambda handler execution in an X-Ray trace.

        Parameters:
        - lambda_handler: Lambda handler function to trace
        - capture_response: Whether to capture the response in the trace
        - capture_error: Whether to capture errors in the trace

        Returns:
        Decorated handler with Lambda handler tracing
        """

    def capture_method(
        self,
        method: Callable = None,
        capture_response: bool = True,
        capture_error: bool = True,
    ) -> Callable:
        """
        Decorator to capture method execution in an X-Ray subsegment.

        Parameters:
        - method: Method to trace
        - capture_response: Whether to capture the response
        - capture_error: Whether to capture errors

        Returns:
        Decorated method with tracing
        """

    def put_annotation(self, key: str, value: Union[str, int, float, bool]) -> None:
        """
        Add an annotation to the current trace segment.

        Parameters:
        - key: Annotation key
        - value: Annotation value (searchable in the X-Ray console)
        """

    def put_metadata(
        self,
        key: str,
        value: Any,
        namespace: str = "default",
    ) -> None:
        """
        Add metadata to the current trace segment.

        Parameters:
        - key: Metadata key
        - value: Metadata value (not searchable; intended for debugging)
        - namespace: Metadata namespace
        """

    def patch(self, modules_to_patch: List[str]) -> None:
        """Manually patch modules for tracing"""

    def provider(self) -> BaseProvider:
        """Get the current tracing provider"""

    def is_disabled(self) -> bool:
        """Check whether tracing is disabled"""

def aiohttp_trace_config() -> Any:
    """
    Get an aiohttp trace configuration for async HTTP client tracing.

    Returns:
    aiohttp TraceConfig object for automatic request tracing
    """
```
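
A sketch of wiring `aiohttp_trace_config()` into an async HTTP client so outbound requests show up as X-Ray subsegments. It assumes the function is importable from `aws_lambda_powertools.tracing` and that the `aiohttp` package is bundled with the function; the service name and URL are illustrative:

```python
import aiohttp

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.tracing import aiohttp_trace_config

tracer = Tracer(service="inventory-service")

@tracer.capture_method
async def fetch_stock_level(sku: str) -> dict:
    # Each request made through this session is recorded as an X-Ray subsegment
    async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
        async with session.get(f"https://inventory.example.com/stock/{sku}") as response:
            return await response.json()
```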

## Usage Examples

### Basic Logging with Lambda Context

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger(service="payment-service")

@logger.inject_lambda_context(log_event=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    logger.info("Processing payment", extra={
        "payment_id": event.get("payment_id"),
        "amount": event.get("amount")
    })

    try:
        # process_payment is an application-specific helper (not shown here)
        result = process_payment(event)
        logger.info("Payment processed successfully", extra={"result": result})
        return {"statusCode": 200, "body": result}
    except Exception:
        logger.exception("Payment processing failed")
        return {"statusCode": 500, "body": "Payment failed"}
```

### Metrics with Custom Dimensions

```python
from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

metrics = Metrics(service="payment-service", namespace="ECommerce")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Add custom dimensions
    metrics.add_dimension("payment_method", event.get("payment_method", "unknown"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    # Record metrics
    metrics.add_metric(name="PaymentAttempted", unit=MetricUnit.Count, value=1)

    try:
        amount = float(event.get("amount", 0))
        metrics.add_metric(name="PaymentAmount", unit=MetricUnit.Count, value=amount)

        # process_payment is an application-specific helper (not shown here)
        success = process_payment(event)

        if success:
            metrics.add_metric(name="PaymentSuccessful", unit=MetricUnit.Count, value=1)
        else:
            metrics.add_metric(name="PaymentFailed", unit=MetricUnit.Count, value=1)

        return {"statusCode": 200 if success else 400}

    except Exception:
        metrics.add_metric(name="PaymentError", unit=MetricUnit.Count, value=1)
        raise
```

### Distributed Tracing with X-Ray

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3

tracer = Tracer(service="payment-service")
dynamodb = boto3.resource("dynamodb")

@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    tracer.put_annotation("payment_id", event.get("payment_id", "unknown"))
    tracer.put_metadata("event_details", event)

    # Trace database operations
    user_id = get_user_id(event["payment_id"])

    # Trace external service calls
    payment_result = process_external_payment(event)

    return {"statusCode": 200, "result": payment_result}

@tracer.capture_method
def get_user_id(payment_id: str) -> str:
    table = dynamodb.Table("payments")

    # This DynamoDB call is traced automatically once boto3 is patched
    response = table.get_item(Key={"payment_id": payment_id})

    tracer.put_annotation("user_found", "user_id" in response.get("Item", {}))
    return response.get("Item", {}).get("user_id")

@tracer.capture_method(capture_response=False)  # Don't capture the sensitive response
def process_external_payment(event: dict) -> dict:
    # External payment processing logic
    tracer.put_annotation("payment_processor", "stripe")

    # Simulate an external API call
    result = {"status": "success", "transaction_id": "txn_123"}

    tracer.put_metadata("payment_processor_response", {
        "status": result["status"],
        "transaction_id": result["transaction_id"]
    })

    return result
```

### Combined Observability Pattern

```python
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.typing import LambdaContext

# Initialize all observability tools
logger = Logger(service="order-service")
metrics = Metrics(service="order-service", namespace="ECommerce")
tracer = Tracer(service="order-service")

class ValidationError(Exception):
    """Raised by validate_order when the order payload is invalid"""

@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    # Extract order details
    order_id = event.get("order_id")
    customer_id = event.get("customer_id")

    # Add structured logging context
    logger.append_keys(order_id=order_id, customer_id=customer_id)

    # Add tracing annotations for searchability
    tracer.put_annotation("order_id", order_id)
    tracer.put_annotation("customer_id", customer_id)

    # Add metric dimensions
    metrics.add_dimension("order_type", event.get("order_type", "standard"))
    metrics.add_dimension("region", event.get("region", "us-east-1"))

    logger.info("Processing order")

    try:
        # Process order logic
        order = process_order(event)

        # Record success metrics
        metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)
        metrics.add_metric(name="OrderValue", unit=MetricUnit.Count, value=order["total"])

        logger.info("Order processed successfully", extra={"order_total": order["total"]})

        return {
            "statusCode": 200,
            "body": {"order_id": order_id, "status": "processed"}
        }

    except ValidationError as e:
        logger.warning("Order validation failed", extra={"error": str(e)})
        metrics.add_metric(name="OrderValidationError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "validation")

        return {"statusCode": 400, "body": {"error": "Invalid order"}}

    except Exception:
        logger.exception("Order processing failed")
        metrics.add_metric(name="OrderProcessingError", unit=MetricUnit.Count, value=1)
        tracer.put_annotation("error_type", "processing")

        return {"statusCode": 500, "body": {"error": "Processing failed"}}

@tracer.capture_method
def process_order(event: dict) -> dict:
    """Process order with detailed tracing"""
    tracer.put_metadata("order_details", event)

    # validate_order, calculate_order_total, and save_order are
    # application-specific helpers (not shown here)
    validate_order(event)

    # Calculate totals
    total = calculate_order_total(event)

    # Save to database
    save_order(event, total)

    return {"order_id": event["order_id"], "total": total}
```

## Types

```python { .api }
import logging
from typing import Any, Callable, ContextManager, Dict, List, Literal, TextIO, Union

# Metric types
MetricUnit = Literal[
    "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes",
    "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits",
    "Megabits", "Gigabits", "Terabits", "Percent", "Count",
    "Bytes/Second", "Kilobytes/Second", "Megabytes/Second",
    "Gigabytes/Second", "Terabytes/Second", "Bits/Second",
    "Kilobits/Second", "Megabits/Second", "Gigabits/Second",
    "Terabits/Second", "Count/Second", "None"
]

MetricResolution = Literal[1, 60]

# Exception types
class MetricUnitError(Exception):
    """Raised when an invalid metric unit is used"""

class MetricResolutionError(Exception):
    """Raised when an invalid metric resolution is used"""

class MetricValueError(Exception):
    """Raised when an invalid metric value is provided"""

class SchemaValidationError(Exception):
    """Raised when metric schema validation fails"""

# Formatter types
class PowertoolsFormatter:
    """Custom log formatter for Powertools"""
    def __init__(
        self,
        json_serializer: Callable[[Dict], str] = None,
        json_deserializer: Callable = None,
        json_default: Callable[[Any], Any] = None,
        datefmt: str = None,
        use_datetime_directive: bool = False,
        log_record_order: List[str] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
    ): ...

# Provider types for tracing
class BaseProvider:
    """Base tracing provider interface"""
```
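
The exception types above surface when metrics are flushed. A brief sketch of the empty-metrics case, assuming `SchemaValidationError` is importable from `aws_lambda_powertools.metrics`; the metric name and event shape are illustrative:

```python
from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit, SchemaValidationError

metrics = Metrics(service="report-service", namespace="ECommerce")

@metrics.log_metrics(raise_on_empty_metrics=True)
def lambda_handler(event: dict, context) -> dict:
    # If this branch is skipped and no metric is ever added, the decorator
    # raises SchemaValidationError when it flushes at the end of the invocation.
    if event.get("reports"):
        metrics.add_metric(name="ReportsGenerated", unit=MetricUnit.Count, value=len(event["reports"]))
    return {"statusCode": 200}
```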