# Policy Management

A policy-based configuration and governance system with JSON-defined rules, configurable targets, and an extensible rule evaluation framework. It is intended to support business logic and compliance requirements: policies are loaded and validated from JSON files, carry a configurable alert option (`ALWAYS`, `ON_DETECTION`, or `NEVER`), and are retrieved by identifier through a policy manager.

## Capabilities

### Abstract Policy Manager

Core policy management framework. It loads JSON configuration, validates policies, and configures rules, and it exposes override points (`getDeserializationClass`, `createPolicy`, `setAdditionalConfigurations`) for custom policy implementations.

```python { .api }
class AbstractPolicyManager(ABC):
    """
    Abstract class containing logic for reading and parsing policies and rules from JSON configuration.

    Class Attributes:
    - __logger - LogManager instance for AbstractPolicyManager
    - __policyLocation = "POLICY_LOCATION" - Environment variable name for policy location

    Properties:
    - policies: Dict[str, Policy] - Returns policies dictionary
    """

    def __init__(self) -> None:
        """Initialize policy manager and load configurations"""
        ...

    def getPoliciesLocation(self) -> str:
        """Get policies location from config or environment"""
        ...

    def loadPolicyConfigurations(self, policiesLocation: str) -> None:
        """Load policy configurations from JSON files"""
        ...

    def loadJsonFilesInDirectory(self, policyConfigurationsLocation: str) -> None:
        """Load all JSON files in directory"""
        ...

    def loadJsonFile(self, filePath: str, fileName: str) -> None:
        """Load individual JSON file"""
        ...

    def validateAndAddPolicies(self, policies: List[PolicyInput]) -> None:
        """Validate and add list of policies"""
        ...

    def validateAndAddPolicy(self, policyInput: PolicyInput) -> None:
        """Validate and add single policy"""
        ...

    def validateAndConfigureRule(self, ruleInput: PolicyRuleInput, targets: List[Target]) -> ConfiguredRule:
        """Validate and configure policy rule"""
        ...

    def getPolicy(self, policyIdentifier: str) -> Policy:
        """Get policy by identifier"""
        ...

    def getDeserializationClass(self) -> any:
        """Get class for JSON deserialization (can be overridden)"""
        ...

    def createPolicy(self, policyIdentifier: str) -> Policy:
        """Create policy instance (can be overridden)"""
        ...

    def setAdditionalConfigurations(self, policy: Policy, input: PolicyInput) -> None:
        """Set additional configurations (can be overridden)"""
        ...
```
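
The `getPoliciesLocation` docstring only says that the location comes from config or environment. The following is a minimal sketch of one way such a lookup could behave. It assumes, without confirmation from the API above, that the `POLICY_LOCATION` environment variable takes precedence over the configured value. `resolve_policies_location` is a hypothetical helper and not part of the library.

```python
import os
from typing import Mapping, Optional

# Environment variable name taken from the class attribute documented above.
POLICY_LOCATION_ENV = "POLICY_LOCATION"


def resolve_policies_location(
    configured_location: Optional[str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Hypothetical helper: prefer the environment variable, else the configured value.

    The precedence order is an assumption made for illustration only.
    """
    env = os.environ if environ is None else environ
    from_env = env.get(POLICY_LOCATION_ENV)
    if from_env:
        return from_env
    return configured_location
```

Passing the environment as a parameter keeps the lookup easy to test without mutating `os.environ`.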

### Default Policy Manager

Singleton implementation of the policy management framework. Application components obtain the shared instance through `getInstance()`, which gives them a single, central point of access to the loaded policies.

```python { .api }
class DefaultPolicyManager(AbstractPolicyManager):
    """
    Default singleton policy manager implementation.

    Class Attributes:
    - __instance = None - Singleton instance
    """

    def __init__(self) -> None:
        """Initialize singleton instance"""
        ...

    @staticmethod
    def getInstance() -> DefaultPolicyManager:
        """Get singleton instance"""
        ...
```
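
To illustrate the `getInstance()` access pattern, here is a minimal, self-contained sketch of a lazily created singleton. `SketchPolicyManager` is a hypothetical stand-in, not the library class, and the sketch makes no claim about how `DefaultPolicyManager` is implemented internally. It is also not thread-safe.

```python
from typing import Dict, Optional


class SketchPolicyManager:
    """Hypothetical stand-in showing a getInstance-style singleton."""

    _instance: Optional["SketchPolicyManager"] = None

    def __init__(self) -> None:
        # Stand-in for loading policy configurations once.
        self.policies: Dict[str, str] = {}

    @staticmethod
    def getInstance() -> "SketchPolicyManager":
        # Create the shared instance on first use, then reuse it.
        if SketchPolicyManager._instance is None:
            SketchPolicyManager._instance = SketchPolicyManager()
        return SketchPolicyManager._instance


first = SketchPolicyManager.getInstance()
second = SketchPolicyManager.getInstance()
first.policies["example"] = "loaded once"
```

Because both names refer to the same object, state written through `first` is visible through `second`.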

### Policy Configuration

Configuration for the policy system: the policies location and default values, read from the `policy-configuration.properties` file.

```python { .api }
class PolicyConfiguration:
    """
    Used to configure policy location and defaults.
    """

    def __init__(self) -> None:
        """Initialize with policy-configuration.properties"""
        ...

    def policiesLocation(self) -> str:
        """Returns policies location from configuration"""
        ...
```

### Policy Data Models

Data models for policy definitions: alert options, targets, configured rules, and policies. The deprecated properties `target` and `targetConfigurations` are still handled for backward compatibility; use `targets` and `configuredTargets` instead.

```python { .api }
class AlertOptions(Enum):
    """
    Enum to determine when alerts will be sent for policies.

    Values:
    - ALWAYS = "ALWAYS"
    - ON_DETECTION = "ON_DETECTION"
    - NEVER = "NEVER"
    """
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"

class Target(BaseModel):
    """
    Contains target information for the policy.

    Attributes:
    - retrieve_url: Optional[str] = None - Target retrieval URL
    - type: Optional[str] = None - Target type
    """
    retrieve_url: Optional[str] = None
    type: Optional[str] = None

class ConfiguredTarget(Target):
    """
    Contains target information with configurations needed by the rule.

    Attributes:
    - target_configurations: Dict[str, Any] - Target configurations dictionary
    - Inherits: retrieve_url and type from Target
    """
    target_configurations: Dict[str, Any]

class ConfiguredRule(BaseModel):
    """
    Represents a rule read by the policy manager with class and configuration information.

    Class Attributes:
    - __logger - LogManager instance for ConfiguredRule
    - __deprecated_set_methods - Dictionary for deprecated method mapping

    Attributes:
    - className: str - Rule class name
    - configurations: Optional[Dict[str, Any]] = None - Rule configurations
    - configuredTargets: Optional[List[ConfiguredTarget]] = [] - List of configured targets

    Properties:
    - targetConfigurations: ConfiguredTarget - Deprecated property (use configuredTargets)
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    configuredTargets: Optional[List[ConfiguredTarget]] = []

    def set_deprecated_targetConfigurations(self, new_value: ConfiguredTarget):
        """Deprecated setter method"""
        ...

    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...

class Policy(BaseModel):
    """
    Maps a rule or set of rules with identifier for policy execution.

    Class Attributes:
    - __logger - LogManager instance for Policy
    - __deprecated_set_methods - Dictionary for deprecated method mapping

    Attributes:
    - alertOptions: AlertOptions = AlertOptions.ON_DETECTION - Alert options (default: ON_DETECTION)
    - identifier: str - Policy identifier
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = [] - List of targets
    - rules: List[ConfiguredRule] = [] - List of configured rules

    Properties:
    - target: Target - Deprecated property (use targets)
    """
    alertOptions: AlertOptions = AlertOptions.ON_DETECTION
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = []
    rules: List[ConfiguredRule] = []

    def set_deprecated_target(self, new_value: Target):
        """Deprecated setter method"""
        ...

    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...
```
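
The models above list a `__deprecated_set_methods` mapping and a custom `__setattr__`, but not their behavior. The sketch below shows one plausible way such a redirect could work, using a plain class rather than the real models. `SketchPolicy` is hypothetical, and the choice to turn a deprecated `target` write into a one-element `targets` list is an assumption for illustration.

```python
from typing import Any, List, Optional


class SketchPolicy:
    """Hypothetical stand-in for a model with a deprecated single-target field."""

    # Maps a deprecated attribute name to the method that handles writes to it.
    _deprecated_set_methods = {"target": "set_deprecated_target"}

    def __init__(self, identifier: str, targets: Optional[List[Any]] = None) -> None:
        self.identifier = identifier
        self.targets = list(targets or [])

    def set_deprecated_target(self, new_value: Any) -> None:
        # Assumed behavior: redirect the old single-target write to the list field.
        self.targets = [new_value]

    def __setattr__(self, key: str, val: Any) -> None:
        handler = self._deprecated_set_methods.get(key)
        if handler is not None:
            getattr(self, handler)(val)
        else:
            super().__setattr__(key, val)

    @property
    def target(self) -> Optional[Any]:
        # Deprecated read path: expose the first target, if any.
        return self.targets[0] if self.targets else None


policy = SketchPolicy("example_policy")
policy.target = "s3://example-bucket/"  # deprecated write, redirected to targets
```

After the deprecated write, both `policy.targets` and the deprecated `policy.target` read path reflect the new value.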

### Policy Input Models

JSON deserialization models for loading policy configurations from files. They accept both the current `targets` list and the deprecated single `target` field for backward compatibility.

```python { .api }
class PolicyRuleInput(BaseModel):
    """
    Represents policy rule data read from JSON file.

    Attributes:
    - className: str - Class name for the rule (required)
    - configurations: Optional[Dict[str, Any]] = None - Rule configuration
    - targetConfigurations: Optional[Dict[str, Any]] = None - Target configurations
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    targetConfigurations: Optional[Dict[str, Any]] = None

class PolicyInput(BaseModel):
    """
    Represents policy information read from JSON file.

    Class Attributes:
    - __logger - LogManager instance for PolicyInput

    Attributes:
    - identifier: str - Policy identifier (required)
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = None - List of targets
    - shouldSendAlert: Optional[AlertOptions] = None - Alert configuration
    - rules: List[PolicyRuleInput] = [] - List of policy rules
    - target: Optional[Target] = None - Deprecated single target
    """
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = None
    shouldSendAlert: Optional[AlertOptions] = None
    rules: List[PolicyRuleInput] = []
    target: Optional[Target] = None

    def getAnyTargets(self) -> List[Target]:
        """Returns targets list, checking both new and deprecated attributes"""
        ...
```
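
`getAnyTargets` is documented as checking both the new and the deprecated attribute. A minimal sketch of that normalization follows, using standard-library dataclasses as hypothetical stand-ins (`SketchTarget`, `SketchPolicyInput`). The precedence shown, where a non-empty `targets` list wins over `target`, is an assumption and not confirmed by the API above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchTarget:
    retrieve_url: Optional[str] = None
    type: Optional[str] = None


@dataclass
class SketchPolicyInput:
    identifier: str
    targets: Optional[List[SketchTarget]] = None
    target: Optional[SketchTarget] = None  # deprecated single target

    def getAnyTargets(self) -> List[SketchTarget]:
        # Assumed precedence: the list-valued field wins over the deprecated one.
        if self.targets:
            return list(self.targets)
        if self.target is not None:
            return [self.target]
        return []


legacy = SketchPolicyInput("legacy", target=SketchTarget("s3://x/", "raw"))
current = SketchPolicyInput(
    "current",
    targets=[SketchTarget(type="new")],
    target=SketchTarget(type="old"),
)
```

Callers can then treat every policy input as having a list of targets, regardless of which field the JSON file used.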

### Policy Invocation Results

Result record for a policy invocation: the policy name, its description, and the invocation timestamp, suitable for audit and monitoring.

```python { .api }
class PolicyInvocationResult(BaseModel):
    """
    Represents the results of a policy invocation.

    Attributes:
    - policyName: str - Name of the policy
    - policyDescription: Optional[str] = None - Policy description
    - timestamp: str - Invocation timestamp
    """
    policyName: str
    policyDescription: Optional[str] = None
    timestamp: str
```

## Usage Examples

### Basic Policy Management Setup

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, AlertOptions
import json
import os

# Initialize policy manager (singleton)
policy_manager = DefaultPolicyManager.getInstance()

# Get available policies
available_policies = policy_manager.policies
print(f"Loaded {len(available_policies)} policies:")
for policy_id, policy in available_policies.items():
    print(f" - {policy_id}: {policy.description}")

# Get specific policy
try:
    security_policy = policy_manager.getPolicy("data_security_policy")
    print(f"Security policy loaded: {security_policy.identifier}")
    print(f"Rules count: {len(security_policy.rules)}")
    print(f"Alert setting: {security_policy.alertOptions}")
except Exception as e:
    print(f"Policy not found: {e}")
```

### JSON Policy Configuration

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.policy.json.policy_input import PolicyInput, PolicyRuleInput
import json
import tempfile
import os

def create_sample_policy_configuration():
    """Create sample JSON policy configuration"""

    # Define policy configuration
    policy_config = {
        "identifier": "data_quality_policy",
        "description": "Ensures data quality standards across all datasets",
        "shouldSendAlert": "ON_DETECTION",
        "targets": [
            {
                "retrieve_url": "s3://data-lake/raw-data/",
                "type": "data_source"
            },
            {
                "retrieve_url": "s3://data-lake/processed-data/",
                "type": "processed_data"
            }
        ],
        "rules": [
            {
                "className": "com.example.rules.CompletenessRule",
                "configurations": {
                    "minimum_completeness": 0.95,
                    "required_fields": ["id", "timestamp", "value"]
                },
                "targetConfigurations": {
                    "batch_size": 10000,
                    "sampling_rate": 0.1
                }
            },
            {
                "className": "com.example.rules.AccuracyRule",
                "configurations": {
                    "accuracy_threshold": 0.98,
                    "validation_method": "statistical"
                }
            }
        ]
    }

    # Create temporary policy file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(policy_config, f, indent=2)
        return f.name

def load_custom_policies():
    """Load custom policies from JSON configuration"""

    # Create sample configuration
    policy_file = create_sample_policy_configuration()

    try:
        # Get policy manager
        manager = DefaultPolicyManager.getInstance()

        # Load the custom policy file
        manager.loadJsonFile(os.path.dirname(policy_file), os.path.basename(policy_file))

        # Retrieve the loaded policy
        policy = manager.getPolicy("data_quality_policy")

        print(f"Loaded policy: {policy.identifier}")
        print(f"Description: {policy.description}")
        print(f"Alert options: {policy.alertOptions}")
        print(f"Number of targets: {len(policy.targets)}")
        print(f"Number of rules: {len(policy.rules)}")

        # Print rule details
        for i, rule in enumerate(policy.rules):
            print(f"\nRule {i+1}:")
            print(f" Class: {rule.className}")
            print(f" Configurations: {rule.configurations}")
            if rule.configuredTargets:
                print(f" Target configurations: {len(rule.configuredTargets)}")

        return policy

    finally:
        # Clean up temporary file
        if os.path.exists(policy_file):
            os.unlink(policy_file)

# Usage example
custom_policy = load_custom_policies()
```
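
The example above loads a single file. `loadJsonFilesInDirectory` is documented as loading every JSON file in a directory, and the sketch below shows that idea with the standard library only. `load_policy_documents` is a hypothetical helper, and accepting either one policy object or a list of objects per file is an assumption for illustration, not a statement about the library's file format.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def load_policy_documents(directory: str) -> List[Dict[str, Any]]:
    """Hypothetical helper: read every *.json file in a directory.

    A file may hold one policy object or a list of them (an assumption);
    both shapes are flattened into a single list.
    """
    documents: List[Dict[str, Any]] = []
    for path in sorted(Path(directory).glob("*.json")):
        with path.open(encoding="utf-8") as handle:
            content = json.load(handle)
        documents.extend(content if isinstance(content, list) else [content])
    return documents


with tempfile.TemporaryDirectory() as policy_dir:
    Path(policy_dir, "a.json").write_text(json.dumps({"identifier": "policy_a"}))
    Path(policy_dir, "b.json").write_text(json.dumps([{"identifier": "policy_b"}]))
    Path(policy_dir, "notes.txt").write_text("ignored")
    loaded_ids = [doc["identifier"] for doc in load_policy_documents(policy_dir)]
```

Sorting the paths makes the load order deterministic, which helps when a later file is expected to override an earlier one.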

### Policy Execution Framework

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, AlertOptions
from policy_manager.policy.result.policy_invocation_result import PolicyInvocationResult
from datetime import datetime
from typing import Dict, List, Any, Optional
import logging

class PolicyExecutionEngine:
    """Engine for executing policies and tracking results"""

    def __init__(self):
        self.policy_manager = DefaultPolicyManager.getInstance()
        # Each record holds the invocation result plus its rule outcomes and overall status
        self.execution_results: List[Dict[str, Any]] = []
        self.logger = logging.getLogger(__name__)

    def execute_policy(self, policy_identifier: str, target_data: Any, context: Optional[Dict[str, Any]] = None) -> PolicyInvocationResult:
        """Execute a specific policy against target data"""

        try:
            # Get policy configuration
            policy = self.policy_manager.getPolicy(policy_identifier)

            # Create invocation result
            result = PolicyInvocationResult(
                policyName=policy.identifier,
                policyDescription=policy.description,
                timestamp=datetime.now().isoformat()
            )

            print(f"Executing policy: {policy.identifier}")

            # Execute each rule in the policy
            rule_results = []
            for rule in policy.rules:
                rule_result = self._execute_rule(rule, target_data, policy.targets, context)
                rule_results.append(rule_result)

                print(f" Rule {rule.className}: {'PASSED' if rule_result['passed'] else 'FAILED'}")
                if not rule_result['passed']:
                    print(f" Reason: {rule_result.get('reason', 'Unknown')}")

            # Determine overall policy result
            policy_passed = all(r['passed'] for r in rule_results)

            # Handle alerting based on policy configuration
            if not policy_passed and policy.alertOptions != AlertOptions.NEVER:
                self._send_alert(policy, rule_results, context)
            elif policy.alertOptions == AlertOptions.ALWAYS:
                self._send_alert(policy, rule_results, context)

            # Store result. PolicyInvocationResult declares only policyName,
            # policyDescription, and timestamp; assuming BaseModel is pydantic's,
            # assigning undeclared attributes raises an error, so the rule outcomes
            # and overall status are kept in a separate record.
            self.execution_results.append({
                'result': result,
                'rule_results': rule_results,
                'overall_status': "PASSED" if policy_passed else "FAILED"
            })

            return result

        except Exception as e:
            self.logger.error(f"Policy execution failed: {e}")
            raise

    def _execute_rule(self, rule: ConfiguredRule, target_data: Any, targets: List, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute individual rule (this would integrate with actual rule implementations)"""

        # This is a simplified rule execution simulation
        # In practice, this would dynamically load and execute the rule class

        rule_result = {
            'rule_class': rule.className,
            'passed': True,
            'details': {},
            'execution_time': datetime.now().isoformat()
        }

        # Simulate rule-specific logic based on class name
        if "CompletenessRule" in rule.className:
            rule_result.update(self._execute_completeness_rule(rule, target_data))
        elif "AccuracyRule" in rule.className:
            rule_result.update(self._execute_accuracy_rule(rule, target_data))
        elif "DataFreshnessRule" in rule.className:
            rule_result.update(self._execute_freshness_rule(rule, target_data))
        else:
            # Generic rule execution
            rule_result['details']['message'] = f"Executed {rule.className} successfully"

        return rule_result

    def _execute_completeness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate completeness rule execution"""
        config = rule.configurations or {}
        min_completeness = config.get('minimum_completeness', 0.95)

        # Simulate completeness check (in practice, this would analyze actual data)
        simulated_completeness = 0.97  # Assume 97% completeness

        passed = simulated_completeness >= min_completeness
        return {
            'passed': passed,
            'details': {
                'completeness_score': simulated_completeness,
                'threshold': min_completeness,
                'message': f"Data completeness: {simulated_completeness:.2%}"
            },
            'reason': None if passed else f"Completeness {simulated_completeness:.2%} below threshold {min_completeness:.2%}"
        }

    def _execute_accuracy_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate accuracy rule execution"""
        config = rule.configurations or {}
        accuracy_threshold = config.get('accuracy_threshold', 0.98)

        # Simulate accuracy check
        simulated_accuracy = 0.96  # Assume 96% accuracy

        passed = simulated_accuracy >= accuracy_threshold
        return {
            'passed': passed,
            'details': {
                'accuracy_score': simulated_accuracy,
                'threshold': accuracy_threshold,
                'validation_method': config.get('validation_method', 'unknown')
            },
            'reason': None if passed else f"Accuracy {simulated_accuracy:.2%} below threshold {accuracy_threshold:.2%}"
        }

    def _execute_freshness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate data freshness rule execution"""
        config = rule.configurations or {}
        max_age_hours = config.get('max_age_hours', 24)

        # Simulate freshness check
        simulated_age_hours = 30  # Assume data is 30 hours old

        passed = simulated_age_hours <= max_age_hours
        return {
            'passed': passed,
            'details': {
                'data_age_hours': simulated_age_hours,
                'max_allowed_hours': max_age_hours
            },
            'reason': None if passed else f"Data age {simulated_age_hours}h exceeds limit {max_age_hours}h"
        }

    def _send_alert(self, policy: Policy, rule_results: List[Dict], context: Optional[Dict[str, Any]]):
        """Send policy violation alert"""
        failed_rules = [r for r in rule_results if not r['passed']]

        alert_message = f"Policy Violation: {policy.identifier}\n"
        alert_message += f"Description: {policy.description}\n"
        alert_message += f"Failed Rules: {len(failed_rules)}/{len(rule_results)}\n\n"

        for failed_rule in failed_rules:
            alert_message += f"- {failed_rule['rule_class']}: {failed_rule['reason']}\n"

        print("🚨 POLICY ALERT 🚨")
        print(alert_message)

        # In practice, this would integrate with alerting systems
        # (email, Slack, PagerDuty, etc.)

    def get_execution_summary(self) -> Dict[str, Any]:
        """Get summary of all policy executions"""
        total_executions = len(self.execution_results)
        passed_executions = sum(1 for record in self.execution_results if record['overall_status'] == 'PASSED')

        return {
            'total_executions': total_executions,
            'passed_executions': passed_executions,
            'failed_executions': total_executions - passed_executions,
            'success_rate': (passed_executions / total_executions * 100) if total_executions > 0 else 0
        }

# Usage example
execution_engine = PolicyExecutionEngine()

# Execute policies against sample data
sample_data = {
    'dataset_id': 'customer_transactions_2024',
    'record_count': 1500000,
    'last_updated': '2024-09-05T10:30:00Z'
}

# Execute different policies
try:
    # Data quality policy
    quality_result = execution_engine.execute_policy(
        policy_identifier='data_quality_policy',
        target_data=sample_data,
        context={'environment': 'production', 'pipeline_id': 'etl_001'}
    )

    # The overall status lives in the engine's most recent execution record
    latest_record = execution_engine.execution_results[-1]
    print(f"Quality policy result: {latest_record['overall_status']}")

    # Get execution summary
    summary = execution_engine.get_execution_summary()
    print("\nExecution Summary:")
    print(f" Total executions: {summary['total_executions']}")
    print(f" Success rate: {summary['success_rate']:.1f}%")

except Exception as e:
    print(f"Policy execution failed: {e}")
```
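
The alerting branch in `execute_policy` above can be isolated into a small pure function, which makes the three alert options easy to test on their own. The sketch below redefines `AlertOptions` locally so that it runs without the library. `should_send_alert` is a hypothetical helper that mirrors the example's branch and is not part of the documented API.

```python
from enum import Enum


class AlertOptions(Enum):
    # Local copy of the enum values documented earlier, so the sketch is self-contained.
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"


def should_send_alert(option: AlertOptions, violation_detected: bool) -> bool:
    """Hypothetical helper mirroring the alert branch in the example engine."""
    if option is AlertOptions.NEVER:
        return False
    if option is AlertOptions.ALWAYS:
        return True
    # ON_DETECTION: alert only when at least one rule failed.
    return violation_detected
```

With this helper, the engine's `if`/`elif` pair reduces to a single call with `violation_detected=not policy_passed`.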

### Dynamic Policy Configuration Management

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.configuration.policy_configuration import PolicyConfiguration
import hashlib
import json
import os
from typing import Dict, List, Optional
from datetime import datetime

class DynamicPolicyManager:
    """Advanced policy management with dynamic configuration updates"""

    def __init__(self):
        self.base_manager = DefaultPolicyManager.getInstance()
        self.config = PolicyConfiguration()
        self.policy_cache = {}
        self.change_listeners = []

    def register_change_listener(self, callback):
        """Register callback for policy configuration changes"""
        self.change_listeners.append(callback)

    def notify_change_listeners(self, policy_id: str, change_type: str):
        """Notify registered listeners of policy changes"""
        for callback in self.change_listeners:
            try:
                callback(policy_id, change_type)
            except Exception as e:
                print(f"Error notifying change listener: {e}")

    def _find_policy(self, policy_id: str) -> Policy:
        """Look up a policy in the runtime cache first, then in the base manager"""
        if policy_id in self.policy_cache:
            return self.policy_cache[policy_id]
        return self.base_manager.getPolicy(policy_id)

    def create_policy_runtime(self, policy_config: Dict) -> Policy:
        """Create policy dynamically from configuration dictionary"""

        # Create targets
        targets = []
        if 'targets' in policy_config:
            for target_config in policy_config['targets']:
                target = Target(
                    retrieve_url=target_config.get('retrieve_url'),
                    type=target_config.get('type')
                )
                targets.append(target)

        # Create rules
        rules = []
        if 'rules' in policy_config:
            for rule_config in policy_config['rules']:
                rule = ConfiguredRule(
                    className=rule_config['className'],
                    configurations=rule_config.get('configurations'),
                    configuredTargets=[]  # Would be populated based on targetConfigurations
                )
                rules.append(rule)

        # Create policy
        policy = Policy(
            identifier=policy_config['identifier'],
            description=policy_config.get('description'),
            alertOptions=AlertOptions(policy_config.get('shouldSendAlert', 'ON_DETECTION')),
            targets=targets,
            rules=rules
        )

        # Cache the policy so later updates and lookups can find it
        self.policy_cache[policy.identifier] = policy

        return policy

    def update_policy_configuration(self, policy_id: str, updates: Dict):
        """Update existing policy configuration"""
        try:
            # Get existing policy (runtime-created policies live in the cache)
            existing_policy = self._find_policy(policy_id)

            # Create updated configuration
            updated_config = {
                'identifier': existing_policy.identifier,
                'description': existing_policy.description,
                'shouldSendAlert': existing_policy.alertOptions.value,
                'targets': [
                    {'retrieve_url': t.retrieve_url, 'type': t.type}
                    for t in existing_policy.targets
                ] if existing_policy.targets else [],
                'rules': [
                    {
                        'className': r.className,
                        'configurations': r.configurations,
                        'targetConfigurations': {}  # Simplified
                    }
                    for r in existing_policy.rules
                ]
            }

            # Apply updates
            updated_config.update(updates)

            # Create new policy
            new_policy = self.create_policy_runtime(updated_config)

            # Update in manager (simplified - would need proper integration)
            self.policy_cache[policy_id] = new_policy

            # Notify listeners
            self.notify_change_listeners(policy_id, 'UPDATED')

            print(f"Policy {policy_id} updated successfully")
            return new_policy

        except Exception as e:
            print(f"Failed to update policy {policy_id}: {e}")
            raise

    def create_conditional_policy(self, base_policy_id: str, conditions: Dict) -> str:
        """Create conditional policy based on existing policy"""

        base_policy = self._find_policy(base_policy_id)

        # Generate conditional policy ID from a digest of the conditions.
        # The built-in hash() of a str varies between interpreter runs, so a
        # SHA-256 digest is used to keep the identifier stable.
        condition_digest = hashlib.sha256(
            json.dumps(conditions, sort_keys=True).encode("utf-8")
        ).hexdigest()[:12]
        conditional_id = f"{base_policy_id}_conditional_{condition_digest}"

        # Create conditional rules
        conditional_rules = []
        for rule in base_policy.rules:
            # Add conditions to rule configuration
            enhanced_config = rule.configurations.copy() if rule.configurations else {}
            enhanced_config['conditions'] = conditions

            conditional_rule = ConfiguredRule(
                className=rule.className,
                configurations=enhanced_config,
                configuredTargets=rule.configuredTargets
            )
            conditional_rules.append(conditional_rule)

        # Create conditional policy
        conditional_policy = Policy(
            identifier=conditional_id,
            description=f"Conditional variant of {base_policy.description}",
            alertOptions=base_policy.alertOptions,
            targets=base_policy.targets,
            rules=conditional_rules
        )

        # Store in cache
        self.policy_cache[conditional_id] = conditional_policy

        # Notify listeners
        self.notify_change_listeners(conditional_id, 'CREATED')

        return conditional_id

    def get_policy_with_fallback(self, policy_id: str) -> Policy:
        """Get policy with cache fallback"""

        # Try the cache first, then the base manager
        try:
            return self._find_policy(policy_id)
        except Exception:
            # Create default policy if not found
            return self._create_default_policy(policy_id)

    def _create_default_policy(self, policy_id: str) -> Policy:
        """Create default policy for unknown identifiers"""

        default_policy = Policy(
            identifier=policy_id,
            description=f"Default policy for {policy_id}",
            alertOptions=AlertOptions.NEVER,
            targets=[],
            rules=[]
        )

        self.policy_cache[policy_id] = default_policy
        return default_policy

    def export_policies_to_json(self, policy_ids: Optional[List[str]] = None) -> str:
        """Export policies to JSON configuration"""

        policies_to_export = policy_ids or list(self.base_manager.policies.keys())

        export_data = []
        for policy_id in policies_to_export:
            try:
                policy = self.get_policy_with_fallback(policy_id)

                policy_data = {
                    'identifier': policy.identifier,
                    'description': policy.description,
                    'shouldSendAlert': policy.alertOptions.value,
                    'targets': [
                        {
                            'retrieve_url': target.retrieve_url,
                            'type': target.type
                        }
                        for target in (policy.targets or [])
                    ],
                    'rules': [
                        {
                            'className': rule.className,
                            'configurations': rule.configurations or {},
                            'targetConfigurations': {}  # Simplified
                        }
                        for rule in policy.rules
                    ]
                }

                export_data.append(policy_data)

            except Exception as e:
                print(f"Error exporting policy {policy_id}: {e}")

        return json.dumps(export_data, indent=2)

# Usage example
def policy_management_demo():
    """Demonstrate dynamic policy management capabilities"""

    dynamic_manager = DynamicPolicyManager()

    # Register change listener
    def policy_change_handler(policy_id: str, change_type: str):
        print(f"Policy Change: {policy_id} - {change_type}")

    dynamic_manager.register_change_listener(policy_change_handler)

    # Create new policy at runtime
    new_policy_config = {
        'identifier': 'dynamic_security_policy',
        'description': 'Runtime created security policy',
        'shouldSendAlert': 'ALWAYS',
        'targets': [
            {'retrieve_url': 'https://api.security.com/scan', 'type': 'security_endpoint'}
        ],
        'rules': [
            {
                'className': 'com.example.rules.ThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'high',
                    'scan_types': ['malware', 'intrusion', 'anomaly']
                }
            }
        ]
    }

    new_policy = dynamic_manager.create_policy_runtime(new_policy_config)
    print(f"Created policy: {new_policy.identifier}")

    # Update existing policy
    updates = {
        'description': 'Updated security policy with enhanced monitoring',
        'rules': [
            {
                'className': 'com.example.rules.EnhancedThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'maximum',
                    'scan_types': ['malware', 'intrusion', 'anomaly', 'behavioral'],
                    'real_time_monitoring': True
                }
            }
        ]
    }

    try:
        updated_policy = dynamic_manager.update_policy_configuration('dynamic_security_policy', updates)
        print(f"Updated policy: {updated_policy.identifier}")
    except Exception as e:
        print(f"Update failed: {e}")

    # Create conditional policy
    conditions = {
        'environment': 'production',
        'data_classification': 'sensitive',
        'time_window': '09:00-17:00'
    }

    conditional_id = dynamic_manager.create_conditional_policy('dynamic_security_policy', conditions)
    print(f"Created conditional policy: {conditional_id}")

    # Export policies
    exported_config = dynamic_manager.export_policies_to_json(['dynamic_security_policy', conditional_id])
    print("Exported policy configuration:")
    print(exported_config[:200] + "..." if len(exported_config) > 200 else exported_config)

# Run the demonstration
policy_management_demo()
```
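
One detail of `update_policy_configuration` above is worth spelling out: it applies `updates` with `dict.update`, which replaces top-level keys wholesale. Passing a `rules` key therefore replaces the entire rule list and does not merge individual rules. The snippet below demonstrates this with plain dictionaries. The rule names are placeholders.

```python
# Placeholder configuration with two rules.
existing = {
    "description": "old description",
    "rules": [{"className": "RuleA"}, {"className": "RuleB"}],
}

# An update that supplies a new rules list.
updates = {"rules": [{"className": "RuleC"}]}

# Shallow merge, as dict.update performs it.
merged = dict(existing)
merged.update(updates)
```

After the merge, `merged["rules"]` contains only `RuleC`, while keys that were not part of the update, such as `description`, are carried over unchanged. Callers who want to add a rule need to pass the complete list.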

## Best Practices

### Policy Design
- Use clear, descriptive policy identifiers and descriptions
- Design policies to be composable and reusable
- Implement proper validation for policy configurations
- Use appropriate alert options based on business requirements
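
As an illustration of validating policy configurations before they reach the policy manager, here is a minimal sketch that checks a configuration dictionary against the fields documented for `PolicyInput`. `find_policy_config_errors` is a hypothetical helper. The specific checks are assumptions and not the library's own validation.

```python
from typing import Any, Dict, List

# Values of the AlertOptions enum documented earlier.
ALLOWED_ALERT_OPTIONS = {"ALWAYS", "ON_DETECTION", "NEVER"}


def find_policy_config_errors(config: Dict[str, Any]) -> List[str]:
    """Hypothetical pre-load check; returns a list of human-readable problems."""
    errors: List[str] = []

    identifier = config.get("identifier")
    if not isinstance(identifier, str) or not identifier.strip():
        errors.append("identifier must be a non-empty string")

    alert = config.get("shouldSendAlert")
    if alert is not None and (
        not isinstance(alert, str) or alert not in ALLOWED_ALERT_OPTIONS
    ):
        errors.append(f"shouldSendAlert must be one of {sorted(ALLOWED_ALERT_OPTIONS)}")

    rules = config.get("rules", [])
    if not isinstance(rules, list):
        errors.append("rules must be a list")
    else:
        for index, rule in enumerate(rules):
            if not isinstance(rule, dict) or not rule.get("className"):
                errors.append(f"rules[{index}] is missing className")

    return errors
```

Returning a list of problems, not raising on the first one, lets a caller report every issue in a file at once.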

### Configuration Management
- Store policy configurations in version control
- Use environment-specific policy configurations
- Implement policy configuration validation
- Back up policy configurations regularly

### Rule Development
- Create modular, testable rule implementations
- Use dependency injection for rule dependencies
- Implement comprehensive error handling in rules
- Document rule configuration parameters clearly

### Performance and Scalability
- Cache frequently accessed policies
- Implement efficient policy loading strategies
- Monitor policy execution performance
- Use asynchronous execution for complex rule evaluations
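
One way to run independent rule evaluations without blocking on each in turn is a thread pool from the standard library. The sketch below is an illustration under stated assumptions: rules are modeled as plain callables that return a pass/fail boolean, and `evaluate_rules_concurrently` is a hypothetical helper and not part of the policy manager.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

# A rule check takes the target data and returns True when the rule passes.
RuleCheck = Callable[[Dict[str, Any]], bool]


def evaluate_rules_concurrently(
    checks: Dict[str, RuleCheck],
    data: Dict[str, Any],
    max_workers: int = 4,
) -> Dict[str, bool]:
    """Run independent rule checks in a thread pool and collect pass/fail by name."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(check, data) for name, check in checks.items()}
        # result() blocks until each check finishes and re-raises its exception, if any.
        return {name: future.result() for name, future in futures.items()}


sample_checks: Dict[str, RuleCheck] = {
    "has_records": lambda d: d.get("record_count", 0) > 0,
    "has_dataset_id": lambda d: bool(d.get("dataset_id")),
}
outcome = evaluate_rules_concurrently(sample_checks, {"record_count": 10})
```

Threads suit rules that wait on I/O such as remote lookups. CPU-bound rules would need a different strategy, for example a process pool.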

### Governance and Compliance
- Implement policy change approval workflows
- Maintain audit trails for policy executions
- Review policy effectiveness regularly
- Produce compliance reports and keep documentation current