0
# Feature Flags & Idempotency
1
2
Feature flags with rule engine support from AWS AppConfig and idempotency patterns to prevent duplicate processing of events. Enables dynamic feature control and reliable event processing in serverless applications.
3
4
## Capabilities
5
6
### Feature Flags
7
8
Dynamic feature flag management with rule-based evaluation from AWS AppConfig.
9
10
```python { .api }
11
class FeatureFlags:
12
def __init__(
13
self,
14
store: StoreProvider,
15
logger: Logger = None,
16
):
17
"""
18
Initialize feature flags with store provider.
19
20
Parameters:
21
- store: Store provider (usually AppConfigStore)
22
- logger: Optional logger for feature flag events
23
"""
24
25
def evaluate(
26
self,
27
name: str,
28
context: Dict[str, Any] = None,
29
default: Any = False,
30
) -> bool | Any:
31
"""
32
Evaluate feature flag with optional context.
33
34
Parameters:
35
- name: Feature flag name
36
- context: Evaluation context (user ID, region, etc.)
37
- default: Default value if flag not found
38
39
Returns:
40
Feature flag value (boolean or complex value)
41
"""
42
43
def get_enabled_features(self, context: Dict[str, Any] = None) -> Dict[str, Any]:
44
"""
45
Get all enabled features for given context.
46
47
Parameters:
48
- context: Evaluation context
49
50
Returns:
51
Dictionary of enabled feature names to values
52
"""
53
54
def batch_evaluate(
55
self,
56
flags: List[str],
57
context: Dict[str, Any] = None,
58
default: Any = False,
59
) -> Dict[str, Any]:
60
"""
61
Evaluate multiple feature flags in batch.
62
63
Parameters:
64
- flags: List of feature flag names
65
- context: Evaluation context
66
- default: Default value for missing flags
67
68
Returns:
69
Dictionary mapping flag names to values
70
"""
71
72
class AppConfigStore(StoreProvider):
73
def __init__(
74
self,
75
environment: str,
76
application: str,
77
name: str,
78
max_age: int = 5,
79
sdk_config: Dict[str, Any] = None,
80
envelope: str = None,
81
jmespath_options: Dict[str, Any] = None,
82
logger: Logger = None,
83
):
84
"""
85
AWS AppConfig store provider for feature flags.
86
87
Parameters:
88
- environment: AppConfig environment name
89
- application: AppConfig application name
90
- name: AppConfig configuration profile name
91
- max_age: Cache TTL in seconds
92
- sdk_config: Boto3 client configuration
93
- envelope: JMESPath envelope for data extraction
94
- jmespath_options: JMESPath evaluation options
95
- logger: Optional logger instance
96
"""
97
98
def get_configuration(self) -> Dict[str, Any]:
99
"""
100
Get configuration from AppConfig.
101
102
Returns:
103
Complete configuration dictionary
104
"""
105
106
def _get_flag_value(
107
self,
108
name: str,
109
context: Dict[str, Any] = None,
110
default: Any = None,
111
) -> Any:
112
"""Get individual flag value with rule evaluation"""
113
114
class StoreProvider:
115
"""Base store provider interface for feature flags"""
116
117
def get_configuration(self) -> Dict[str, Any]:
118
"""Get complete configuration"""
119
raise NotImplementedError
120
121
def get(
122
self,
123
name: str,
124
context: Dict[str, Any] = None,
125
default: Any = None,
126
) -> Any:
127
"""Get configuration value by name"""
128
raise NotImplementedError
129
```
130
131
### Rule Engine
132
133
Rule-based feature flag evaluation with advanced targeting capabilities.
134
135
```python { .api }
136
class RuleAction:
137
ALLOW = "ALLOW"
138
DENY = "DENY"
139
140
class SchemaValidator:
141
def __init__(self, schema: Dict[str, Any]):
142
"""
143
Initialize schema validator for feature flag configuration.
144
145
Parameters:
146
- schema: JSON schema for configuration validation
147
"""
148
149
def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
150
"""
151
Validate configuration data against schema.
152
153
Parameters:
154
- data: Configuration data to validate
155
156
Returns:
157
Validated configuration data
158
159
Raises:
160
SchemaValidationError: If validation fails
161
"""
162
```
163
164
### Idempotency
165
166
Idempotency patterns to ensure Lambda functions process events exactly once.
167
168
```python { .api }
169
def idempotent(
170
persistence_store: BasePersistenceLayer,
171
config: IdempotencyConfig = None,
172
) -> Callable:
173
"""
174
Decorator for making Lambda handler idempotent.
175
176
Parameters:
177
- persistence_store: Storage layer for idempotency keys
178
- config: Idempotency configuration options
179
180
Returns:
181
Decorated function that prevents duplicate processing
182
"""
183
184
def idempotent_function(
185
data_keyword_argument: str,
186
persistence_store: BasePersistenceLayer,
187
config: IdempotencyConfig = None,
188
) -> Callable:
189
"""
190
Decorator for making specific function calls idempotent.
191
192
Parameters:
193
- data_keyword_argument: Keyword argument name containing data for key generation
194
- persistence_store: Storage layer for idempotency keys
195
- config: Idempotency configuration options
196
197
Returns:
198
Decorated function with idempotency guarantees
199
"""
200
201
class IdempotencyConfig:
202
def __init__(
203
self,
204
event_key_jmespath: str = None,
205
payload_validation_jmespath: str = None,
206
raise_on_no_idempotency_key: bool = False,
207
expires_after_seconds: int = 3600,
208
use_local_cache: bool = False,
209
local_cache_max_items: int = 256,
210
hash_function: str = "md5",
211
lambda_context: LambdaContext = None,
212
):
213
"""
214
Configuration for idempotency behavior.
215
216
Parameters:
217
- event_key_jmespath: JMESPath to extract idempotency key from event
218
- payload_validation_jmespath: JMESPath to extract payload for validation
219
- raise_on_no_idempotency_key: Whether to raise error if no key found
220
- expires_after_seconds: TTL for idempotency records
221
- use_local_cache: Whether to use in-memory cache
222
- local_cache_max_items: Maximum items in local cache
223
- hash_function: Hash function for key generation (md5, sha1, sha256)
224
- lambda_context: Lambda context for additional key generation
225
"""
226
227
class BasePersistenceLayer:
228
"""Base persistence layer for idempotency records"""
229
230
def __init__(
231
self,
232
table_name: str,
233
key_attr: str = "id",
234
expiry_attr: str = "expiration",
235
status_attr: str = "status",
236
data_attr: str = "data",
237
validation_key_attr: str = "validation",
238
):
239
"""
240
Initialize persistence layer.
241
242
Parameters:
243
- table_name: Storage table/collection name
244
- key_attr: Attribute name for idempotency key
245
- expiry_attr: Attribute name for expiration timestamp
246
- status_attr: Attribute name for processing status
247
- data_attr: Attribute name for stored result
248
- validation_key_attr: Attribute name for payload validation
249
"""
250
251
def save_inprogress(
252
self,
253
idempotency_key: str,
254
remaining_time_in_millis: int = None,
255
) -> None:
256
"""
257
Save in-progress idempotency record.
258
259
Parameters:
260
- idempotency_key: Unique idempotency key
261
- remaining_time_in_millis: Remaining Lambda execution time
262
"""
263
264
def save_success(
265
self,
266
idempotency_key: str,
267
result: Any,
268
remaining_time_in_millis: int = None,
269
) -> None:
270
"""
271
Save successful processing result.
272
273
Parameters:
274
- idempotency_key: Unique idempotency key
275
- result: Function result to store
276
- remaining_time_in_millis: Remaining Lambda execution time
277
"""
278
279
def get_record(self, idempotency_key: str) -> Dict[str, Any] | None:
280
"""
281
Retrieve idempotency record by key.
282
283
Parameters:
284
- idempotency_key: Idempotency key to lookup
285
286
Returns:
287
Stored record or None if not found
288
"""
289
290
def delete_record(self, idempotency_key: str) -> None:
291
"""
292
Delete idempotency record.
293
294
Parameters:
295
- idempotency_key: Key of record to delete
296
"""
297
298
class DynamoDBPersistenceLayer(BasePersistenceLayer):
299
def __init__(
300
self,
301
table_name: str,
302
key_attr: str = "id",
303
expiry_attr: str = "expiration",
304
status_attr: str = "status",
305
data_attr: str = "data",
306
validation_key_attr: str = "validation",
307
boto_config: Dict[str, Any] = None,
308
boto3_session: boto3.Session = None,
309
):
310
"""
311
DynamoDB persistence layer for idempotency.
312
313
Parameters:
314
- table_name: DynamoDB table name
315
- key_attr: Primary key attribute name
316
- expiry_attr: TTL attribute name
317
- status_attr: Status attribute name
318
- data_attr: Data attribute name
319
- validation_key_attr: Validation key attribute name
320
- boto_config: Boto3 client configuration
321
- boto3_session: Boto3 session for authentication
322
"""
323
324
class IdempotentHookFunction:
325
"""Hook function interface for idempotency events"""
326
327
def __call__(
328
self,
329
idempotency_key: str,
330
result: Any = None,
331
exception: Exception = None,
332
) -> None:
333
"""
334
Hook function called during idempotency processing.
335
336
Parameters:
337
- idempotency_key: The idempotency key
338
- result: Function result (if successful)
339
- exception: Exception (if failed)
340
"""
341
```
342
343
## Usage Examples
344
345
### Basic Feature Flags
346
347
```python
348
from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
349
from aws_lambda_powertools.utilities.typing import LambdaContext
350
import os
351
352
# Initialize feature flags with AppConfig
353
store = AppConfigStore(
354
environment=os.environ["ENVIRONMENT"],
355
application=os.environ["APP_NAME"],
356
name="feature-flags"
357
)
358
359
feature_flags = FeatureFlags(store=store)
360
361
def lambda_handler(event: dict, context: LambdaContext) -> dict:
362
# Simple boolean flag
363
if feature_flags.evaluate(name="enable_new_algorithm", default=False):
364
result = new_algorithm_processing(event)
365
else:
366
result = legacy_algorithm_processing(event)
367
368
# Feature flag with context
369
user_id = event.get("user_id")
370
user_context = {"userId": user_id, "region": event.get("region", "us-east-1")}
371
372
if feature_flags.evaluate(name="enable_premium_features", context=user_context, default=False):
373
result["premium_data"] = get_premium_data(user_id)
374
375
# Complex feature flag with configuration
376
recommendation_config = feature_flags.evaluate(
377
name="recommendation_settings",
378
context=user_context,
379
default={"enabled": False, "algorithm": "basic", "limit": 5}
380
)
381
382
if recommendation_config.get("enabled", False):
383
recommendations = get_recommendations(
384
user_id=user_id,
385
algorithm=recommendation_config.get("algorithm", "basic"),
386
limit=recommendation_config.get("limit", 5)
387
)
388
result["recommendations"] = recommendations
389
390
return {
391
"statusCode": 200,
392
"body": result
393
}
394
395
def new_algorithm_processing(event: dict) -> dict:
396
"""New algorithm implementation"""
397
return {"algorithm": "v2", "result": "processed_with_new_logic"}
398
399
def legacy_algorithm_processing(event: dict) -> dict:
400
"""Legacy algorithm implementation"""
401
return {"algorithm": "v1", "result": "processed_with_legacy_logic"}
402
```
403
404
### Advanced Feature Flags with Rules
405
406
```python
407
from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore
408
from aws_lambda_powertools.utilities.typing import LambdaContext
409
from aws_lambda_powertools import Logger
410
import os
411
412
logger = Logger()
413
414
# AppConfig feature flag configuration example:
415
# {
416
# "feature_flags": {
417
# "enable_beta_features": {
418
# "enabled": true,
419
# "rules": {
420
# "allow_beta_users": {
421
# "when_match": [
422
# {
423
# "action": "ALLOW",
424
# "conditions": [
425
# {
426
# "key": "userType",
427
# "value": "beta",
428
# "condition": "EQUALS"
429
# }
430
# ]
431
# }
432
# ],
433
# "when_no_match": {
434
# "action": "DENY"
435
# }
436
# }
437
# },
438
# "default": false
439
# },
440
# "api_rate_limit": {
441
# "enabled": true,
442
# "rules": {
443
# "premium_rate_limit": {
444
# "when_match": [
445
# {
446
# "action": "ALLOW",
447
# "conditions": [
448
# {
449
# "key": "userTier",
450
# "value": "premium",
451
# "condition": "EQUALS"
452
# }
453
# ]
454
# }
455
# ],
456
# "when_no_match": {
457
# "action": "DENY"
458
# }
459
# }
460
# },
461
# "default": {"requests_per_minute": 100}
462
# }
463
# }
464
# }
465
466
store = AppConfigStore(
467
environment=os.environ["ENVIRONMENT"],
468
application="my-app",
469
name="feature-config"
470
)
471
472
feature_flags = FeatureFlags(store=store, logger=logger)
473
474
def lambda_handler(event: dict, context: LambdaContext) -> dict:
475
# Extract user context from event
476
user_context = {
477
"userId": event.get("user_id"),
478
"userType": event.get("user_type", "standard"),
479
"userTier": event.get("user_tier", "basic"),
480
"region": event.get("region", "us-east-1"),
481
"deviceType": event.get("device_type", "web")
482
}
483
484
logger.append_keys(user_context=user_context)
485
486
# Evaluate feature flags with context
487
results = {}
488
489
# Beta features access
490
beta_enabled = feature_flags.evaluate(
491
name="enable_beta_features",
492
context=user_context,
493
default=False
494
)
495
496
if beta_enabled:
497
logger.info("Beta features enabled for user")
498
results["beta_features"] = get_beta_features(user_context)
499
else:
500
logger.info("Beta features not available for user")
501
502
# Dynamic rate limiting
503
rate_limit_config = feature_flags.evaluate(
504
name="api_rate_limit",
505
context=user_context,
506
default={"requests_per_minute": 60}
507
)
508
509
# Apply rate limiting
510
rate_limit = rate_limit_config.get("requests_per_minute", 60)
511
if not check_rate_limit(user_context["userId"], rate_limit):
512
return {
513
"statusCode": 429,
514
"body": {"error": "Rate limit exceeded"}
515
}
516
517
# Batch evaluate multiple flags
518
flag_results = feature_flags.batch_evaluate(
519
flags=["enable_analytics", "enable_notifications", "enable_cache"],
520
context=user_context,
521
default=False
522
)
523
524
# Process based on enabled features
525
if flag_results.get("enable_analytics", False):
526
track_user_event(event, user_context)
527
528
if flag_results.get("enable_notifications", False):
529
queue_notification(user_context["userId"], "feature_accessed")
530
531
# Use caching if enabled
532
cache_enabled = flag_results.get("enable_cache", False)
533
if cache_enabled:
534
results["cached_data"] = get_cached_data(user_context["userId"])
535
else:
536
results["fresh_data"] = get_fresh_data(user_context["userId"])
537
538
return {
539
"statusCode": 200,
540
"body": results,
541
"rate_limit": rate_limit
542
}
543
544
def get_beta_features(user_context: dict) -> dict:
545
"""Get beta features for user"""
546
return {
547
"new_dashboard": True,
548
"advanced_analytics": True,
549
"ai_recommendations": True
550
}
551
552
def check_rate_limit(user_id: str, limit: int) -> bool:
553
"""Check if user is within rate limit"""
554
# Implementation would check against cache/database
555
return True # Simplified for example
556
557
def track_user_event(event: dict, user_context: dict):
558
"""Track analytics event"""
559
logger.info("Tracking user event", extra={"event_type": "api_access"})
560
561
def queue_notification(user_id: str, notification_type: str):
562
"""Queue notification for user"""
563
logger.info("Queuing notification", extra={
564
"user_id": user_id,
565
"type": notification_type
566
})
567
```
568
569
### Basic Idempotency
570
571
```python
572
from aws_lambda_powertools.utilities.idempotency import (
573
idempotent,
574
DynamoDBPersistenceLayer,
575
IdempotencyConfig
576
)
577
from aws_lambda_powertools.utilities.typing import LambdaContext
578
from aws_lambda_powertools import Logger
579
import json
580
581
logger = Logger()
582
583
# Configure DynamoDB persistence layer
584
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
585
586
# Configure idempotency behavior
587
config = IdempotencyConfig(
588
event_key_jmespath="requestId", # Extract requestId from event
589
expires_after_seconds=3600, # 1 hour TTL
590
use_local_cache=True, # Enable local caching
591
)
592
593
@idempotent(persistence_store=persistence_layer, config=config)
594
def lambda_handler(event: dict, context: LambdaContext) -> dict:
595
"""
596
Idempotent Lambda handler - duplicate requests return cached result
597
"""
598
599
request_id = event.get("requestId")
600
logger.info(f"Processing request {request_id}")
601
602
# This logic will only execute once per unique requestId
603
user_id = event.get("userId")
604
order_data = event.get("orderData", {})
605
606
# Simulate expensive operation
607
order_id = process_order(user_id, order_data)
608
609
# Send confirmation (will only happen once)
610
send_order_confirmation(user_id, order_id)
611
612
result = {
613
"orderId": order_id,
614
"status": "processed",
615
"timestamp": context.aws_request_id
616
}
617
618
logger.info(f"Order processed: {order_id}")
619
620
return {
621
"statusCode": 200,
622
"body": json.dumps(result)
623
}
624
625
def process_order(user_id: str, order_data: dict) -> str:
626
"""Process order (expensive operation)"""
627
import uuid
628
import time
629
630
# Simulate processing time
631
time.sleep(1)
632
633
order_id = str(uuid.uuid4())
634
logger.info(f"Created order {order_id} for user {user_id}")
635
636
return order_id
637
638
def send_order_confirmation(user_id: str, order_id: str):
639
"""Send order confirmation (side effect)"""
640
logger.info(f"Sending confirmation for order {order_id} to user {user_id}")
641
# Email/SMS sending logic here
642
```
643
644
### Advanced Idempotency with Custom Keys
645
646
```python
647
from aws_lambda_powertools.utilities.idempotency import (
648
idempotent,
649
idempotent_function,
650
DynamoDBPersistenceLayer,
651
IdempotencyConfig
652
)
653
from aws_lambda_powertools.utilities.typing import LambdaContext
654
from aws_lambda_powertools import Logger
655
import hashlib
656
import json
657
658
logger = Logger()
659
660
# Persistence layer with custom configuration
661
persistence_layer = DynamoDBPersistenceLayer(
662
table_name="IdempotencyTable",
663
key_attr="idempotency_key",
664
expiry_attr="expires_at",
665
status_attr="status",
666
data_attr="result_data"
667
)
668
669
# Complex idempotency configuration
670
config = IdempotencyConfig(
671
event_key_jmespath="headers.`Idempotency-Key` || body.transactionId",
672
payload_validation_jmespath="body", # Validate entire body for changes
673
expires_after_seconds=86400, # 24 hours
674
use_local_cache=True,
675
hash_function="sha256",
676
raise_on_no_idempotency_key=True
677
)
678
679
@idempotent(persistence_store=persistence_layer, config=config)
680
def lambda_handler(event: dict, context: LambdaContext) -> dict:
681
"""Handler with custom idempotency key extraction"""
682
683
# Process payment request
684
payment_data = event.get("body", {})
685
headers = event.get("headers", {})
686
687
idempotency_key = headers.get("Idempotency-Key") or payment_data.get("transactionId")
688
logger.append_keys(idempotency_key=idempotency_key)
689
690
logger.info("Processing payment request")
691
692
# This will only execute once per unique key
693
result = process_payment_request(payment_data)
694
695
return {
696
"statusCode": 200,
697
"body": json.dumps(result)
698
}
699
700
# Separate persistence layer for user operations
701
user_persistence = DynamoDBPersistenceLayer(table_name="UserIdempotencyTable")
702
703
user_config = IdempotencyConfig(
704
expires_after_seconds=3600,
705
use_local_cache=True
706
)
707
708
@idempotent_function(
709
data_keyword_argument="user_data",
710
persistence_store=user_persistence,
711
config=user_config
712
)
713
def create_user(user_data: dict) -> dict:
714
"""Idempotent user creation function"""
715
716
logger.info("Creating new user", extra={"email": user_data.get("email")})
717
718
# Check if user already exists
719
if user_exists(user_data["email"]):
720
raise ValueError("User already exists")
721
722
# Create user (expensive operation)
723
user_id = generate_user_id()
724
save_user_to_database(user_id, user_data)
725
send_welcome_email(user_data["email"])
726
727
return {
728
"user_id": user_id,
729
"email": user_data["email"],
730
"created_at": "2024-01-01T00:00:00Z"
731
}
732
733
def process_payment_request(payment_data: dict) -> dict:
734
"""Process payment with external API"""
735
736
amount = payment_data["amount"]
737
currency = payment_data["currency"]
738
payment_method = payment_data["payment_method"]
739
740
# Call external payment processor
741
transaction_id = call_payment_processor(amount, currency, payment_method)
742
743
# Update internal records
744
update_payment_records(transaction_id, payment_data)
745
746
return {
747
"transaction_id": transaction_id,
748
"amount": amount,
749
"currency": currency,
750
"status": "completed"
751
}
752
753
# Bulk operations with per-item idempotency
754
@idempotent_function(
755
data_keyword_argument="item_data",
756
persistence_store=persistence_layer,
757
config=IdempotencyConfig(expires_after_seconds=1800)
758
)
759
def process_bulk_item(item_data: dict) -> dict:
760
"""Process individual item in bulk operation"""
761
762
item_id = item_data["item_id"]
763
logger.info(f"Processing item {item_id}")
764
765
# Expensive per-item processing
766
result = expensive_item_processing(item_data)
767
768
return {
769
"item_id": item_id,
770
"processed": True,
771
"result": result
772
}
773
774
def bulk_handler(event: dict, context: LambdaContext) -> dict:
775
"""Handle bulk operations with per-item idempotency"""
776
777
items = event.get("items", [])
778
results = []
779
780
for item in items:
781
try:
782
# Each item is processed idempotently
783
result = process_bulk_item(item_data=item)
784
results.append(result)
785
786
except Exception as e:
787
logger.exception(f"Failed to process item {item.get('item_id')}")
788
results.append({
789
"item_id": item.get("item_id"),
790
"error": str(e)
791
})
792
793
return {
794
"statusCode": 200,
795
"body": json.dumps({
796
"processed_count": len([r for r in results if not r.get("error")]),
797
"failed_count": len([r for r in results if r.get("error")]),
798
"results": results
799
})
800
}
801
```
802
803
### Error Handling and Monitoring
804
805
```python
806
from aws_lambda_powertools.utilities.feature_flags import (
807
FeatureFlags,
808
AppConfigStore,
809
ConfigurationStoreError
810
)
811
from aws_lambda_powertools.utilities.idempotency import (
812
idempotent,
813
DynamoDBPersistenceLayer,
814
IdempotencyConfig,
815
IdempotencyAlreadyInProgressError,
816
IdempotencyInconsistentStateError,
817
IdempotencyKeyError
818
)
819
from aws_lambda_powertools.utilities.typing import LambdaContext
820
from aws_lambda_powertools import Logger, Metrics
821
from aws_lambda_powertools.metrics import MetricUnit
822
823
logger = Logger()
824
metrics = Metrics()
825
826
# Feature flags with error handling
827
try:
828
store = AppConfigStore(
829
environment="production",
830
application="my-app",
831
name="feature-flags",
832
max_age=30 # Refresh every 30 seconds
833
)
834
feature_flags = FeatureFlags(store=store, logger=logger)
835
except ConfigurationStoreError as e:
836
logger.error("Failed to initialize feature flags", extra={"error": str(e)})
837
# Fallback to default configuration
838
feature_flags = None
839
840
# Idempotency with error handling
841
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
842
843
config = IdempotencyConfig(
844
event_key_jmespath="requestId",
845
expires_after_seconds=3600,
846
use_local_cache=True
847
)
848
849
@metrics.log_metrics(capture_cold_start_metric=True)
850
@idempotent(persistence_store=persistence_layer, config=config)
851
def lambda_handler(event: dict, context: LambdaContext) -> dict:
852
"""Handler with comprehensive error handling"""
853
854
request_id = event.get("requestId")
855
logger.append_keys(request_id=request_id)
856
857
try:
858
# Feature flag evaluation with fallbacks
859
features = get_enabled_features(event)
860
861
# Record feature usage metrics
862
for feature_name, enabled in features.items():
863
metrics.add_metric(
864
name=f"FeatureFlag_{feature_name}",
865
unit=MetricUnit.Count,
866
value=1 if enabled else 0
867
)
868
869
# Process request based on enabled features
870
result = process_request(event, features)
871
872
# Record success metrics
873
metrics.add_metric(name="RequestProcessed", unit=MetricUnit.Count, value=1)
874
875
return {
876
"statusCode": 200,
877
"body": result
878
}
879
880
except IdempotencyAlreadyInProgressError:
881
# Request is already being processed by another instance
882
logger.warning("Request already in progress")
883
metrics.add_metric(name="IdempotencyInProgress", unit=MetricUnit.Count, value=1)
884
885
return {
886
"statusCode": 409,
887
"body": {"error": "Request already in progress"}
888
}
889
890
except IdempotencyKeyError as e:
891
# Missing or invalid idempotency key
892
logger.error("Invalid idempotency key", extra={"error": str(e)})
893
metrics.add_metric(name="IdempotencyKeyError", unit=MetricUnit.Count, value=1)
894
895
return {
896
"statusCode": 400,
897
"body": {"error": "Invalid or missing idempotency key"}
898
}
899
900
except Exception as e:
901
# Unexpected error
902
logger.exception("Request processing failed")
903
metrics.add_metric(name="RequestFailed", unit=MetricUnit.Count, value=1)
904
905
return {
906
"statusCode": 500,
907
"body": {"error": "Internal server error"}
908
}
909
910
def get_enabled_features(event: dict) -> dict:
911
"""Get enabled features with fallback logic"""
912
913
features = {
914
"enhanced_processing": False,
915
"premium_features": False,
916
"beta_algorithms": False
917
}
918
919
if feature_flags is None:
920
logger.warning("Feature flags unavailable, using defaults")
921
return features
922
923
try:
924
user_context = {
925
"userId": event.get("userId"),
926
"userTier": event.get("userTier", "basic"),
927
"region": event.get("region", "us-east-1")
928
}
929
930
# Evaluate each feature flag with error handling
931
for feature_name in features:
932
try:
933
features[feature_name] = feature_flags.evaluate(
934
name=feature_name,
935
context=user_context,
936
default=features[feature_name]
937
)
938
except Exception as e:
939
logger.warning(
940
f"Failed to evaluate feature flag {feature_name}",
941
extra={"error": str(e)}
942
)
943
# Keep default value
944
945
return features
946
947
except ConfigurationStoreError as e:
948
logger.warning("Feature flag evaluation failed, using defaults", extra={"error": str(e)})
949
metrics.add_metric(name="FeatureFlagError", unit=MetricUnit.Count, value=1)
950
return features
951
952
def process_request(event: dict, features: dict) -> dict:
953
"""Process request based on enabled features"""
954
955
result = {"processed_features": []}
956
957
if features.get("enhanced_processing", False):
958
logger.info("Using enhanced processing")
959
result["enhanced_data"] = enhanced_processing(event)
960
result["processed_features"].append("enhanced_processing")
961
962
if features.get("premium_features", False):
963
logger.info("Including premium features")
964
result["premium_data"] = get_premium_data(event)
965
result["processed_features"].append("premium_features")
966
967
if features.get("beta_algorithms", False):
968
logger.info("Using beta algorithms")
969
result["beta_results"] = beta_algorithm_processing(event)
970
result["processed_features"].append("beta_algorithms")
971
972
# Base processing
973
result["base_data"] = base_processing(event)
974
975
return result
976
977
def enhanced_processing(event: dict) -> dict:
978
"""Enhanced processing implementation"""
979
return {"type": "enhanced", "quality": "high"}
980
981
def get_premium_data(event: dict) -> dict:
982
"""Premium feature data"""
983
return {"premium_insights": True, "advanced_analytics": True}
984
985
def beta_algorithm_processing(event: dict) -> dict:
986
"""Beta algorithm processing"""
987
return {"algorithm": "beta_v2", "accuracy": "experimental"}
988
989
def base_processing(event: dict) -> dict:
990
"""Base processing implementation"""
991
return {"type": "standard", "quality": "normal"}
992
```
993
994
## Types
995
996
```python { .api }
997
from typing import Dict, Any, List, Union, Optional, Callable
998
from aws_lambda_powertools.utilities.typing import LambdaContext
999
1000
# Feature flag types
1001
FeatureFlagValue = Union[bool, str, int, float, Dict[str, Any], List[Any]]
1002
FeatureFlagContext = Dict[str, Any]
1003
FeatureFlagEvaluation = Dict[str, FeatureFlagValue]
1004
1005
# Rule action constants
1006
RuleAction = Literal["ALLOW", "DENY"]
1007
1008
# Store provider types
1009
class StoreProvider:
1010
"""Base interface for feature flag store providers"""
1011
pass
1012
1013
# Configuration store error
1014
class ConfigurationStoreError(Exception):
1015
"""Raised when configuration store operations fail"""
1016
pass
1017
1018
# Schema validation error
1019
class SchemaValidationError(Exception):
1020
"""Raised when feature flag schema validation fails"""
1021
pass
1022
1023
# Idempotency types
1024
IdempotencyKey = str
1025
IdempotencyRecord = Dict[str, Any]
1026
1027
# Idempotency exceptions
1028
class IdempotencyError(Exception):
1029
"""Base idempotency error"""
1030
pass
1031
1032
class IdempotencyKeyError(IdempotencyError):
1033
"""Raised when idempotency key is missing or invalid"""
1034
pass
1035
1036
class IdempotencyAlreadyInProgressError(IdempotencyError):
1037
"""Raised when request is already being processed"""
1038
pass
1039
1040
class IdempotencyInconsistentStateError(IdempotencyError):
1041
"""Raised when idempotency state is inconsistent"""
1042
pass
1043
1044
class IdempotencyValidationError(IdempotencyError):
1045
"""Raised when payload validation fails"""
1046
pass
1047
1048
# Persistence layer types
1049
PersistenceRecord = Dict[str, Any]
1050
1051
# Hook function type
1052
IdempotencyHook = Callable[[str, Any, Optional[Exception]], None]
1053
1054
# Hash function type
1055
HashFunction = Literal["md5", "sha1", "sha256", "sha512"]
1056
1057
# JMESPath expression type
1058
JMESPathExpression = str
1059
1060
# Boto3 configuration types
1061
Boto3Config = Dict[str, Any]
1062
Boto3Session = Any # boto3.Session
1063
```