# Policy Management

Policy-based configuration and governance system with JSON-defined rules, configurable targets, and an extensible rule evaluation framework. It is intended for business-logic and compliance rules: policies are loaded from JSON files, validated, and exposed through a policy manager, and each policy carries its own alert setting.

## Capabilities

### Abstract Policy Manager

Core policy management framework providing JSON-based configuration loading, policy validation, and rule configuration, with an extensible architecture for custom policy implementations.

```python { .api }
class AbstractPolicyManager(ABC):
    """
    Abstract class containing logic for reading and parsing policies and rules from JSON configuration.

    Class Attributes:
    - __logger - LogManager instance for AbstractPolicyManager
    - __policyLocation = "POLICY_LOCATION" - Environment variable name for policy location

    Properties:
    - policies: Dict[str, Policy] - Returns policies dictionary
    """

    def __init__(self) -> None:
        """Initialize policy manager and load configurations"""
        ...

    def getPoliciesLocation(self) -> str:
        """Get policies location from config or environment"""
        ...

    def loadPolicyConfigurations(self, policiesLocation: str) -> None:
        """Load policy configurations from JSON files"""
        ...

    def loadJsonFilesInDirectory(self, policyConfigurationsLocation: str) -> None:
        """Load all JSON files in directory"""
        ...

    def loadJsonFile(self, filePath: str, fileName: str) -> None:
        """Load individual JSON file"""
        ...

    def validateAndAddPolicies(self, policies: List[PolicyInput]) -> None:
        """Validate and add list of policies"""
        ...

    def validateAndAddPolicy(self, policyInput: PolicyInput) -> None:
        """Validate and add single policy"""
        ...

    def validateAndConfigureRule(self, ruleInput: PolicyRuleInput, targets: List[Target]) -> ConfiguredRule:
        """Validate and configure policy rule"""
        ...

    def getPolicy(self, policyIdentifier: str) -> Policy:
        """Get policy by identifier"""
        ...

    def getDeserializationClass(self) -> any:
        """Get class for JSON deserialization (can be overridden)"""
        ...

    def createPolicy(self, policyIdentifier: str) -> Policy:
        """Create policy instance (can be overridden)"""
        ...

    def setAdditionalConfigurations(self, policy: Policy, input: PolicyInput) -> None:
        """Set additional configurations (can be overridden)"""
        ...
```
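
The loading methods above read JSON files from a directory and index the resulting policies by identifier. The following is a minimal standard-library sketch of that idea, not the library's implementation. The function name `load_policy_dicts` is hypothetical, and two behaviors are assumptions: that a file may hold either a single policy object or a list of them, and that entries without an `identifier` are skipped.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_policy_dicts(policies_location: str) -> Dict[str, Dict[str, Any]]:
    """Read every *.json file in a directory and index the policies by identifier."""
    policies: Dict[str, Dict[str, Any]] = {}
    for json_path in sorted(Path(policies_location).glob("*.json")):
        with json_path.open(encoding="utf-8") as handle:
            content = json.load(handle)
        # Assumption: a file holds either one policy object or a list of them.
        entries = content if isinstance(content, list) else [content]
        for entry in entries:
            identifier = entry.get("identifier")
            if not identifier:
                # Assumption: skip entries without an identifier rather than fail the load.
                continue
            policies[identifier] = entry
    return policies
```

A real policy manager would go on to validate each entry and convert it into `Policy` and `ConfiguredRule` objects; this sketch stops at plain dictionaries.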

### Default Policy Manager

Singleton implementation of the policy management framework providing centralized policy access and configuration management across application components.

```python { .api }
class DefaultPolicyManager(AbstractPolicyManager):
    """
    Default singleton policy manager implementation.

    Class Attributes:
    - __instance = None - Singleton instance
    """

    def __init__(self) -> None:
        """Initialize singleton instance"""
        ...

    @staticmethod
    def getInstance() -> DefaultPolicyManager:
        """Get singleton instance"""
        ...
```
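
The `getInstance()` access pattern can be illustrated with a small self-contained class. This is a sketch of a lazily created singleton, assuming nothing about how `DefaultPolicyManager` handles its `__instance` attribute internally. `SingletonPolicyRegistry` is a hypothetical stand-in, and the sketch is not thread-safe.

```python
from typing import Dict, Optional


class SingletonPolicyRegistry:
    """Hypothetical stand-in showing the getInstance() access pattern."""

    _instance: Optional["SingletonPolicyRegistry"] = None

    def __init__(self) -> None:
        self.policies: Dict[str, dict] = {}

    @staticmethod
    def getInstance() -> "SingletonPolicyRegistry":
        # Create the shared instance lazily on first access, then reuse it.
        if SingletonPolicyRegistry._instance is None:
            SingletonPolicyRegistry._instance = SingletonPolicyRegistry()
        return SingletonPolicyRegistry._instance


first = SingletonPolicyRegistry.getInstance()
second = SingletonPolicyRegistry.getInstance()
first.policies["example_policy"] = {"identifier": "example_policy"}
# Both names refer to the same object, so the entry is visible through either one.
assert first is second
```

Because every caller receives the same object, policies loaded once are visible to all components that call `getInstance()`.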

### Policy Configuration

Configuration management for policy system settings including policy location and default values with property-based configuration support.

```python { .api }
class PolicyConfiguration:
    """
    Used to configure policy location and defaults.
    """

    def __init__(self) -> None:
        """Initialize with policy-configuration.properties"""
        ...

    def policiesLocation(self) -> str:
        """Returns policies location from configuration"""
        ...
```
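
The policy location can come from configuration or from the `POLICY_LOCATION` environment variable. The source does not say which one takes precedence, so the sketch below simply assumes the environment variable wins. The function `resolve_policies_location` and its parameters are hypothetical and only illustrate the lookup order.

```python
import os
from typing import Mapping, Optional


def resolve_policies_location(
    configured_location: Optional[str],
    environ: Mapping[str, str] = os.environ,
    env_var: str = "POLICY_LOCATION",
) -> str:
    """Pick a policies directory from an environment variable or a configured value."""
    # Assumption: the environment variable overrides the configured value.
    env_location = environ.get(env_var)
    if env_location:
        return env_location
    if configured_location:
        return configured_location
    raise ValueError(f"No policy location configured and {env_var} is not set")
```

Passing the environment as a parameter keeps the function easy to test without modifying the real process environment.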

### Policy Data Models

Comprehensive data models for policy definition including alert options, targets, rules, and extensible configuration support with deprecated field handling for backward compatibility.

```python { .api }
class AlertOptions(Enum):
    """
    Enum to determine when alerts will be sent for policies.

    Values:
    - ALWAYS = "ALWAYS"
    - ON_DETECTION = "ON_DETECTION"
    - NEVER = "NEVER"
    """
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"

class Target(BaseModel):
    """
    Contains target information for the policy.

    Attributes:
    - retrieve_url: Optional[str] = None - Target retrieval URL
    - type: Optional[str] = None - Target type
    """
    retrieve_url: Optional[str] = None
    type: Optional[str] = None

class ConfiguredTarget(Target):
    """
    Contains target information with configurations needed by the rule.

    Attributes:
    - target_configurations: Dict[str, Any] - Target configurations dictionary
    - Inherits: retrieve_url and type from Target
    """
    target_configurations: Dict[str, Any]

class ConfiguredRule(BaseModel):
    """
    Represents a rule read by the policy manager with class and configuration information.

    Class Attributes:
    - __logger - LogManager instance for ConfiguredRule
    - __deprecated_set_methods - Dictionary for deprecated method mapping

    Attributes:
    - className: str - Rule class name
    - configurations: Optional[Dict[str, Any]] = None - Rule configurations
    - configuredTargets: Optional[List[ConfiguredTarget]] = [] - List of configured targets

    Properties:
    - targetConfigurations: ConfiguredTarget - Deprecated property (use configuredTargets)
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    configuredTargets: Optional[List[ConfiguredTarget]] = []

    def set_deprecated_targetConfigurations(self, new_value: ConfiguredTarget):
        """Deprecated setter method"""
        ...

    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...

class Policy(BaseModel):
    """
    Maps a rule or set of rules with identifier for policy execution.

    Class Attributes:
    - __logger - LogManager instance for Policy
    - __deprecated_set_methods - Dictionary for deprecated method mapping

    Attributes:
    - alertOptions: AlertOptions = AlertOptions.ON_DETECTION - Alert options (default: ON_DETECTION)
    - identifier: str - Policy identifier
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = [] - List of targets
    - rules: List[ConfiguredRule] = [] - List of configured rules

    Properties:
    - target: Target - Deprecated property (use targets)
    """
    alertOptions: AlertOptions = AlertOptions.ON_DETECTION
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = []
    rules: List[ConfiguredRule] = []

    def set_deprecated_target(self, new_value: Target):
        """Deprecated setter method"""
        ...

    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...
```
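
The source says only that `AlertOptions` determines when alerts are sent. One plausible reading of the three values is sketched below: `ALWAYS` alerts on every run, `ON_DETECTION` alerts only when a violation is found, and `NEVER` suppresses alerts. That interpretation is an assumption, and `should_send_alert` is a hypothetical helper. The enum is redefined locally only so the block runs on its own.

```python
from enum import Enum


class AlertOptions(Enum):
    # Local copy of the documented enum so this sketch is self-contained.
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"


def should_send_alert(option: AlertOptions, violation_detected: bool) -> bool:
    """Decide whether to alert, under the interpretation described above."""
    if option is AlertOptions.ALWAYS:
        return True
    if option is AlertOptions.ON_DETECTION:
        return violation_detected
    # NEVER: suppress alerts regardless of the outcome.
    return False
```

Since the enum values are plain strings, a JSON field such as `"shouldSendAlert": "NEVER"` can be converted with `AlertOptions("NEVER")`.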

### Policy Input Models

JSON deserialization models for loading policy configurations from files with support for both current and deprecated field structures for backward compatibility.

```python { .api }
class PolicyRuleInput(BaseModel):
    """
    Represents policy rule data read from JSON file.

    Attributes:
    - className: str - Class name for the rule (required)
    - configurations: Optional[Dict[str, Any]] = None - Rule configuration
    - targetConfigurations: Optional[Dict[str, Any]] = None - Target configurations
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    targetConfigurations: Optional[Dict[str, Any]] = None

class PolicyInput(BaseModel):
    """
    Represents policy information read from JSON file.

    Class Attributes:
    - __logger - LogManager instance for PolicyInput

    Attributes:
    - identifier: str - Policy identifier (required)
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = None - List of targets
    - shouldSendAlert: Optional[AlertOptions] = None - Alert configuration
    - rules: List[PolicyRuleInput] = [] - List of policy rules
    - target: Optional[Target] = None - Deprecated single target
    """
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = None
    shouldSendAlert: Optional[AlertOptions] = None
    rules: List[PolicyRuleInput] = []
    target: Optional[Target] = None

    def getAnyTargets(self) -> List[Target]:
        """Returns targets list, checking both new and deprecated attributes"""
        ...
```
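
`getAnyTargets` is documented as checking both the current `targets` list and the deprecated single `target`. The sketch below shows one way such a fallback could work, using plain dataclasses instead of the library's pydantic models. `TargetSketch` and `PolicyInputSketch` are hypothetical names, and giving `targets` precedence when both fields are set is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TargetSketch:
    retrieve_url: Optional[str] = None
    type: Optional[str] = None


@dataclass
class PolicyInputSketch:
    identifier: str
    targets: Optional[List[TargetSketch]] = None
    target: Optional[TargetSketch] = None  # deprecated single-target field

    def get_any_targets(self) -> List[TargetSketch]:
        # Assumption: the current `targets` list takes precedence when both are set.
        if self.targets:
            return list(self.targets)
        if self.target is not None:
            return [self.target]
        return []
```

Callers can then iterate over one list regardless of which field the JSON file used.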

### Policy Invocation Results

Result tracking for policy execution with comprehensive metadata including policy identification, description, and execution timestamps for audit and monitoring purposes.

```python { .api }
class PolicyInvocationResult(BaseModel):
    """
    Represents the results of a policy invocation.

    Attributes:
    - policyName: str - Name of the policy
    - policyDescription: Optional[str] = None - Policy description
    - timestamp: str - Invocation timestamp
    """
    policyName: str
    policyDescription: Optional[str] = None
    timestamp: str
```

## Usage Examples

### Basic Policy Management Setup

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, AlertOptions
import json
import os

# Initialize policy manager (singleton)
policy_manager = DefaultPolicyManager.getInstance()

# Get available policies
available_policies = policy_manager.policies
print(f"Loaded {len(available_policies)} policies:")
for policy_id, policy in available_policies.items():
    print(f" - {policy_id}: {policy.description}")

# Get specific policy
try:
    security_policy = policy_manager.getPolicy("data_security_policy")
    print(f"Security policy loaded: {security_policy.identifier}")
    print(f"Rules count: {len(security_policy.rules)}")
    print(f"Alert setting: {security_policy.alertOptions}")
except Exception as e:
    print(f"Policy not found: {e}")
```

### JSON Policy Configuration

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.policy.json.policy_input import PolicyInput, PolicyRuleInput
import json
import tempfile
import os

def create_sample_policy_configuration():
    """Create sample JSON policy configuration"""

    # Define policy configuration
    policy_config = {
        "identifier": "data_quality_policy",
        "description": "Ensures data quality standards across all datasets",
        "shouldSendAlert": "ON_DETECTION",
        "targets": [
            {
                "retrieve_url": "s3://data-lake/raw-data/",
                "type": "data_source"
            },
            {
                "retrieve_url": "s3://data-lake/processed-data/",
                "type": "processed_data"
            }
        ],
        "rules": [
            {
                "className": "com.example.rules.CompletenessRule",
                "configurations": {
                    "minimum_completeness": 0.95,
                    "required_fields": ["id", "timestamp", "value"]
                },
                "targetConfigurations": {
                    "batch_size": 10000,
                    "sampling_rate": 0.1
                }
            },
            {
                "className": "com.example.rules.AccuracyRule",
                "configurations": {
                    "accuracy_threshold": 0.98,
                    "validation_method": "statistical"
                }
            }
        ]
    }

    # Create temporary policy file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(policy_config, f, indent=2)
        return f.name

def load_custom_policies():
    """Load custom policies from JSON configuration"""

    # Create sample configuration
    policy_file = create_sample_policy_configuration()

    try:
        # Get policy manager
        manager = DefaultPolicyManager.getInstance()

        # Load the custom policy file
        manager.loadJsonFile(os.path.dirname(policy_file), os.path.basename(policy_file))

        # Retrieve the loaded policy
        policy = manager.getPolicy("data_quality_policy")

        print(f"Loaded policy: {policy.identifier}")
        print(f"Description: {policy.description}")
        print(f"Alert options: {policy.alertOptions}")
        print(f"Number of targets: {len(policy.targets)}")
        print(f"Number of rules: {len(policy.rules)}")

        # Print rule details
        for i, rule in enumerate(policy.rules):
            print(f"\nRule {i+1}:")
            print(f" Class: {rule.className}")
            print(f" Configurations: {rule.configurations}")
            if rule.configuredTargets:
                print(f" Target configurations: {len(rule.configuredTargets)}")

        return policy

    finally:
        # Clean up temporary file
        if os.path.exists(policy_file):
            os.unlink(policy_file)

# Usage example
custom_policy = load_custom_policies()
```

### Policy Execution Framework

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, AlertOptions
from policy_manager.policy.result.policy_invocation_result import PolicyInvocationResult
from datetime import datetime
from typing import Any, Dict, List, Optional
import logging

class PolicyExecutionEngine:
    """Engine for executing policies and tracking results"""

    def __init__(self):
        self.policy_manager = DefaultPolicyManager.getInstance()
        # Each entry pairs a PolicyInvocationResult with its rule-level outcome
        self.execution_results = []
        self.logger = logging.getLogger(__name__)

    def execute_policy(self, policy_identifier: str, target_data: Any, context: Optional[Dict[str, Any]] = None) -> PolicyInvocationResult:
        """Execute a specific policy against target data"""

        try:
            # Get policy configuration
            policy = self.policy_manager.getPolicy(policy_identifier)

            # Create invocation result
            result = PolicyInvocationResult(
                policyName=policy.identifier,
                policyDescription=policy.description,
                timestamp=datetime.now().isoformat()
            )

            print(f"Executing policy: {policy.identifier}")

            # Execute each rule in the policy
            rule_results = []
            for rule in policy.rules:
                rule_result = self._execute_rule(rule, target_data, policy.targets, context)
                rule_results.append(rule_result)

                print(f" Rule {rule.className}: {'PASSED' if rule_result['passed'] else 'FAILED'}")
                if not rule_result['passed']:
                    print(f" Reason: {rule_result.get('reason', 'Unknown')}")

            # Determine overall policy result
            policy_passed = all(r['passed'] for r in rule_results)

            # Handle alerting based on policy configuration
            if not policy_passed and policy.alertOptions != AlertOptions.NEVER:
                self._send_alert(policy, rule_results, context)
            elif policy.alertOptions == AlertOptions.ALWAYS:
                self._send_alert(policy, rule_results, context)

            # Store result. PolicyInvocationResult declares only policyName,
            # policyDescription, and timestamp, and a pydantic model rejects
            # undeclared attributes by default, so the rule-level outcome is
            # kept alongside the result instead of being set on it.
            self.execution_results.append({
                'result': result,
                'rule_results': rule_results,
                'overall_status': "PASSED" if policy_passed else "FAILED"
            })

            return result

        except Exception as e:
            self.logger.error(f"Policy execution failed: {e}")
            raise

    def _execute_rule(self, rule: ConfiguredRule, target_data: Any, targets: List, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute individual rule (this would integrate with actual rule implementations)"""

        # This is a simplified rule execution simulation
        # In practice, this would dynamically load and execute the rule class

        rule_result = {
            'rule_class': rule.className,
            'passed': True,
            'details': {},
            'execution_time': datetime.now().isoformat()
        }

        # Simulate rule-specific logic based on class name
        if "CompletenessRule" in rule.className:
            rule_result.update(self._execute_completeness_rule(rule, target_data))
        elif "AccuracyRule" in rule.className:
            rule_result.update(self._execute_accuracy_rule(rule, target_data))
        elif "DataFreshnessRule" in rule.className:
            rule_result.update(self._execute_freshness_rule(rule, target_data))
        else:
            # Generic rule execution
            rule_result['details']['message'] = f"Executed {rule.className} successfully"

        return rule_result

    def _execute_completeness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate completeness rule execution"""
        config = rule.configurations or {}
        min_completeness = config.get('minimum_completeness', 0.95)

        # Simulate completeness check (in practice, this would analyze actual data)
        simulated_completeness = 0.97  # Assume 97% completeness

        passed = simulated_completeness >= min_completeness
        return {
            'passed': passed,
            'details': {
                'completeness_score': simulated_completeness,
                'threshold': min_completeness,
                'message': f"Data completeness: {simulated_completeness:.2%}"
            },
            'reason': None if passed else f"Completeness {simulated_completeness:.2%} below threshold {min_completeness:.2%}"
        }

    def _execute_accuracy_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate accuracy rule execution"""
        config = rule.configurations or {}
        accuracy_threshold = config.get('accuracy_threshold', 0.98)

        # Simulate accuracy check
        simulated_accuracy = 0.96  # Assume 96% accuracy

        passed = simulated_accuracy >= accuracy_threshold
        return {
            'passed': passed,
            'details': {
                'accuracy_score': simulated_accuracy,
                'threshold': accuracy_threshold,
                'validation_method': config.get('validation_method', 'unknown')
            },
            'reason': None if passed else f"Accuracy {simulated_accuracy:.2%} below threshold {accuracy_threshold:.2%}"
        }

    def _execute_freshness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate data freshness rule execution"""
        config = rule.configurations or {}
        max_age_hours = config.get('max_age_hours', 24)

        # Simulate freshness check
        simulated_age_hours = 30  # Assume data is 30 hours old

        passed = simulated_age_hours <= max_age_hours
        return {
            'passed': passed,
            'details': {
                'data_age_hours': simulated_age_hours,
                'max_allowed_hours': max_age_hours
            },
            'reason': None if passed else f"Data age {simulated_age_hours}h exceeds limit {max_age_hours}h"
        }

    def _send_alert(self, policy: Policy, rule_results: List[Dict], context: Optional[Dict[str, Any]]):
        """Send policy violation alert"""
        failed_rules = [r for r in rule_results if not r['passed']]

        alert_message = f"Policy Violation: {policy.identifier}\n"
        alert_message += f"Description: {policy.description}\n"
        alert_message += f"Failed Rules: {len(failed_rules)}/{len(rule_results)}\n\n"

        for failed_rule in failed_rules:
            alert_message += f"- {failed_rule['rule_class']}: {failed_rule['reason']}\n"

        print("🚨 POLICY ALERT 🚨")
        print(alert_message)

        # In practice, this would integrate with alerting systems
        # (email, Slack, PagerDuty, etc.)

    def get_execution_summary(self) -> Dict[str, Any]:
        """Get summary of all policy executions"""
        total_executions = len(self.execution_results)
        passed_executions = sum(1 for entry in self.execution_results if entry['overall_status'] == 'PASSED')

        return {
            'total_executions': total_executions,
            'passed_executions': passed_executions,
            'failed_executions': total_executions - passed_executions,
            'success_rate': (passed_executions / total_executions * 100) if total_executions > 0 else 0
        }

# Usage example
execution_engine = PolicyExecutionEngine()

# Execute policies against sample data
sample_data = {
    'dataset_id': 'customer_transactions_2024',
    'record_count': 1500000,
    'last_updated': '2024-09-05T10:30:00Z'
}

# Execute different policies
try:
    # Data quality policy
    quality_result = execution_engine.execute_policy(
        policy_identifier='data_quality_policy',
        target_data=sample_data,
        context={'environment': 'production', 'pipeline_id': 'etl_001'}
    )

    # The overall status is stored next to the invocation result
    latest = execution_engine.execution_results[-1]
    print(f"Quality policy result for {quality_result.policyName}: {latest['overall_status']}")

    # Get execution summary
    summary = execution_engine.get_execution_summary()
    print("\nExecution Summary:")
    print(f" Total executions: {summary['total_executions']}")
    print(f" Success rate: {summary['success_rate']:.1f}%")

except Exception as e:
    print(f"Policy execution failed: {e}")
```

### Dynamic Policy Configuration Management

```python
from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.configuration.policy_configuration import PolicyConfiguration
import hashlib
import json
from typing import Dict, List, Optional

class DynamicPolicyManager:
    """Advanced policy management with dynamic configuration updates"""

    def __init__(self):
        self.base_manager = DefaultPolicyManager.getInstance()
        self.config = PolicyConfiguration()
        self.policy_cache = {}
        self.change_listeners = []

    def register_change_listener(self, callback):
        """Register callback for policy configuration changes"""
        self.change_listeners.append(callback)

    def notify_change_listeners(self, policy_id: str, change_type: str):
        """Notify registered listeners of policy changes"""
        for callback in self.change_listeners:
            try:
                callback(policy_id, change_type)
            except Exception as e:
                print(f"Error notifying change listener: {e}")

    def create_policy_runtime(self, policy_config: Dict) -> Policy:
        """Create policy dynamically from configuration dictionary"""

        # Create targets
        targets = []
        if 'targets' in policy_config:
            for target_config in policy_config['targets']:
                target = Target(
                    retrieve_url=target_config.get('retrieve_url'),
                    type=target_config.get('type')
                )
                targets.append(target)

        # Create rules
        rules = []
        if 'rules' in policy_config:
            for rule_config in policy_config['rules']:
                rule = ConfiguredRule(
                    className=rule_config['className'],
                    configurations=rule_config.get('configurations'),
                    configuredTargets=[]  # Would be populated based on targetConfigurations
                )
                rules.append(rule)

        # Create policy
        policy = Policy(
            identifier=policy_config['identifier'],
            description=policy_config.get('description'),
            alertOptions=AlertOptions(policy_config.get('shouldSendAlert', 'ON_DETECTION')),
            targets=targets,
            rules=rules
        )

        # Cache the policy so later updates, variants, and exports can find it
        self.policy_cache[policy.identifier] = policy

        return policy

    def _lookup_policy(self, policy_id: str) -> Policy:
        """Look up a policy in the runtime cache first, then in the base manager"""
        policy = self.policy_cache.get(policy_id)
        if policy is None:
            policy = self.base_manager.getPolicy(policy_id)
        return policy

    def update_policy_configuration(self, policy_id: str, updates: Dict):
        """Update existing policy configuration"""
        try:
            # Get existing policy (runtime-created policies live only in the cache)
            existing_policy = self._lookup_policy(policy_id)

            # Create updated configuration
            updated_config = {
                'identifier': existing_policy.identifier,
                'description': existing_policy.description,
                'shouldSendAlert': existing_policy.alertOptions.value,
                'targets': [
                    {'retrieve_url': t.retrieve_url, 'type': t.type}
                    for t in existing_policy.targets
                ] if existing_policy.targets else [],
                'rules': [
                    {
                        'className': r.className,
                        'configurations': r.configurations,
                        'targetConfigurations': {}  # Simplified
                    }
                    for r in existing_policy.rules
                ]
            }

            # Apply updates
            updated_config.update(updates)

            # Create new policy
            new_policy = self.create_policy_runtime(updated_config)

            # Update in manager (simplified - would need proper integration)
            self.policy_cache[policy_id] = new_policy

            # Notify listeners
            self.notify_change_listeners(policy_id, 'UPDATED')

            print(f"Policy {policy_id} updated successfully")
            return new_policy

        except Exception as e:
            print(f"Failed to update policy {policy_id}: {e}")
            raise

    def create_conditional_policy(self, base_policy_id: str, conditions: Dict) -> str:
        """Create conditional policy based on existing policy"""

        base_policy = self._lookup_policy(base_policy_id)

        # Generate conditional policy ID. The built-in hash() is salted per
        # process for strings, so a SHA-256 digest is used to keep the ID
        # stable across runs.
        condition_json = json.dumps(conditions, sort_keys=True)
        condition_hash = hashlib.sha256(condition_json.encode('utf-8')).hexdigest()[:12]
        conditional_id = f"{base_policy_id}_conditional_{condition_hash}"

        # Create conditional rules
        conditional_rules = []
        for rule in base_policy.rules:
            # Add conditions to rule configuration
            enhanced_config = rule.configurations.copy() if rule.configurations else {}
            enhanced_config['conditions'] = conditions

            conditional_rule = ConfiguredRule(
                className=rule.className,
                configurations=enhanced_config,
                configuredTargets=rule.configuredTargets
            )
            conditional_rules.append(conditional_rule)

        # Create conditional policy
        conditional_policy = Policy(
            identifier=conditional_id,
            description=f"Conditional variant of {base_policy.description}",
            alertOptions=base_policy.alertOptions,
            targets=base_policy.targets,
            rules=conditional_rules
        )

        # Store in cache
        self.policy_cache[conditional_id] = conditional_policy

        # Notify listeners
        self.notify_change_listeners(conditional_id, 'CREATED')

        return conditional_id

    def get_policy_with_fallback(self, policy_id: str) -> Policy:
        """Get policy with cache fallback"""

        # Try cache first
        if policy_id in self.policy_cache:
            return self.policy_cache[policy_id]

        # Fallback to base manager
        try:
            return self.base_manager.getPolicy(policy_id)
        except Exception:
            # Create default policy if not found
            return self._create_default_policy(policy_id)

    def _create_default_policy(self, policy_id: str) -> Policy:
        """Create default policy for unknown identifiers"""

        default_policy = Policy(
            identifier=policy_id,
            description=f"Default policy for {policy_id}",
            alertOptions=AlertOptions.NEVER,
            targets=[],
            rules=[]
        )

        self.policy_cache[policy_id] = default_policy
        return default_policy

    def export_policies_to_json(self, policy_ids: Optional[List[str]] = None) -> str:
        """Export policies to JSON configuration"""

        policies_to_export = policy_ids or list(self.base_manager.policies.keys())

        export_data = []
        for policy_id in policies_to_export:
            try:
                policy = self.get_policy_with_fallback(policy_id)

                policy_data = {
                    'identifier': policy.identifier,
                    'description': policy.description,
                    'shouldSendAlert': policy.alertOptions.value,
                    'targets': [
                        {
                            'retrieve_url': target.retrieve_url,
                            'type': target.type
                        }
                        for target in (policy.targets or [])
                    ],
                    'rules': [
                        {
                            'className': rule.className,
                            'configurations': rule.configurations or {},
                            'targetConfigurations': {}  # Simplified
                        }
                        for rule in policy.rules
                    ]
                }

                export_data.append(policy_data)

            except Exception as e:
                print(f"Error exporting policy {policy_id}: {e}")

        return json.dumps(export_data, indent=2)

# Usage example
def policy_management_demo():
    """Demonstrate dynamic policy management capabilities"""

    dynamic_manager = DynamicPolicyManager()

    # Register change listener
    def policy_change_handler(policy_id: str, change_type: str):
        print(f"📋 Policy Change: {policy_id} - {change_type}")

    dynamic_manager.register_change_listener(policy_change_handler)

    # Create new policy at runtime
    new_policy_config = {
        'identifier': 'dynamic_security_policy',
        'description': 'Runtime created security policy',
        'shouldSendAlert': 'ALWAYS',
        'targets': [
            {'retrieve_url': 'https://api.security.com/scan', 'type': 'security_endpoint'}
        ],
        'rules': [
            {
                'className': 'com.example.rules.ThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'high',
                    'scan_types': ['malware', 'intrusion', 'anomaly']
                }
            }
        ]
    }

    new_policy = dynamic_manager.create_policy_runtime(new_policy_config)
    print(f"Created policy: {new_policy.identifier}")

    # Update existing policy
    updates = {
        'description': 'Updated security policy with enhanced monitoring',
        'rules': [
            {
                'className': 'com.example.rules.EnhancedThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'maximum',
                    'scan_types': ['malware', 'intrusion', 'anomaly', 'behavioral'],
                    'real_time_monitoring': True
                }
            }
        ]
    }

    try:
        updated_policy = dynamic_manager.update_policy_configuration('dynamic_security_policy', updates)
        print(f"Updated policy: {updated_policy.identifier}")
    except Exception as e:
        print(f"Update failed: {e}")

    # Create conditional policy
    conditions = {
        'environment': 'production',
        'data_classification': 'sensitive',
        'time_window': '09:00-17:00'
    }

    conditional_id = dynamic_manager.create_conditional_policy('dynamic_security_policy', conditions)
    print(f"Created conditional policy: {conditional_id}")

    # Export policies
    exported_config = dynamic_manager.export_policies_to_json(['dynamic_security_policy', conditional_id])
    print("Exported policy configuration:")
    print(exported_config[:200] + "..." if len(exported_config) > 200 else exported_config)

# Run the demonstration
policy_management_demo()
```

## Best Practices

### Policy Design
- Use clear, descriptive policy identifiers and descriptions
- Design policies to be composable and reusable
- Implement proper validation for policy configurations
- Use appropriate alert options based on business requirements

### Configuration Management
- Store policy configurations in version control
- Use environment-specific policy configurations
- Implement policy configuration validation
- Back up policy configurations regularly
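
As one way to act on the validation advice above, a configuration dictionary can be checked before it is written to a policy file. The sketch below is an illustration only, not part of the library. It checks the fields documented as required (`identifier` on a policy, `className` on a rule) and the three documented `shouldSendAlert` values. The function name `validate_policy_config` is hypothetical.

```python
from typing import Any, Dict, List

ALLOWED_ALERT_OPTIONS = {"ALWAYS", "ON_DETECTION", "NEVER"}


def validate_policy_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means no problem was found."""
    problems: List[str] = []

    identifier = config.get("identifier")
    if not isinstance(identifier, str) or not identifier.strip():
        problems.append("identifier must be a non-empty string")

    alert = config.get("shouldSendAlert")
    if alert is not None and (not isinstance(alert, str) or alert not in ALLOWED_ALERT_OPTIONS):
        problems.append(f"shouldSendAlert must be one of {sorted(ALLOWED_ALERT_OPTIONS)}")

    rules = config.get("rules", [])
    if not isinstance(rules, list):
        problems.append("rules must be a list")
        rules = []
    for index, rule in enumerate(rules):
        # Each rule needs a className so the rule implementation can be located.
        if not isinstance(rule, dict) or not rule.get("className"):
            problems.append(f"rules[{index}] is missing className")

    return problems
```

Returning all problems at once, rather than raising on the first one, lets a review or CI step report every issue in a policy file in a single pass.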

### Rule Development
- Create modular, testable rule implementations
- Use dependency injection for rule dependencies
- Implement comprehensive error handling in rules
- Document rule configuration parameters clearly

### Performance and Scalability
- Cache frequently accessed policies
- Implement efficient policy loading strategies
- Monitor policy execution performance
- Use asynchronous execution for complex rule evaluations

### Governance and Compliance
- Implement policy change approval workflows
- Maintain audit trails for policy executions
- Review policy effectiveness regularly
- Produce compliance reports and documentation