or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.mdcore-observability.mddata-classes.mdevent-handlers.mdfeature-flags.mdindex.mdparameters.mdparser.mdutilities.md

core-observability.mddocs/

0

# Core Observability

1

2

Essential observability utilities for AWS Lambda functions including structured logging with Lambda context enrichment, CloudWatch embedded metric format (EMF) for custom metrics, and AWS X-Ray integration for distributed tracing.

3

4

## Capabilities

5

6

### Logger

7

8

Structured logging utility that automatically enriches log entries with Lambda context information and provides JSON formatting optimized for CloudWatch Logs.

9

10

```python { .api }

11

class Logger:

12

def __init__(

13

self,

14

service: str = None,

15

level: str = "INFO",

16

child: bool = False,

17

sampling_rate: float = 0.0,

18

stream: TextIO = None,

19

logger_formatter: PowertoolsFormatter = None,

20

logger_handler: logging.Handler = None,

21

log_uncaught_exceptions: bool = False,

22

json_serializer: Callable[[Dict], str] = None,

23

json_deserializer: Callable[[Union[Dict, str, bool, int, float]], str] = None,

24

json_default: Callable[[Any], Any] = None,

25

datefmt: str = None,

26

use_datetime_directive: bool = False,

27

log_record_order: List[str] = None,

28

utc: bool = False,

29

use_rfc3339: bool = False,

30

):

31

"""

32

Initialize Logger with configuration options.

33

34

Parameters:

35

- service: Service name to identify origin of logs

36

- level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

37

- child: Whether this is a child logger

38

- sampling_rate: Debug log sampling rate (0.0-1.0)

39

- stream: Output stream for logs

40

- logger_formatter: Custom log formatter

41

- logger_handler: Custom log handler

42

- log_uncaught_exceptions: Whether to log uncaught exceptions

43

- json_serializer: Custom JSON serializer function

44

- json_deserializer: Custom JSON deserializer function

45

- json_default: Default function for non-serializable objects

46

- datefmt: Date format string

47

- use_datetime_directive: Whether to use datetime directive

48

- log_record_order: Order of log record fields

49

- utc: Whether to use UTC timezone

50

- use_rfc3339: Whether to use RFC3339 datetime format

51

"""

52

53

def debug(self, msg: Any, *args, **kwargs) -> None:

54

"""Log debug message"""

55

56

def info(self, msg: Any, *args, **kwargs) -> None:

57

"""Log info message"""

58

59

def warning(self, msg: Any, *args, **kwargs) -> None:

60

"""Log warning message"""

61

62

def error(self, msg: Any, *args, **kwargs) -> None:

63

"""Log error message"""

64

65

def critical(self, msg: Any, *args, **kwargs) -> None:

66

"""Log critical message"""

67

68

def exception(self, msg: Any, *args, **kwargs) -> None:

69

"""Log exception with traceback"""

70

71

def setLevel(self, level: Union[str, int]) -> None:

72

"""Set logging level"""

73

74

def inject_lambda_context(

75

self,

76

correlation_id_path: str = None,

77

log_event: bool = False,

78

correlation_id: str = None,

79

clear_state: bool = False,

80

) -> Callable:

81

"""

82

Decorator to inject Lambda context into logs.

83

84

Parameters:

85

- correlation_id_path: JMESPath to extract correlation ID from event

86

- log_event: Whether to log the incoming event

87

- correlation_id: Static correlation ID to use

88

- clear_state: Whether to clear logger state after invocation

89

90

Returns:

91

Decorated function with Lambda context injection

92

"""

93

94

def append_keys(self, **kwargs) -> None:

95

"""Append additional keys to all subsequent log entries"""

96

97

def remove_keys(self, keys: List[str]) -> None:

98

"""Remove keys from subsequent log entries"""

99

100

def structure_logs(

101

self,

102

append: bool = False,

103

**kwargs,

104

) -> Callable:

105

"""

106

Decorator to add structured information to logs.

107

108

Parameters:

109

- append: Whether to append to existing structured data

110

- **kwargs: Key-value pairs to add to log structure

111

112

Returns:

113

Decorated function with structured logging

114

"""

115

116

def set_correlation_id(self, value: str) -> None:

117

"""Set correlation ID for request tracing"""

118

119

def get_correlation_id(self) -> str:

120

"""Get current correlation ID"""

121

```

122

123

### Logger Buffer Configuration

124

125

Configuration for logger buffering to reduce CloudWatch Logs API calls.

126

127

```python { .api }

128

class LoggerBufferConfig:

129

def __init__(

130

self,

131

buffer_size: int = 100,

132

flush_on_exit: bool = True,

133

max_buffer_time: int = 5,

134

):

135

"""

136

Configure logger buffering behavior.

137

138

Parameters:

139

- buffer_size: Number of log entries to buffer before flushing

140

- flush_on_exit: Whether to flush buffer on Lambda exit

141

- max_buffer_time: Maximum time (seconds) to buffer logs

142

"""

143

```

144

145

### Metrics

146

147

CloudWatch embedded metric format (EMF) utility for publishing custom metrics with high-cardinality dimensions.

148

149

```python { .api }

150

class Metrics:

151

def __init__(

152

self,

153

service: str = None,

154

namespace: str = None,

155

metadata: Dict[str, Any] = None,

156

default_dimensions: Dict[str, str] = None,

157

):

158

"""

159

Initialize Metrics with configuration.

160

161

Parameters:

162

- service: Service name for default dimensions

163

- namespace: CloudWatch namespace for metrics

164

- metadata: Default metadata to include

165

- default_dimensions: Default dimensions for all metrics

166

"""

167

168

def add_metric(

169

self,

170

name: str,

171

unit: MetricUnit,

172

value: Union[float, int],

173

resolution: MetricResolution = 60,

174

) -> None:

175

"""

176

Add a metric to be published.

177

178

Parameters:

179

- name: Metric name

180

- unit: Unit of measurement

181

- value: Metric value

182

- resolution: Metric resolution in seconds (1 or 60)

183

"""

184

185

def add_dimension(self, name: str, value: str) -> None:

186

"""Add a dimension to current metric context"""

187

188

def add_metadata(self, key: str, value: Any) -> None:

189

"""Add metadata to current metric context"""

190

191

def set_default_dimensions(self, **dimensions) -> None:

192

"""Set default dimensions for all metrics"""

193

194

def clear_metrics(self) -> None:

195

"""Clear all metrics and dimensions"""

196

197

def clear_dimensions(self) -> None:

198

"""Clear all dimensions"""

199

200

def clear_metadata(self) -> None:

201

"""Clear all metadata"""

202

203

def log_metrics(

204

self,

205

lambda_handler: Callable = None,

206

capture_cold_start_metric: bool = False,

207

raise_on_empty_metrics: bool = False,

208

default_dimensions: Dict[str, str] = None,

209

) -> Callable:

210

"""

211

Decorator to automatically publish metrics after Lambda execution.

212

213

Parameters:

214

- lambda_handler: Lambda handler function to decorate

215

- capture_cold_start_metric: Whether to capture cold start metric

216

- raise_on_empty_metrics: Whether to raise error if no metrics added

217

- default_dimensions: Default dimensions to apply

218

219

Returns:

220

Decorated function with automatic metric publishing

221

"""

222

223

class EphemeralMetrics:

224

def __init__(

225

self,

226

service: str = None,

227

namespace: str = None,

228

):

229

"""

230

Ephemeral metrics that don't persist beyond function execution.

231

232

Parameters:

233

- service: Service name for default dimensions

234

- namespace: CloudWatch namespace for metrics

235

"""

236

237

def add_metric(

238

self,

239

name: str,

240

unit: MetricUnit,

241

value: Union[float, int],

242

) -> None:

243

"""Add ephemeral metric"""

244

245

def single_metric(

246

name: str,

247

unit: MetricUnit,

248

value: Union[float, int],

249

resolution: MetricResolution = 60,

250

default_dimensions: Dict[str, str] = None,

251

namespace: str = None,

252

) -> ContextManager:

253

"""

254

Context manager for publishing a single metric.

255

256

Parameters:

257

- name: Metric name

258

- unit: Unit of measurement

259

- value: Metric value

260

- resolution: Metric resolution in seconds

261

- default_dimensions: Dimensions to apply

262

- namespace: CloudWatch namespace

263

264

Returns:

265

Context manager that publishes metric on exit

266

"""

267

```

268

269

### Tracer

270

271

AWS X-Ray tracing utility for distributed tracing across AWS services.

272

273

```python { .api }

274

class Tracer:

275

def __init__(

276

self,

277

service: str = None,

278

disabled: bool = False,

279

auto_patch: bool = True,

280

patch_modules: List[str] = None,

281

provider: BaseProvider = None,

282

):

283

"""

284

Initialize Tracer with configuration.

285

286

Parameters:

287

- service: Service name for tracing

288

- disabled: Whether tracing is disabled

289

- auto_patch: Whether to auto-patch supported libraries

290

- patch_modules: Specific modules to patch for tracing

291

- provider: Custom tracing provider

292

"""

293

294

def capture_lambda_handler(

295

self,

296

lambda_handler: Callable = None,

297

capture_response: bool = True,

298

capture_error: bool = True,

299

) -> Callable:

300

"""

301

Decorator to capture Lambda handler execution in X-Ray trace.

302

303

Parameters:

304

- lambda_handler: Lambda handler function to trace

305

- capture_response: Whether to capture response in trace

306

- capture_error: Whether to capture errors in trace

307

308

Returns:

309

Decorated function with Lambda handler tracing

310

"""

311

312

def capture_method(

313

self,

314

method: Callable = None,

315

capture_response: bool = True,

316

capture_error: bool = True,

317

) -> Callable:

318

"""

319

Decorator to capture method execution in X-Ray subsegment.

320

321

Parameters:

322

- method: Method to trace

323

- capture_response: Whether to capture response

324

- capture_error: Whether to capture errors

325

326

Returns:

327

Decorated method with tracing

328

"""

329

330

def put_annotation(self, key: str, value: Union[str, int, float, bool]) -> None:

331

"""

332

Add annotation to current trace segment.

333

334

Parameters:

335

- key: Annotation key

336

- value: Annotation value (searchable in X-Ray console)

337

"""

338

339

def put_metadata(

340

self,

341

key: str,

342

value: Any,

343

namespace: str = "default",

344

) -> None:

345

"""

346

Add metadata to current trace segment.

347

348

Parameters:

349

- key: Metadata key

350

- value: Metadata value (not searchable, for debugging)

351

- namespace: Metadata namespace

352

"""

353

354

def patch(self, modules_to_patch: List[str]) -> None:

355

"""Manually patch modules for tracing"""

356

357

def provider(self) -> BaseProvider:

358

"""Get current tracing provider"""

359

360

def is_disabled(self) -> bool:

361

"""Check if tracing is disabled"""

362

363

def aiohttp_trace_config() -> Any:

364

"""

365

Get aiohttp trace configuration for async HTTP client tracing.

366

367

Returns:

368

aiohttp TraceConfig object for automatic request tracing

369

"""

370

```

371

372

## Usage Examples

373

374

### Basic Logging with Lambda Context

375

376

```python

377

from aws_lambda_powertools import Logger

378

from aws_lambda_powertools.utilities.typing import LambdaContext

379

380

logger = Logger(service="payment-service")

381

382

@logger.inject_lambda_context(log_event=True)

383

def lambda_handler(event: dict, context: LambdaContext) -> dict:

384

logger.info("Processing payment", extra={

385

"payment_id": event.get("payment_id"),

386

"amount": event.get("amount")

387

})

388

389

try:

390

# Process payment logic

391

result = process_payment(event)

392

logger.info("Payment processed successfully", extra={"result": result})

393

return {"statusCode": 200, "body": result}

394

except Exception as e:

395

logger.exception("Payment processing failed")

396

return {"statusCode": 500, "body": "Payment failed"}

397

```

398

399

### Metrics with Custom Dimensions

400

401

```python

402

from aws_lambda_powertools import Metrics

403

from aws_lambda_powertools.metrics import MetricUnit

404

405

metrics = Metrics(service="payment-service", namespace="ECommerce")

406

407

@metrics.log_metrics(capture_cold_start_metric=True)

408

def lambda_handler(event: dict, context: LambdaContext) -> dict:

409

# Add custom dimensions

410

metrics.add_dimension("payment_method", event.get("payment_method", "unknown"))

411

metrics.add_dimension("region", event.get("region", "us-east-1"))

412

413

# Record metrics

414

metrics.add_metric(name="PaymentAttempted", unit=MetricUnit.Count, value=1)

415

416

try:

417

amount = float(event.get("amount", 0))

418

metrics.add_metric(name="PaymentAmount", unit=MetricUnit.None, value=amount)

419

420

# Process payment

421

success = process_payment(event)

422

423

if success:

424

metrics.add_metric(name="PaymentSuccessful", unit=MetricUnit.Count, value=1)

425

else:

426

metrics.add_metric(name="PaymentFailed", unit=MetricUnit.Count, value=1)

427

428

return {"statusCode": 200 if success else 400}

429

430

except Exception as e:

431

metrics.add_metric(name="PaymentError", unit=MetricUnit.Count, value=1)

432

raise

433

```

434

435

### Distributed Tracing with X-Ray

436

437

```python

438

from aws_lambda_powertools import Tracer

439

import boto3

440

441

tracer = Tracer(service="payment-service")

442

dynamodb = boto3.resource("dynamodb")

443

444

@tracer.capture_lambda_handler

445

def lambda_handler(event: dict, context: LambdaContext) -> dict:

446

tracer.put_annotation("payment_id", event.get("payment_id"))

447

tracer.put_metadata("event_details", event)

448

449

# Trace database operations

450

user_id = get_user_id(event["payment_id"])

451

452

# Trace external service calls

453

payment_result = process_external_payment(event)

454

455

return {"statusCode": 200, "result": payment_result}

456

457

@tracer.capture_method

458

def get_user_id(payment_id: str) -> str:

459

table = dynamodb.Table("payments")

460

461

# This DynamoDB call will be automatically traced

462

response = table.get_item(Key={"payment_id": payment_id})

463

464

tracer.put_annotation("user_found", "user_id" in response.get("Item", {}))

465

return response.get("Item", {}).get("user_id")

466

467

@tracer.capture_method(capture_response=False) # Don't capture sensitive response

468

def process_external_payment(event: dict) -> dict:

469

# External payment processing logic

470

tracer.put_annotation("payment_processor", "stripe")

471

472

# Simulate external API call

473

result = {"status": "success", "transaction_id": "txn_123"}

474

475

tracer.put_metadata("payment_processor_response", {

476

"status": result["status"],

477

"transaction_id": result["transaction_id"]

478

})

479

480

return result

481

```

482

483

### Combined Observability Pattern

484

485

```python

486

from aws_lambda_powertools import Logger, Metrics, Tracer

487

from aws_lambda_powertools.metrics import MetricUnit

488

from aws_lambda_powertools.utilities.typing import LambdaContext

489

490

# Initialize all observability tools

491

logger = Logger(service="order-service")

492

metrics = Metrics(service="order-service", namespace="ECommerce")

493

tracer = Tracer(service="order-service")

494

495

@logger.inject_lambda_context(correlation_id_path="headers.x-correlation-id")

496

@tracer.capture_lambda_handler

497

@metrics.log_metrics(capture_cold_start_metric=True)

498

def lambda_handler(event: dict, context: LambdaContext) -> dict:

499

# Extract order details

500

order_id = event.get("order_id")

501

customer_id = event.get("customer_id")

502

503

# Add structured logging context

504

logger.append_keys(order_id=order_id, customer_id=customer_id)

505

506

# Add tracing annotations for searchability

507

tracer.put_annotation("order_id", order_id)

508

tracer.put_annotation("customer_id", customer_id)

509

510

# Add metric dimensions

511

metrics.add_dimension("order_type", event.get("order_type", "standard"))

512

metrics.add_dimension("region", event.get("region", "us-east-1"))

513

514

logger.info("Processing order")

515

516

try:

517

# Process order logic

518

order = process_order(event)

519

520

# Record success metrics

521

metrics.add_metric(name="OrderProcessed", unit=MetricUnit.Count, value=1)

522

metrics.add_metric(name="OrderValue", unit=MetricUnit.None, value=order["total"])

523

524

logger.info("Order processed successfully", extra={"order_total": order["total"]})

525

526

return {

527

"statusCode": 200,

528

"body": {"order_id": order_id, "status": "processed"}

529

}

530

531

except ValidationError as e:

532

logger.warning("Order validation failed", extra={"error": str(e)})

533

metrics.add_metric(name="OrderValidationError", unit=MetricUnit.Count, value=1)

534

tracer.put_annotation("error_type", "validation")

535

536

return {"statusCode": 400, "body": {"error": "Invalid order"}}

537

538

except Exception as e:

539

logger.exception("Order processing failed")

540

metrics.add_metric(name="OrderProcessingError", unit=MetricUnit.Count, value=1)

541

tracer.put_annotation("error_type", "processing")

542

543

return {"statusCode": 500, "body": {"error": "Processing failed"}}

544

545

@tracer.capture_method

546

def process_order(event: dict) -> dict:

547

"""Process order with detailed tracing"""

548

tracer.put_metadata("order_details", event)

549

550

# Validate order

551

validate_order(event)

552

553

# Calculate totals

554

total = calculate_order_total(event)

555

556

# Save to database

557

save_order(event, total)

558

559

return {"order_id": event["order_id"], "total": total}

560

```

561

562

## Types

563

564

```python { .api }

565

from typing import Union, Dict, Any, List, Callable, TextIO, ContextManager

566

from logging import Handler

567

568

# Metric types

569

MetricUnit = Literal[

570

"Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes",

571

"Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits",

572

"Megabits", "Gigabits", "Terabits", "Percent", "Count",

573

"Bytes/Second", "Kilobytes/Second", "Megabytes/Second",

574

"Gigabytes/Second", "Terabytes/Second", "Bits/Second",

575

"Kilobits/Second", "Megabits/Second", "Gigabits/Second",

576

"Terabits/Second", "Count/Second", "None"

577

]

578

579

MetricResolution = Literal[1, 60]

580

581

# Exception types

582

class MetricUnitError(Exception):

583

"""Raised when an invalid metric unit is used"""

584

pass

585

586

class MetricResolutionError(Exception):

587

"""Raised when an invalid metric resolution is used"""

588

pass

589

590

class MetricValueError(Exception):

591

"""Raised when an invalid metric value is provided"""

592

pass

593

594

class SchemaValidationError(Exception):

595

"""Raised when metric schema validation fails"""

596

pass

597

598

# Formatter types

599

class PowertoolsFormatter:

600

"""Custom log formatter for Powertools"""

601

def __init__(

602

self,

603

json_serializer: Callable[[Dict], str] = None,

604

json_deserializer: Callable = None,

605

json_default: Callable[[Any], Any] = None,

606

datefmt: str = None,

607

use_datetime_directive: bool = False,

608

log_record_order: List[str] = None,

609

utc: bool = False,

610

use_rfc3339: bool = False,

611

): ...

612

613

# Provider types for tracing

614

class BaseProvider:

615

"""Base tracing provider interface"""

616

pass

617

```