or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.mdcore-observability.mddata-classes.mdevent-handlers.mdfeature-flags.mdindex.mdparameters.mdparser.mdutilities.md

feature-flags.mddocs/

0

# Feature Flags & Idempotency

1

2

Feature flags with rule engine support from AWS AppConfig and idempotency patterns to prevent duplicate processing of events. Enables dynamic feature control and reliable event processing in serverless applications.

3

4

## Capabilities

5

6

### Feature Flags

7

8

Dynamic feature flag management with rule-based evaluation from AWS AppConfig.

9

10

```python { .api }

11

class FeatureFlags:

12

def __init__(

13

self,

14

store: StoreProvider,

15

logger: Logger = None,

16

):

17

"""

18

Initialize feature flags with store provider.

19

20

Parameters:

21

- store: Store provider (usually AppConfigStore)

22

- logger: Optional logger for feature flag events

23

"""

24

25

def evaluate(

26

self,

27

name: str,

28

context: Dict[str, Any] = None,

29

default: Any = False,

30

) -> bool | Any:

31

"""

32

Evaluate feature flag with optional context.

33

34

Parameters:

35

- name: Feature flag name

36

- context: Evaluation context (user ID, region, etc.)

37

- default: Default value if flag not found

38

39

Returns:

40

Feature flag value (boolean or complex value)

41

"""

42

43

def get_enabled_features(self, context: Dict[str, Any] = None) -> Dict[str, Any]:

44

"""

45

Get all enabled features for given context.

46

47

Parameters:

48

- context: Evaluation context

49

50

Returns:

51

Dictionary of enabled feature names to values

52

"""

53

54

def batch_evaluate(

55

self,

56

flags: List[str],

57

context: Dict[str, Any] = None,

58

default: Any = False,

59

) -> Dict[str, Any]:

60

"""

61

Evaluate multiple feature flags in batch.

62

63

Parameters:

64

- flags: List of feature flag names

65

- context: Evaluation context

66

- default: Default value for missing flags

67

68

Returns:

69

Dictionary mapping flag names to values

70

"""

71

72

class AppConfigStore(StoreProvider):

73

def __init__(

74

self,

75

environment: str,

76

application: str,

77

name: str,

78

max_age: int = 5,

79

sdk_config: Dict[str, Any] = None,

80

envelope: str = None,

81

jmespath_options: Dict[str, Any] = None,

82

logger: Logger = None,

83

):

84

"""

85

AWS AppConfig store provider for feature flags.

86

87

Parameters:

88

- environment: AppConfig environment name

89

- application: AppConfig application name

90

- name: AppConfig configuration profile name

91

- max_age: Cache TTL in seconds

92

- sdk_config: Boto3 client configuration

93

- envelope: JMESPath envelope for data extraction

94

- jmespath_options: JMESPath evaluation options

95

- logger: Optional logger instance

96

"""

97

98

def get_configuration(self) -> Dict[str, Any]:

99

"""

100

Get configuration from AppConfig.

101

102

Returns:

103

Complete configuration dictionary

104

"""

105

106

def _get_flag_value(

107

self,

108

name: str,

109

context: Dict[str, Any] = None,

110

default: Any = None,

111

) -> Any:

112

"""Get individual flag value with rule evaluation"""

113

114

class StoreProvider:

115

"""Base store provider interface for feature flags"""

116

117

def get_configuration(self) -> Dict[str, Any]:

118

"""Get complete configuration"""

119

raise NotImplementedError

120

121

def get(

122

self,

123

name: str,

124

context: Dict[str, Any] = None,

125

default: Any = None,

126

) -> Any:

127

"""Get configuration value by name"""

128

raise NotImplementedError

129

```

130

131

### Rule Engine

132

133

Rule-based feature flag evaluation with advanced targeting capabilities.

134

135

```python { .api }

136

class RuleAction:

137

ALLOW = "ALLOW"

138

DENY = "DENY"

139

140

class SchemaValidator:

141

def __init__(self, schema: Dict[str, Any]):

142

"""

143

Initialize schema validator for feature flag configuration.

144

145

Parameters:

146

- schema: JSON schema for configuration validation

147

"""

148

149

def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:

150

"""

151

Validate configuration data against schema.

152

153

Parameters:

154

- data: Configuration data to validate

155

156

Returns:

157

Validated configuration data

158

159

Raises:

160

SchemaValidationError: If validation fails

161

"""

162

```

163

164

### Idempotency

165

166

Idempotency patterns to ensure Lambda functions process events exactly once.

167

168

```python { .api }

169

def idempotent(

170

persistence_store: BasePersistenceLayer,

171

config: IdempotencyConfig = None,

172

) -> Callable:

173

"""

174

Decorator for making Lambda handler idempotent.

175

176

Parameters:

177

- persistence_store: Storage layer for idempotency keys

178

- config: Idempotency configuration options

179

180

Returns:

181

Decorated function that prevents duplicate processing

182

"""

183

184

def idempotent_function(

185

data_keyword_argument: str,

186

persistence_store: BasePersistenceLayer,

187

config: IdempotencyConfig = None,

188

) -> Callable:

189

"""

190

Decorator for making specific function calls idempotent.

191

192

Parameters:

193

- data_keyword_argument: Keyword argument name containing data for key generation

194

- persistence_store: Storage layer for idempotency keys

195

- config: Idempotency configuration options

196

197

Returns:

198

Decorated function with idempotency guarantees

199

"""

200

201

class IdempotencyConfig:

202

def __init__(

203

self,

204

event_key_jmespath: str = None,

205

payload_validation_jmespath: str = None,

206

raise_on_no_idempotency_key: bool = False,

207

expires_after_seconds: int = 3600,

208

use_local_cache: bool = False,

209

local_cache_max_items: int = 256,

210

hash_function: str = "md5",

211

lambda_context: LambdaContext = None,

212

):

213

"""

214

Configuration for idempotency behavior.

215

216

Parameters:

217

- event_key_jmespath: JMESPath to extract idempotency key from event

218

- payload_validation_jmespath: JMESPath to extract payload for validation

219

- raise_on_no_idempotency_key: Whether to raise error if no key found

220

- expires_after_seconds: TTL for idempotency records

221

- use_local_cache: Whether to use in-memory cache

222

- local_cache_max_items: Maximum items in local cache

223

- hash_function: Hash function for key generation (md5, sha1, sha256)

224

- lambda_context: Lambda context for additional key generation

225

"""

226

227

class BasePersistenceLayer:

228

"""Base persistence layer for idempotency records"""

229

230

def __init__(

231

self,

232

table_name: str,

233

key_attr: str = "id",

234

expiry_attr: str = "expiration",

235

status_attr: str = "status",

236

data_attr: str = "data",

237

validation_key_attr: str = "validation",

238

):

239

"""

240

Initialize persistence layer.

241

242

Parameters:

243

- table_name: Storage table/collection name

244

- key_attr: Attribute name for idempotency key

245

- expiry_attr: Attribute name for expiration timestamp

246

- status_attr: Attribute name for processing status

247

- data_attr: Attribute name for stored result

248

- validation_key_attr: Attribute name for payload validation

249

"""

250

251

def save_inprogress(

252

self,

253

idempotency_key: str,

254

remaining_time_in_millis: int = None,

255

) -> None:

256

"""

257

Save in-progress idempotency record.

258

259

Parameters:

260

- idempotency_key: Unique idempotency key

261

- remaining_time_in_millis: Remaining Lambda execution time

262

"""

263

264

def save_success(

265

self,

266

idempotency_key: str,

267

result: Any,

268

remaining_time_in_millis: int = None,

269

) -> None:

270

"""

271

Save successful processing result.

272

273

Parameters:

274

- idempotency_key: Unique idempotency key

275

- result: Function result to store

276

- remaining_time_in_millis: Remaining Lambda execution time

277

"""

278

279

def get_record(self, idempotency_key: str) -> Dict[str, Any] | None:

280

"""

281

Retrieve idempotency record by key.

282

283

Parameters:

284

- idempotency_key: Idempotency key to lookup

285

286

Returns:

287

Stored record or None if not found

288

"""

289

290

def delete_record(self, idempotency_key: str) -> None:

291

"""

292

Delete idempotency record.

293

294

Parameters:

295

- idempotency_key: Key of record to delete

296

"""

297

298

class DynamoDBPersistenceLayer(BasePersistenceLayer):

299

def __init__(

300

self,

301

table_name: str,

302

key_attr: str = "id",

303

expiry_attr: str = "expiration",

304

status_attr: str = "status",

305

data_attr: str = "data",

306

validation_key_attr: str = "validation",

307

boto_config: Dict[str, Any] = None,

308

boto3_session: boto3.Session = None,

309

):

310

"""

311

DynamoDB persistence layer for idempotency.

312

313

Parameters:

314

- table_name: DynamoDB table name

315

- key_attr: Primary key attribute name

316

- expiry_attr: TTL attribute name

317

- status_attr: Status attribute name

318

- data_attr: Data attribute name

319

- validation_key_attr: Validation key attribute name

320

- boto_config: Boto3 client configuration

321

- boto3_session: Boto3 session for authentication

322

"""

323

324

class IdempotentHookFunction:

325

"""Hook function interface for idempotency events"""

326

327

def __call__(

328

self,

329

idempotency_key: str,

330

result: Any = None,

331

exception: Exception = None,

332

) -> None:

333

"""

334

Hook function called during idempotency processing.

335

336

Parameters:

337

- idempotency_key: The idempotency key

338

- result: Function result (if successful)

339

- exception: Exception (if failed)

340

"""

341

```

342

343

## Usage Examples

344

345

### Basic Feature Flags

346

347

```python

348

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore

349

from aws_lambda_powertools.utilities.typing import LambdaContext

350

import os

351

352

# Initialize feature flags with AppConfig

353

store = AppConfigStore(

354

environment=os.environ["ENVIRONMENT"],

355

application=os.environ["APP_NAME"],

356

name="feature-flags"

357

)

358

359

feature_flags = FeatureFlags(store=store)

360

361

def lambda_handler(event: dict, context: LambdaContext) -> dict:

362

# Simple boolean flag

363

if feature_flags.evaluate(name="enable_new_algorithm", default=False):

364

result = new_algorithm_processing(event)

365

else:

366

result = legacy_algorithm_processing(event)

367

368

# Feature flag with context

369

user_id = event.get("user_id")

370

user_context = {"userId": user_id, "region": event.get("region", "us-east-1")}

371

372

if feature_flags.evaluate(name="enable_premium_features", context=user_context, default=False):

373

result["premium_data"] = get_premium_data(user_id)

374

375

# Complex feature flag with configuration

376

recommendation_config = feature_flags.evaluate(

377

name="recommendation_settings",

378

context=user_context,

379

default={"enabled": False, "algorithm": "basic", "limit": 5}

380

)

381

382

if recommendation_config.get("enabled", False):

383

recommendations = get_recommendations(

384

user_id=user_id,

385

algorithm=recommendation_config.get("algorithm", "basic"),

386

limit=recommendation_config.get("limit", 5)

387

)

388

result["recommendations"] = recommendations

389

390

return {

391

"statusCode": 200,

392

"body": result

393

}

394

395

def new_algorithm_processing(event: dict) -> dict:

396

"""New algorithm implementation"""

397

return {"algorithm": "v2", "result": "processed_with_new_logic"}

398

399

def legacy_algorithm_processing(event: dict) -> dict:

400

"""Legacy algorithm implementation"""

401

return {"algorithm": "v1", "result": "processed_with_legacy_logic"}

402

```

403

404

### Advanced Feature Flags with Rules

405

406

```python

407

from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore

408

from aws_lambda_powertools.utilities.typing import LambdaContext

409

from aws_lambda_powertools import Logger

410

import os

411

412

logger = Logger()

413

414

# AppConfig feature flag configuration example:

415

# {

416

# "feature_flags": {

417

# "enable_beta_features": {

418

# "enabled": true,

419

# "rules": {

420

# "allow_beta_users": {

421

# "when_match": [

422

# {

423

# "action": "ALLOW",

424

# "conditions": [

425

# {

426

# "key": "userType",

427

# "value": "beta",

428

# "condition": "EQUALS"

429

# }

430

# ]

431

# }

432

# ],

433

# "when_no_match": {

434

# "action": "DENY"

435

# }

436

# }

437

# },

438

# "default": false

439

# },

440

# "api_rate_limit": {

441

# "enabled": true,

442

# "rules": {

443

# "premium_rate_limit": {

444

# "when_match": [

445

# {

446

# "action": "ALLOW",

447

# "conditions": [

448

# {

449

# "key": "userTier",

450

# "value": "premium",

451

# "condition": "EQUALS"

452

# }

453

# ]

454

# }

455

# ],

456

# "when_no_match": {

457

# "action": "DENY"

458

# }

459

# }

460

# },

461

# "default": {"requests_per_minute": 100}

462

# }

463

# }

464

# }

465

466

store = AppConfigStore(

467

environment=os.environ["ENVIRONMENT"],

468

application="my-app",

469

name="feature-config"

470

)

471

472

feature_flags = FeatureFlags(store=store, logger=logger)

473

474

def lambda_handler(event: dict, context: LambdaContext) -> dict:

475

# Extract user context from event

476

user_context = {

477

"userId": event.get("user_id"),

478

"userType": event.get("user_type", "standard"),

479

"userTier": event.get("user_tier", "basic"),

480

"region": event.get("region", "us-east-1"),

481

"deviceType": event.get("device_type", "web")

482

}

483

484

logger.append_keys(user_context=user_context)

485

486

# Evaluate feature flags with context

487

results = {}

488

489

# Beta features access

490

beta_enabled = feature_flags.evaluate(

491

name="enable_beta_features",

492

context=user_context,

493

default=False

494

)

495

496

if beta_enabled:

497

logger.info("Beta features enabled for user")

498

results["beta_features"] = get_beta_features(user_context)

499

else:

500

logger.info("Beta features not available for user")

501

502

# Dynamic rate limiting

503

rate_limit_config = feature_flags.evaluate(

504

name="api_rate_limit",

505

context=user_context,

506

default={"requests_per_minute": 60}

507

)

508

509

# Apply rate limiting

510

rate_limit = rate_limit_config.get("requests_per_minute", 60)

511

if not check_rate_limit(user_context["userId"], rate_limit):

512

return {

513

"statusCode": 429,

514

"body": {"error": "Rate limit exceeded"}

515

}

516

517

# Batch evaluate multiple flags

518

flag_results = feature_flags.batch_evaluate(

519

flags=["enable_analytics", "enable_notifications", "enable_cache"],

520

context=user_context,

521

default=False

522

)

523

524

# Process based on enabled features

525

if flag_results.get("enable_analytics", False):

526

track_user_event(event, user_context)

527

528

if flag_results.get("enable_notifications", False):

529

queue_notification(user_context["userId"], "feature_accessed")

530

531

# Use caching if enabled

532

cache_enabled = flag_results.get("enable_cache", False)

533

if cache_enabled:

534

results["cached_data"] = get_cached_data(user_context["userId"])

535

else:

536

results["fresh_data"] = get_fresh_data(user_context["userId"])

537

538

return {

539

"statusCode": 200,

540

"body": results,

541

"rate_limit": rate_limit

542

}

543

544

def get_beta_features(user_context: dict) -> dict:

545

"""Get beta features for user"""

546

return {

547

"new_dashboard": True,

548

"advanced_analytics": True,

549

"ai_recommendations": True

550

}

551

552

def check_rate_limit(user_id: str, limit: int) -> bool:

553

"""Check if user is within rate limit"""

554

# Implementation would check against cache/database

555

return True # Simplified for example

556

557

def track_user_event(event: dict, user_context: dict):

558

"""Track analytics event"""

559

logger.info("Tracking user event", extra={"event_type": "api_access"})

560

561

def queue_notification(user_id: str, notification_type: str):

562

"""Queue notification for user"""

563

logger.info("Queuing notification", extra={

564

"user_id": user_id,

565

"type": notification_type

566

})

567

```

568

569

### Basic Idempotency

570

571

```python

572

from aws_lambda_powertools.utilities.idempotency import (

573

idempotent,

574

DynamoDBPersistenceLayer,

575

IdempotencyConfig

576

)

577

from aws_lambda_powertools.utilities.typing import LambdaContext

578

from aws_lambda_powertools import Logger

579

import json

580

581

logger = Logger()

582

583

# Configure DynamoDB persistence layer

584

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

585

586

# Configure idempotency behavior

587

config = IdempotencyConfig(

588

event_key_jmespath="requestId", # Extract requestId from event

589

expires_after_seconds=3600, # 1 hour TTL

590

use_local_cache=True, # Enable local caching

591

)

592

593

@idempotent(persistence_store=persistence_layer, config=config)

594

def lambda_handler(event: dict, context: LambdaContext) -> dict:

595

"""

596

Idempotent Lambda handler - duplicate requests return cached result

597

"""

598

599

request_id = event.get("requestId")

600

logger.info(f"Processing request {request_id}")

601

602

# This logic will only execute once per unique requestId

603

user_id = event.get("userId")

604

order_data = event.get("orderData", {})

605

606

# Simulate expensive operation

607

order_id = process_order(user_id, order_data)

608

609

# Send confirmation (will only happen once)

610

send_order_confirmation(user_id, order_id)

611

612

result = {

613

"orderId": order_id,

614

"status": "processed",

615

"timestamp": context.aws_request_id

616

}

617

618

logger.info(f"Order processed: {order_id}")

619

620

return {

621

"statusCode": 200,

622

"body": json.dumps(result)

623

}

624

625

def process_order(user_id: str, order_data: dict) -> str:

626

"""Process order (expensive operation)"""

627

import uuid

628

import time

629

630

# Simulate processing time

631

time.sleep(1)

632

633

order_id = str(uuid.uuid4())

634

logger.info(f"Created order {order_id} for user {user_id}")

635

636

return order_id

637

638

def send_order_confirmation(user_id: str, order_id: str):

639

"""Send order confirmation (side effect)"""

640

logger.info(f"Sending confirmation for order {order_id} to user {user_id}")

641

# Email/SMS sending logic here

642

```

643

644

### Advanced Idempotency with Custom Keys

645

646

```python

647

from aws_lambda_powertools.utilities.idempotency import (

648

idempotent,

649

idempotent_function,

650

DynamoDBPersistenceLayer,

651

IdempotencyConfig

652

)

653

from aws_lambda_powertools.utilities.typing import LambdaContext

654

from aws_lambda_powertools import Logger

655

import hashlib

656

import json

657

658

logger = Logger()

659

660

# Persistence layer with custom configuration

661

persistence_layer = DynamoDBPersistenceLayer(

662

table_name="IdempotencyTable",

663

key_attr="idempotency_key",

664

expiry_attr="expires_at",

665

status_attr="status",

666

data_attr="result_data"

667

)

668

669

# Complex idempotency configuration

670

config = IdempotencyConfig(

671

event_key_jmespath="headers.`Idempotency-Key` || body.transactionId",

672

payload_validation_jmespath="body", # Validate entire body for changes

673

expires_after_seconds=86400, # 24 hours

674

use_local_cache=True,

675

hash_function="sha256",

676

raise_on_no_idempotency_key=True

677

)

678

679

@idempotent(persistence_store=persistence_layer, config=config)

680

def lambda_handler(event: dict, context: LambdaContext) -> dict:

681

"""Handler with custom idempotency key extraction"""

682

683

# Process payment request

684

payment_data = event.get("body", {})

685

headers = event.get("headers", {})

686

687

idempotency_key = headers.get("Idempotency-Key") or payment_data.get("transactionId")

688

logger.append_keys(idempotency_key=idempotency_key)

689

690

logger.info("Processing payment request")

691

692

# This will only execute once per unique key

693

result = process_payment_request(payment_data)

694

695

return {

696

"statusCode": 200,

697

"body": json.dumps(result)

698

}

699

700

# Separate persistence layer for user operations

701

user_persistence = DynamoDBPersistenceLayer(table_name="UserIdempotencyTable")

702

703

user_config = IdempotencyConfig(

704

expires_after_seconds=3600,

705

use_local_cache=True

706

)

707

708

@idempotent_function(

709

data_keyword_argument="user_data",

710

persistence_store=user_persistence,

711

config=user_config

712

)

713

def create_user(user_data: dict) -> dict:

714

"""Idempotent user creation function"""

715

716

logger.info("Creating new user", extra={"email": user_data.get("email")})

717

718

# Check if user already exists

719

if user_exists(user_data["email"]):

720

raise ValueError("User already exists")

721

722

# Create user (expensive operation)

723

user_id = generate_user_id()

724

save_user_to_database(user_id, user_data)

725

send_welcome_email(user_data["email"])

726

727

return {

728

"user_id": user_id,

729

"email": user_data["email"],

730

"created_at": "2024-01-01T00:00:00Z"

731

}

732

733

def process_payment_request(payment_data: dict) -> dict:

734

"""Process payment with external API"""

735

736

amount = payment_data["amount"]

737

currency = payment_data["currency"]

738

payment_method = payment_data["payment_method"]

739

740

# Call external payment processor

741

transaction_id = call_payment_processor(amount, currency, payment_method)

742

743

# Update internal records

744

update_payment_records(transaction_id, payment_data)

745

746

return {

747

"transaction_id": transaction_id,

748

"amount": amount,

749

"currency": currency,

750

"status": "completed"

751

}

752

753

# Bulk operations with per-item idempotency

754

@idempotent_function(

755

data_keyword_argument="item_data",

756

persistence_store=persistence_layer,

757

config=IdempotencyConfig(expires_after_seconds=1800)

758

)

759

def process_bulk_item(item_data: dict) -> dict:

760

"""Process individual item in bulk operation"""

761

762

item_id = item_data["item_id"]

763

logger.info(f"Processing item {item_id}")

764

765

# Expensive per-item processing

766

result = expensive_item_processing(item_data)

767

768

return {

769

"item_id": item_id,

770

"processed": True,

771

"result": result

772

}

773

774

def bulk_handler(event: dict, context: LambdaContext) -> dict:

775

"""Handle bulk operations with per-item idempotency"""

776

777

items = event.get("items", [])

778

results = []

779

780

for item in items:

781

try:

782

# Each item is processed idempotently

783

result = process_bulk_item(item_data=item)

784

results.append(result)

785

786

except Exception as e:

787

logger.exception(f"Failed to process item {item.get('item_id')}")

788

results.append({

789

"item_id": item.get("item_id"),

790

"error": str(e)

791

})

792

793

return {

794

"statusCode": 200,

795

"body": json.dumps({

796

"processed_count": len([r for r in results if not r.get("error")]),

797

"failed_count": len([r for r in results if r.get("error")]),

798

"results": results

799

})

800

}

801

```

802

803

### Error Handling and Monitoring

804

805

```python

806

from aws_lambda_powertools.utilities.feature_flags import (

807

FeatureFlags,

808

AppConfigStore,

809

ConfigurationStoreError

810

)

811

from aws_lambda_powertools.utilities.idempotency import (

812

idempotent,

813

DynamoDBPersistenceLayer,

814

IdempotencyConfig,

815

IdempotencyAlreadyInProgressError,

816

IdempotencyInconsistentStateError,

817

IdempotencyKeyError

818

)

819

from aws_lambda_powertools.utilities.typing import LambdaContext

820

from aws_lambda_powertools import Logger, Metrics

821

from aws_lambda_powertools.metrics import MetricUnit

822

823

logger = Logger()

824

metrics = Metrics()

825

826

# Feature flags with error handling

827

try:

828

store = AppConfigStore(

829

environment="production",

830

application="my-app",

831

name="feature-flags",

832

max_age=30 # Refresh every 30 seconds

833

)

834

feature_flags = FeatureFlags(store=store, logger=logger)

835

except ConfigurationStoreError as e:

836

logger.error("Failed to initialize feature flags", extra={"error": str(e)})

837

# Fallback to default configuration

838

feature_flags = None

839

840

# Idempotency with error handling

841

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

842

843

config = IdempotencyConfig(

844

event_key_jmespath="requestId",

845

expires_after_seconds=3600,

846

use_local_cache=True

847

)

848

849

@metrics.log_metrics(capture_cold_start_metric=True)

850

@idempotent(persistence_store=persistence_layer, config=config)

851

def lambda_handler(event: dict, context: LambdaContext) -> dict:

852

"""Handler with comprehensive error handling"""

853

854

request_id = event.get("requestId")

855

logger.append_keys(request_id=request_id)

856

857

try:

858

# Feature flag evaluation with fallbacks

859

features = get_enabled_features(event)

860

861

# Record feature usage metrics

862

for feature_name, enabled in features.items():

863

metrics.add_metric(

864

name=f"FeatureFlag_{feature_name}",

865

unit=MetricUnit.Count,

866

value=1 if enabled else 0

867

)

868

869

# Process request based on enabled features

870

result = process_request(event, features)

871

872

# Record success metrics

873

metrics.add_metric(name="RequestProcessed", unit=MetricUnit.Count, value=1)

874

875

return {

876

"statusCode": 200,

877

"body": result

878

}

879

880

except IdempotencyAlreadyInProgressError:

881

# Request is already being processed by another instance

882

logger.warning("Request already in progress")

883

metrics.add_metric(name="IdempotencyInProgress", unit=MetricUnit.Count, value=1)

884

885

return {

886

"statusCode": 409,

887

"body": {"error": "Request already in progress"}

888

}

889

890

except IdempotencyKeyError as e:

891

# Missing or invalid idempotency key

892

logger.error("Invalid idempotency key", extra={"error": str(e)})

893

metrics.add_metric(name="IdempotencyKeyError", unit=MetricUnit.Count, value=1)

894

895

return {

896

"statusCode": 400,

897

"body": {"error": "Invalid or missing idempotency key"}

898

}

899

900

except Exception as e:

901

# Unexpected error

902

logger.exception("Request processing failed")

903

metrics.add_metric(name="RequestFailed", unit=MetricUnit.Count, value=1)

904

905

return {

906

"statusCode": 500,

907

"body": {"error": "Internal server error"}

908

}

909

910

def get_enabled_features(event: dict) -> dict:

911

"""Get enabled features with fallback logic"""

912

913

features = {

914

"enhanced_processing": False,

915

"premium_features": False,

916

"beta_algorithms": False

917

}

918

919

if feature_flags is None:

920

logger.warning("Feature flags unavailable, using defaults")

921

return features

922

923

try:

924

user_context = {

925

"userId": event.get("userId"),

926

"userTier": event.get("userTier", "basic"),

927

"region": event.get("region", "us-east-1")

928

}

929

930

# Evaluate each feature flag with error handling

931

for feature_name in features:

932

try:

933

features[feature_name] = feature_flags.evaluate(

934

name=feature_name,

935

context=user_context,

936

default=features[feature_name]

937

)

938

except Exception as e:

939

logger.warning(

940

f"Failed to evaluate feature flag {feature_name}",

941

extra={"error": str(e)}

942

)

943

# Keep default value

944

945

return features

946

947

except ConfigurationStoreError as e:

948

logger.warning("Feature flag evaluation failed, using defaults", extra={"error": str(e)})

949

metrics.add_metric(name="FeatureFlagError", unit=MetricUnit.Count, value=1)

950

return features

951

952

def process_request(event: dict, features: dict) -> dict:

953

"""Process request based on enabled features"""

954

955

result = {"processed_features": []}

956

957

if features.get("enhanced_processing", False):

958

logger.info("Using enhanced processing")

959

result["enhanced_data"] = enhanced_processing(event)

960

result["processed_features"].append("enhanced_processing")

961

962

if features.get("premium_features", False):

963

logger.info("Including premium features")

964

result["premium_data"] = get_premium_data(event)

965

result["processed_features"].append("premium_features")

966

967

if features.get("beta_algorithms", False):

968

logger.info("Using beta algorithms")

969

result["beta_results"] = beta_algorithm_processing(event)

970

result["processed_features"].append("beta_algorithms")

971

972

# Base processing

973

result["base_data"] = base_processing(event)

974

975

return result

976

977

def enhanced_processing(event: dict) -> dict:

978

"""Enhanced processing implementation"""

979

return {"type": "enhanced", "quality": "high"}

980

981

def get_premium_data(event: dict) -> dict:

982

"""Premium feature data"""

983

return {"premium_insights": True, "advanced_analytics": True}

984

985

def beta_algorithm_processing(event: dict) -> dict:

986

"""Beta algorithm processing"""

987

return {"algorithm": "beta_v2", "accuracy": "experimental"}

988

989

def base_processing(event: dict) -> dict:

990

"""Base processing implementation"""

991

return {"type": "standard", "quality": "normal"}

992

```

993

994

## Types

995

996

```python { .api }

997

from typing import Dict, Any, List, Union, Optional, Callable

998

from aws_lambda_powertools.utilities.typing import LambdaContext

999

1000

# Feature flag types

1001

FeatureFlagValue = Union[bool, str, int, float, Dict[str, Any], List[Any]]

1002

FeatureFlagContext = Dict[str, Any]

1003

FeatureFlagEvaluation = Dict[str, FeatureFlagValue]

1004

1005

# Rule action constants

1006

RuleAction = Literal["ALLOW", "DENY"]

1007

1008

# Store provider types

1009

class StoreProvider:

1010

"""Base interface for feature flag store providers"""

1011

pass

1012

1013

# Configuration store error

1014

class ConfigurationStoreError(Exception):

1015

"""Raised when configuration store operations fail"""

1016

pass

1017

1018

# Schema validation error

1019

class SchemaValidationError(Exception):

1020

"""Raised when feature flag schema validation fails"""

1021

pass

1022

1023

# Idempotency types

1024

IdempotencyKey = str

1025

IdempotencyRecord = Dict[str, Any]

1026

1027

# Idempotency exceptions

1028

class IdempotencyError(Exception):

1029

"""Base idempotency error"""

1030

pass

1031

1032

class IdempotencyKeyError(IdempotencyError):

1033

"""Raised when idempotency key is missing or invalid"""

1034

pass

1035

1036

class IdempotencyAlreadyInProgressError(IdempotencyError):

1037

"""Raised when request is already being processed"""

1038

pass

1039

1040

class IdempotencyInconsistentStateError(IdempotencyError):

1041

"""Raised when idempotency state is inconsistent"""

1042

pass

1043

1044

class IdempotencyValidationError(IdempotencyError):

1045

"""Raised when payload validation fails"""

1046

pass

1047

1048

# Persistence layer types

1049

PersistenceRecord = Dict[str, Any]

1050

1051

# Hook function type

1052

IdempotencyHook = Callable[[str, Any, Optional[Exception]], None]

1053

1054

# Hash function type

1055

HashFunction = Literal["md5", "sha1", "sha256", "sha512"]

1056

1057

# JMESPath expression type

1058

JMESPathExpression = str

1059

1060

# Boto3 configuration types

1061

Boto3Config = Dict[str, Any]

1062

Boto3Session = Any # boto3.Session

1063

```