
tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.



Policy Management

A policy-based configuration and governance system built on JSON-defined rules, configurable targets, and an extensible rule evaluation framework. It supports flexible alert configuration and policy lifecycle management for business-logic and compliance requirements.

Capabilities

Abstract Policy Manager

Core policy management framework providing JSON-based configuration loading, policy validation, and rule configuration with extensible architecture for custom policy implementations.

class AbstractPolicyManager(ABC):
    """
    Abstract class containing logic for reading and parsing policies and rules from JSON configuration.
    
    Class Attributes:
    - __logger - LogManager instance for AbstractPolicyManager
    - __policyLocation = "POLICY_LOCATION" - Environment variable name for policy location
    
    Properties:
    - policies: Dict[str, Policy] - Returns policies dictionary
    """
    
    def __init__(self) -> None:
        """Initialize policy manager and load configurations"""
        ...
    
    def getPoliciesLocation(self) -> str:
        """Get policies location from config or environment"""
        ...
    
    def loadPolicyConfigurations(self, policiesLocation: str) -> None:
        """Load policy configurations from JSON files"""
        ...
    
    def loadJsonFilesInDirectory(self, policyConfigurationsLocation: str) -> None:
        """Load all JSON files in directory"""
        ...
    
    def loadJsonFile(self, filePath: str, fileName: str) -> None:
        """Load individual JSON file"""
        ...
    
    def validateAndAddPolicies(self, policies: List[PolicyInput]) -> None:
        """Validate and add list of policies"""
        ...
    
    def validateAndAddPolicy(self, policyInput: PolicyInput) -> None:
        """Validate and add single policy"""
        ...
    
    def validateAndConfigureRule(self, ruleInput: PolicyRuleInput, targets: List[Target]) -> ConfiguredRule:
        """Validate and configure policy rule"""
        ...
    
    def getPolicy(self, policyIdentifier: str) -> Policy:
        """Get policy by identifier"""
        ...
    
    def getDeserializationClass(self) -> Any:
        """Get class for JSON deserialization (can be overridden)"""
        ...
    
    def createPolicy(self, policyIdentifier: str) -> Policy:
        """Create policy instance (can be overridden)"""
        ...
    
    def setAdditionalConfigurations(self, policy: Policy, input: PolicyInput) -> None:
        """Set additional configurations (can be overridden)"""
        ...
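
The loading methods above walk a directory and read each JSON file. A minimal standard-library sketch of that idea follows. `load_policy_dicts` is a hypothetical helper, not part of the library, and it assumes a file may hold either one policy object or a list of them.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def load_policy_dicts(directory: str) -> List[Dict[str, Any]]:
    """Collect policy dictionaries from every *.json file in a directory.

    Hypothetical helper for illustration only. Assumes each file contains
    either a single policy object or a list of policy objects.
    """
    policies: List[Dict[str, Any]] = []
    for path in sorted(Path(directory).glob("*.json")):
        with path.open(encoding="utf-8") as handle:
            content = json.load(handle)
        # Normalize a single object into a one-element list
        policies.extend(content if isinstance(content, list) else [content])
    return policies


# Demonstration with a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "a.json").write_text(json.dumps({"identifier": "p1", "rules": []}))
    Path(tmp, "b.json").write_text(json.dumps([{"identifier": "p2"}, {"identifier": "p3"}]))
    Path(tmp, "notes.txt").write_text("ignored")  # non-JSON files are skipped
    loaded_ids = [p["identifier"] for p in load_policy_dicts(tmp)]

print(loaded_ids)
```

Sorting the file names keeps the load order deterministic, which matters if later files are allowed to override earlier ones.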

Default Policy Manager

Singleton implementation of the policy management framework providing centralized policy access and configuration management across application components.

class DefaultPolicyManager(AbstractPolicyManager):
    """
    Default singleton policy manager implementation.
    
    Class Attributes:
    - __instance = None - Singleton instance
    """
    
    def __init__(self) -> None:
        """Initialize singleton instance"""
        ...
    
    @staticmethod
    def getInstance() -> DefaultPolicyManager:
        """Get singleton instance"""
        ...
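
The `getInstance()` contract can be illustrated with a stand-in class. This is a sketch of the general singleton pattern, not the library's implementation; `SingletonManager` and the lock are assumptions added for the example.

```python
import threading
from typing import Dict, Optional


class SingletonManager:
    """Stand-in class illustrating a getInstance()-style singleton."""

    _instance: Optional["SingletonManager"] = None
    _lock = threading.Lock()

    def __init__(self) -> None:
        self.policies: Dict[str, str] = {}

    @classmethod
    def getInstance(cls) -> "SingletonManager":
        # Double-checked locking so only one instance is built across threads
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance


first = SingletonManager.getInstance()
second = SingletonManager.getInstance()
first.policies["demo"] = "shared"
print(first is second, second.policies)
```

Because every caller receives the same object, state loaded once (such as parsed policies) is visible everywhere without being passed around explicitly.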

Policy Configuration

Configuration management for policy system settings including policy location and default values with property-based configuration support.

class PolicyConfiguration:
    """
    Used to configure policy location and defaults.
    """
    
    def __init__(self) -> None:
        """Initialize with policy-configuration.properties"""
        ...
    
    def policiesLocation(self) -> str:
        """Returns policies location from configuration"""
        ...
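
The policy location can come from configuration or from the `POLICY_LOCATION` environment variable. The sketch below shows one way such a lookup could be ordered. `resolve_policies_location`, the precedence (environment first), and the `"policies"` default are all assumptions, not documented library behavior.

```python
import os
from typing import Mapping, Optional

POLICY_LOCATION_ENV = "POLICY_LOCATION"  # name taken from the class attribute above


def resolve_policies_location(
    configured: Optional[str],
    env: Mapping[str, str] = os.environ,
    default: str = "policies",
) -> str:
    """Hypothetical resolver: environment variable, then config value, then default.

    The precedence order and the default value are assumptions for this sketch.
    """
    return env.get(POLICY_LOCATION_ENV) or configured or default


from_config = resolve_policies_location("/etc/policies", env={})
from_env = resolve_policies_location("/etc/policies", env={"POLICY_LOCATION": "/opt/policies"})
fallback = resolve_policies_location(None, env={})
print(from_config, from_env, fallback)
```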

Policy Data Models

Data models for defining policies: alert options, targets, configured rules, and the policy itself. Deprecated fields are still accepted through dedicated setters for backward compatibility.

class AlertOptions(Enum):
    """
    Enum to determine when alerts will be sent for policies.
    
    Values:
    - ALWAYS = "ALWAYS"
    - ON_DETECTION = "ON_DETECTION"  
    - NEVER = "NEVER"
    """
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"

class Target(BaseModel):
    """
    Contains target information for the policy.
    
    Attributes:
    - retrieve_url: Optional[str] = None - Target retrieval URL
    - type: Optional[str] = None - Target type
    """
    retrieve_url: Optional[str] = None
    type: Optional[str] = None

class ConfiguredTarget(Target):
    """
    Contains target information with configurations needed by the rule.
    
    Attributes:
    - target_configurations: Dict[str, Any] - Target configurations dictionary
    - Inherits: retrieve_url and type from Target
    """
    target_configurations: Dict[str, Any]

class ConfiguredRule(BaseModel):
    """
    Represents a rule read by the policy manager with class and configuration information.
    
    Class Attributes:
    - __logger - LogManager instance for ConfiguredRule
    - __deprecated_set_methods - Dictionary for deprecated method mapping
    
    Attributes:
    - className: str - Rule class name
    - configurations: Optional[Dict[str, Any]] = None - Rule configurations
    - configuredTargets: Optional[List[ConfiguredTarget]] = [] - List of configured targets
    
    Properties:
    - targetConfigurations: ConfiguredTarget - Deprecated property (use configuredTargets)
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    configuredTargets: Optional[List[ConfiguredTarget]] = []
    
    def set_deprecated_targetConfigurations(self, new_value: ConfiguredTarget):
        """Deprecated setter method"""
        ...
    
    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...

class Policy(BaseModel):
    """
    Maps a rule or set of rules with identifier for policy execution.
    
    Class Attributes:
    - __logger - LogManager instance for Policy
    - __deprecated_set_methods - Dictionary for deprecated method mapping
    
    Attributes:
    - alertOptions: AlertOptions = AlertOptions.ON_DETECTION - Alert options (default: ON_DETECTION)
    - identifier: str - Policy identifier
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = [] - List of targets
    - rules: List[ConfiguredRule] = [] - List of configured rules
    
    Properties:
    - target: Target - Deprecated property (use targets)
    """
    alertOptions: AlertOptions = AlertOptions.ON_DETECTION
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = []
    rules: List[ConfiguredRule] = []
    
    def set_deprecated_target(self, new_value: Target):
        """Deprecated setter method"""
        ...
    
    def __setattr__(self, key, val):
        """Custom attribute setter for deprecated attributes"""
        ...
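
`Policy` and `ConfiguredRule` route assignments to deprecated attributes through a custom `__setattr__`. The following plain-Python sketch shows how such a redirect can work. `PolicySketch` is a stand-in rather than the library's pydantic model, and replacing `targets` with a one-element list is an assumption about the intended behavior.

```python
import warnings
from typing import Any, List


class PolicySketch:
    """Plain-Python stand-in, not the library's pydantic model."""

    # Maps a deprecated attribute name to the method that handles it
    _deprecated_set_methods = {"target": "set_deprecated_target"}

    def __init__(self, identifier: str) -> None:
        self.identifier = identifier
        self.targets: List[Any] = []

    def set_deprecated_target(self, new_value: Any) -> None:
        warnings.warn("'target' is deprecated; use 'targets'", DeprecationWarning, stacklevel=3)
        self.targets = [new_value]

    def __setattr__(self, key: str, val: Any) -> None:
        setter = self._deprecated_set_methods.get(key)
        if setter is not None:
            # Redirect the deprecated attribute to its replacement
            getattr(self, setter)(val)
        else:
            super().__setattr__(key, val)


policy = PolicySketch("demo")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    policy.target = {"type": "data_source"}  # old-style assignment

print(policy.targets)
```

Older callers keep working, the value lands in the new attribute, and the warning points them at the replacement.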

Policy Input Models

JSON deserialization models for loading policy configurations from files with support for both current and deprecated field structures for backward compatibility.

class PolicyRuleInput(BaseModel):
    """
    Represents policy rule data read from JSON file.
    
    Attributes:
    - className: str - Class name for the rule (required)
    - configurations: Optional[Dict[str, Any]] = None - Rule configuration
    - targetConfigurations: Optional[Dict[str, Any]] = None - Target configurations
    """
    className: str
    configurations: Optional[Dict[str, Any]] = None
    targetConfigurations: Optional[Dict[str, Any]] = None

class PolicyInput(BaseModel):
    """
    Represents policy information read from JSON file.
    
    Class Attributes:
    - __logger - LogManager instance for PolicyInput
    
    Attributes:
    - identifier: str - Policy identifier (required)
    - description: Optional[str] = None - Policy description
    - targets: Optional[List[Target]] = None - List of targets
    - shouldSendAlert: Optional[AlertOptions] = None - Alert configuration
    - rules: List[PolicyRuleInput] = [] - List of policy rules
    - target: Optional[Target] = None - Deprecated single target
    """
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = None
    shouldSendAlert: Optional[AlertOptions] = None
    rules: List[PolicyRuleInput] = []
    target: Optional[Target] = None
    
    def getAnyTargets(self) -> List[Target]:
        """Returns targets list, checking both new and deprecated attributes"""
        ...
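
`getAnyTargets` checks both the current `targets` list and the deprecated single `target`. A self-contained sketch of that fallback is shown below. `TargetSketch` and `get_any_targets` are hypothetical names, and preferring `targets` when both are present is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TargetSketch:
    """Stand-in for the Target model."""
    retrieve_url: Optional[str] = None
    type: Optional[str] = None


def get_any_targets(
    targets: Optional[List[TargetSketch]],
    target: Optional[TargetSketch],
) -> List[TargetSketch]:
    """Return the targets list, falling back to the deprecated single target."""
    if targets:
        return list(targets)
    if target is not None:
        return [target]
    return []


legacy = TargetSketch(retrieve_url="s3://data-lake/raw-data/", type="data_source")
current = [TargetSketch(type="processed_data")]

only_legacy = get_any_targets(None, legacy)
both_given = get_any_targets(current, legacy)
neither = get_any_targets(None, None)
print(len(only_legacy), both_given[0].type, neither)
```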

Policy Invocation Results

Result tracking for policy execution with comprehensive metadata including policy identification, description, and execution timestamps for audit and monitoring purposes.

class PolicyInvocationResult(BaseModel):
    """
    Represents the results of a policy invocation.
    
    Attributes:
    - policyName: str - Name of the policy
    - policyDescription: Optional[str] = None - Policy description
    - timestamp: str - Invocation timestamp
    """
    policyName: str
    policyDescription: Optional[str] = None
    timestamp: str

Usage Examples

Basic Policy Management Setup

from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, AlertOptions
import json
import os

# Initialize policy manager (singleton)
policy_manager = DefaultPolicyManager.getInstance()

# Get available policies
available_policies = policy_manager.policies
print(f"Loaded {len(available_policies)} policies:")
for policy_id, policy in available_policies.items():
    print(f"  - {policy_id}: {policy.description}")

# Get specific policy
try:
    security_policy = policy_manager.getPolicy("data_security_policy")
    print(f"Security policy loaded: {security_policy.identifier}")
    print(f"Rules count: {len(security_policy.rules)}")
    print(f"Alert setting: {security_policy.alertOptions}")
except Exception as e:
    print(f"Policy not found: {e}")

JSON Policy Configuration

from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.policy.json.policy_input import PolicyInput, PolicyRuleInput
import json
import tempfile
import os

def create_sample_policy_configuration():
    """Create sample JSON policy configuration"""
    
    # Define policy configuration
    policy_config = {
        "identifier": "data_quality_policy",
        "description": "Ensures data quality standards across all datasets",
        "shouldSendAlert": "ON_DETECTION",
        "targets": [
            {
                "retrieve_url": "s3://data-lake/raw-data/",
                "type": "data_source"
            },
            {
                "retrieve_url": "s3://data-lake/processed-data/",
                "type": "processed_data"
            }
        ],
        "rules": [
            {
                "className": "com.example.rules.CompletenessRule",
                "configurations": {
                    "minimum_completeness": 0.95,
                    "required_fields": ["id", "timestamp", "value"]
                },
                "targetConfigurations": {
                    "batch_size": 10000,
                    "sampling_rate": 0.1
                }
            },
            {
                "className": "com.example.rules.AccuracyRule", 
                "configurations": {
                    "accuracy_threshold": 0.98,
                    "validation_method": "statistical"
                }
            }
        ]
    }
    
    # Create temporary policy file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(policy_config, f, indent=2)
        return f.name

def load_custom_policies():
    """Load custom policies from JSON configuration"""
    
    # Create sample configuration
    policy_file = create_sample_policy_configuration()
    
    try:
        # Get policy manager
        manager = DefaultPolicyManager.getInstance()
        
        # Load the custom policy file
        manager.loadJsonFile(os.path.dirname(policy_file), os.path.basename(policy_file))
        
        # Retrieve the loaded policy
        policy = manager.getPolicy("data_quality_policy")
        
        print(f"Loaded policy: {policy.identifier}")
        print(f"Description: {policy.description}")
        print(f"Alert options: {policy.alertOptions}")
        print(f"Number of targets: {len(policy.targets)}")
        print(f"Number of rules: {len(policy.rules)}")
        
        # Print rule details
        for i, rule in enumerate(policy.rules):
            print(f"\nRule {i+1}:")
            print(f"  Class: {rule.className}")
            print(f"  Configurations: {rule.configurations}")
            if rule.configuredTargets:
                print(f"  Target configurations: {len(rule.configuredTargets)}")
        
        return policy
        
    finally:
        # Clean up temporary file
        if os.path.exists(policy_file):
            os.unlink(policy_file)

# Usage example
custom_policy = load_custom_policies()

Policy Execution Framework

from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, AlertOptions  # AlertOptions is used in execute_policy
from policy_manager.policy.result.policy_invocation_result import PolicyInvocationResult
from datetime import datetime
from typing import Dict, List, Any
import logging

class PolicyExecutionEngine:
    """Engine for executing policies and tracking results"""
    
    def __init__(self):
        self.policy_manager = DefaultPolicyManager.getInstance()
        self.execution_results = []
        self.logger = logging.getLogger(__name__)
    
    def execute_policy(self, policy_identifier: str, target_data: Any, context: Dict[str, Any] = None) -> PolicyInvocationResult:
        """Execute a specific policy against target data"""
        
        try:
            # Get policy configuration
            policy = self.policy_manager.getPolicy(policy_identifier)
            
            # Create invocation result
            result = PolicyInvocationResult(
                policyName=policy.identifier,
                policyDescription=policy.description,
                timestamp=datetime.now().isoformat()
            )
            
            print(f"Executing policy: {policy.identifier}")
            
            # Execute each rule in the policy
            rule_results = []
            for rule in policy.rules:
                rule_result = self._execute_rule(rule, target_data, policy.targets, context)
                rule_results.append(rule_result)
                
                print(f"  Rule {rule.className}: {'PASSED' if rule_result['passed'] else 'FAILED'}")
                if not rule_result['passed']:
                    print(f"    Reason: {rule_result.get('reason', 'Unknown')}")
            
            # Determine overall policy result
            policy_passed = all(r['passed'] for r in rule_results)
            
            # Handle alerting based on policy configuration
            if not policy_passed and policy.alertOptions != AlertOptions.NEVER:
                self._send_alert(policy, rule_results, context)
            elif policy.alertOptions == AlertOptions.ALWAYS:
                self._send_alert(policy, rule_results, context)
            
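            # Store result. PolicyInvocationResult is a pydantic model that declares only
            # policyName, policyDescription, and timestamp, and pydantic rejects plain
            # assignment to undeclared fields by default, so object.__setattr__ is used here.
            object.__setattr__(result, 'rule_results', rule_results)
            object.__setattr__(result, 'overall_status', "PASSED" if policy_passed else "FAILED")
            self.execution_results.append(result)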
            
            return result
            
        except Exception as e:
            self.logger.error(f"Policy execution failed: {e}")
            raise
    
    def _execute_rule(self, rule: ConfiguredRule, target_data: Any, targets: List, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute individual rule (this would integrate with actual rule implementations)"""
        
        # This is a simplified rule execution simulation
        # In practice, this would dynamically load and execute the rule class
        
        rule_result = {
            'rule_class': rule.className,
            'passed': True,
            'details': {},
            'execution_time': datetime.now().isoformat()
        }
        
        # Simulate rule-specific logic based on class name
        if "CompletenessRule" in rule.className:
            rule_result.update(self._execute_completeness_rule(rule, target_data))
        elif "AccuracyRule" in rule.className:
            rule_result.update(self._execute_accuracy_rule(rule, target_data))
        elif "DataFreshnessRule" in rule.className:
            rule_result.update(self._execute_freshness_rule(rule, target_data))
        else:
            # Generic rule execution
            rule_result['details']['message'] = f"Executed {rule.className} successfully"
        
        return rule_result
    
    def _execute_completeness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate completeness rule execution"""
        config = rule.configurations or {}
        min_completeness = config.get('minimum_completeness', 0.95)
        
        # Simulate completeness check (in practice, this would analyze actual data)
        simulated_completeness = 0.97  # Assume 97% completeness
        
        passed = simulated_completeness >= min_completeness
        return {
            'passed': passed,
            'details': {
                'completeness_score': simulated_completeness,
                'threshold': min_completeness,
                'message': f"Data completeness: {simulated_completeness:.2%}"
            },
            'reason': None if passed else f"Completeness {simulated_completeness:.2%} below threshold {min_completeness:.2%}"
        }
    
    def _execute_accuracy_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate accuracy rule execution"""
        config = rule.configurations or {}
        accuracy_threshold = config.get('accuracy_threshold', 0.98)
        
        # Simulate accuracy check
        simulated_accuracy = 0.96  # Assume 96% accuracy
        
        passed = simulated_accuracy >= accuracy_threshold
        return {
            'passed': passed,
            'details': {
                'accuracy_score': simulated_accuracy,
                'threshold': accuracy_threshold,
                'validation_method': config.get('validation_method', 'unknown')
            },
            'reason': None if passed else f"Accuracy {simulated_accuracy:.2%} below threshold {accuracy_threshold:.2%}"
        }
    
    def _execute_freshness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
        """Simulate data freshness rule execution"""
        config = rule.configurations or {}
        max_age_hours = config.get('max_age_hours', 24)
        
        # Simulate freshness check
        simulated_age_hours = 30  # Assume data is 30 hours old
        
        passed = simulated_age_hours <= max_age_hours
        return {
            'passed': passed,
            'details': {
                'data_age_hours': simulated_age_hours,
                'max_allowed_hours': max_age_hours
            },
            'reason': None if passed else f"Data age {simulated_age_hours}h exceeds limit {max_age_hours}h"
        }
    
    def _send_alert(self, policy: Policy, rule_results: List[Dict], context: Dict[str, Any]):
        """Send policy violation alert"""
        failed_rules = [r for r in rule_results if not r['passed']]
        
        alert_message = f"Policy Violation: {policy.identifier}\n"
        alert_message += f"Description: {policy.description}\n"
        alert_message += f"Failed Rules: {len(failed_rules)}/{len(rule_results)}\n\n"
        
        for failed_rule in failed_rules:
            alert_message += f"- {failed_rule['rule_class']}: {failed_rule['reason']}\n"
        
        print("🚨 POLICY ALERT 🚨")
        print(alert_message)
        
        # In practice, this would integrate with alerting systems
        # (email, Slack, PagerDuty, etc.)
    
    def get_execution_summary(self) -> Dict[str, Any]:
        """Get summary of all policy executions"""
        total_executions = len(self.execution_results)
        passed_executions = sum(1 for result in self.execution_results if getattr(result, 'overall_status', 'FAILED') == 'PASSED')
        
        return {
            'total_executions': total_executions,
            'passed_executions': passed_executions,
            'failed_executions': total_executions - passed_executions,
            'success_rate': (passed_executions / total_executions * 100) if total_executions > 0 else 0
        }

# Usage example
execution_engine = PolicyExecutionEngine()

# Execute policies against sample data
sample_data = {
    'dataset_id': 'customer_transactions_2024',
    'record_count': 1500000,
    'last_updated': '2024-09-05T10:30:00Z'
}

# Execute different policies
try:
    # Data quality policy
    quality_result = execution_engine.execute_policy(
        policy_identifier='data_quality_policy',
        target_data=sample_data,
        context={'environment': 'production', 'pipeline_id': 'etl_001'}
    )
    
    print(f"Quality policy result: {getattr(quality_result, 'overall_status', 'Unknown')}")
    
    # Get execution summary
    summary = execution_engine.get_execution_summary()
    print(f"\nExecution Summary:")
    print(f"  Total executions: {summary['total_executions']}")
    print(f"  Success rate: {summary['success_rate']:.1f}%")
    
except Exception as e:
    print(f"Policy execution failed: {e}")
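
The simulation above dispatches on substrings of `className`; as its comments note, a real engine would load the rule class dynamically. One way to do that in Python is with `importlib`, sketched below. `load_class` is a hypothetical helper, it assumes `className` is a Python dotted import path (the `com.example.rules.*` names in the examples are placeholders and would not import), and a standard-library class stands in for a rule.

```python
import importlib


def load_class(dotted_name: str) -> type:
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_name.rpartition(".")
    if not module_name:
        raise ValueError(f"Expected 'package.module.ClassName', got {dotted_name!r}")
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)
    if not isinstance(cls, type):
        raise TypeError(f"{dotted_name} is not a class")
    return cls


# Standard-library class used purely as a stand-in for a rule class
counter_cls = load_class("collections.Counter")
instance = counter_cls("aab")
print(counter_cls.__name__, instance["a"])
```

Failing early with `ValueError` or `TypeError` makes a misconfigured `className` show up at load time instead of during rule execution.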

Dynamic Policy Configuration Management

from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.configuration.policy_configuration import PolicyConfiguration
import json
import os
from typing import Dict, List
from datetime import datetime

class DynamicPolicyManager:
    """Advanced policy management with dynamic configuration updates"""
    
    def __init__(self):
        self.base_manager = DefaultPolicyManager.getInstance()
        self.config = PolicyConfiguration()
        self.policy_cache = {}
        self.change_listeners = []
    
    def register_change_listener(self, callback):
        """Register callback for policy configuration changes"""
        self.change_listeners.append(callback)
    
    def notify_change_listeners(self, policy_id: str, change_type: str):
        """Notify registered listeners of policy changes"""
        for callback in self.change_listeners:
            try:
                callback(policy_id, change_type)
            except Exception as e:
                print(f"Error notifying change listener: {e}")
    
    def create_policy_runtime(self, policy_config: Dict) -> Policy:
        """Create policy dynamically from configuration dictionary"""
        
        # Create targets
        targets = []
        if 'targets' in policy_config:
            for target_config in policy_config['targets']:
                target = Target(
                    retrieve_url=target_config.get('retrieve_url'),
                    type=target_config.get('type')
                )
                targets.append(target)
        
        # Create rules
        rules = []
        if 'rules' in policy_config:
            for rule_config in policy_config['rules']:
                rule = ConfiguredRule(
                    className=rule_config['className'],
                    configurations=rule_config.get('configurations'),
                    configuredTargets=[]  # Would be populated based on targetConfigurations
                )
                rules.append(rule)
        
        # Create policy
        policy = Policy(
            identifier=policy_config['identifier'],
            description=policy_config.get('description'),
            alertOptions=AlertOptions(policy_config.get('shouldSendAlert', 'ON_DETECTION')),
            targets=targets,
            rules=rules
        )
        
        return policy
    
    def update_policy_configuration(self, policy_id: str, updates: Dict):
        """Update existing policy configuration"""
        try:
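            # Get existing policy: check the local cache first so policies created at
            # runtime are found, then fall back to the base manager
            existing_policy = self.get_policy_with_fallback(policy_id)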
            
            # Create updated configuration
            updated_config = {
                'identifier': existing_policy.identifier,
                'description': existing_policy.description,
                'shouldSendAlert': existing_policy.alertOptions.value,
                'targets': [
                    {'retrieve_url': t.retrieve_url, 'type': t.type} 
                    for t in existing_policy.targets
                ] if existing_policy.targets else [],
                'rules': [
                    {
                        'className': r.className,
                        'configurations': r.configurations,
                        'targetConfigurations': {}  # Simplified
                    }
                    for r in existing_policy.rules
                ]
            }
            
            # Apply updates
            updated_config.update(updates)
            
            # Create new policy
            new_policy = self.create_policy_runtime(updated_config)
            
            # Update in manager (simplified - would need proper integration)
            self.policy_cache[policy_id] = new_policy
            
            # Notify listeners
            self.notify_change_listeners(policy_id, 'UPDATED')
            
            print(f"Policy {policy_id} updated successfully")
            return new_policy
            
        except Exception as e:
            print(f"Failed to update policy {policy_id}: {e}")
            raise
    
    def create_conditional_policy(self, base_policy_id: str, conditions: Dict) -> str:
        """Create conditional policy based on existing policy"""
        
        # Check the local cache first so runtime-created policies can serve as the base
        base_policy = self.get_policy_with_fallback(base_policy_id)
        
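        # Generate conditional policy ID from a stable digest; the built-in hash() of a
        # string is salted per process, so it would yield a different ID on every run
        import hashlib
        condition_digest = hashlib.sha256(
            json.dumps(conditions, sort_keys=True).encode("utf-8")
        ).hexdigest()[:12]
        conditional_id = f"{base_policy_id}_conditional_{condition_digest}"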
        
        # Create conditional rules
        conditional_rules = []
        for rule in base_policy.rules:
            # Add conditions to rule configuration
            enhanced_config = rule.configurations.copy() if rule.configurations else {}
            enhanced_config['conditions'] = conditions
            
            conditional_rule = ConfiguredRule(
                className=rule.className,
                configurations=enhanced_config,
                configuredTargets=rule.configuredTargets
            )
            conditional_rules.append(conditional_rule)
        
        # Create conditional policy
        conditional_policy = Policy(
            identifier=conditional_id,
            description=f"Conditional variant of {base_policy.description}",
            alertOptions=base_policy.alertOptions,
            targets=base_policy.targets,
            rules=conditional_rules
        )
        
        # Store in cache
        self.policy_cache[conditional_id] = conditional_policy
        
        # Notify listeners
        self.notify_change_listeners(conditional_id, 'CREATED')
        
        return conditional_id
    
    def get_policy_with_fallback(self, policy_id: str) -> Policy:
        """Get policy with cache fallback"""
        
        # Try cache first
        if policy_id in self.policy_cache:
            return self.policy_cache[policy_id]
        
        # Fallback to base manager
        try:
            return self.base_manager.getPolicy(policy_id)
        except Exception:
            # Create default policy if not found
            return self._create_default_policy(policy_id)
    
    def _create_default_policy(self, policy_id: str) -> Policy:
        """Create default policy for unknown identifiers"""
        
        default_policy = Policy(
            identifier=policy_id,
            description=f"Default policy for {policy_id}",
            alertOptions=AlertOptions.NEVER,
            targets=[],
            rules=[]
        )
        
        self.policy_cache[policy_id] = default_policy
        return default_policy
    
    def export_policies_to_json(self, policy_ids: List[str] = None) -> str:
        """Export policies to JSON configuration"""
        
        policies_to_export = policy_ids or list(self.base_manager.policies.keys())
        
        export_data = []
        for policy_id in policies_to_export:
            try:
                policy = self.get_policy_with_fallback(policy_id)
                
                policy_data = {
                    'identifier': policy.identifier,
                    'description': policy.description,
                    'shouldSendAlert': policy.alertOptions.value,
                    'targets': [
                        {
                            'retrieve_url': target.retrieve_url,
                            'type': target.type
                        }
                        for target in (policy.targets or [])
                    ],
                    'rules': [
                        {
                            'className': rule.className,
                            'configurations': rule.configurations or {},
                            'targetConfigurations': {}  # Simplified
                        }
                        for rule in policy.rules
                    ]
                }
                
                export_data.append(policy_data)
                
            except Exception as e:
                print(f"Error exporting policy {policy_id}: {e}")
        
        return json.dumps(export_data, indent=2)

# Usage example
def policy_management_demo():
    """Demonstrate dynamic policy management capabilities"""
    
    dynamic_manager = DynamicPolicyManager()
    
    # Register change listener
    def policy_change_handler(policy_id: str, change_type: str):
        print(f"📋 Policy Change: {policy_id} - {change_type}")
    
    dynamic_manager.register_change_listener(policy_change_handler)
    
    # Create new policy at runtime
    new_policy_config = {
        'identifier': 'dynamic_security_policy',
        'description': 'Runtime created security policy',
        'shouldSendAlert': 'ALWAYS',
        'targets': [
            {'retrieve_url': 'https://api.security.com/scan', 'type': 'security_endpoint'}
        ],
        'rules': [
            {
                'className': 'com.example.rules.ThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'high',
                    'scan_types': ['malware', 'intrusion', 'anomaly']
                }
            }
        ]
    }
    
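    new_policy = dynamic_manager.create_policy_runtime(new_policy_config)
    # create_policy_runtime only builds the object, so register it in the cache explicitly
    dynamic_manager.policy_cache[new_policy.identifier] = new_policy
    print(f"Created policy: {new_policy.identifier}")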
    
    # Update existing policy
    updates = {
        'description': 'Updated security policy with enhanced monitoring',
        'rules': [
            {
                'className': 'com.example.rules.EnhancedThreatDetectionRule',
                'configurations': {
                    'sensitivity': 'maximum',
                    'scan_types': ['malware', 'intrusion', 'anomaly', 'behavioral'],
                    'real_time_monitoring': True
                }
            }
        ]
    }
    
    try:
        updated_policy = dynamic_manager.update_policy_configuration('dynamic_security_policy', updates)
        print(f"Updated policy: {updated_policy.identifier}")
    except Exception as e:
        print(f"Update failed: {e}")
    
    # Create conditional policy
    conditions = {
        'environment': 'production',
        'data_classification': 'sensitive',
        'time_window': '09:00-17:00'
    }
    
    conditional_id = dynamic_manager.create_conditional_policy('dynamic_security_policy', conditions)
    print(f"Created conditional policy: {conditional_id}")
    
    # Export policies
    exported_config = dynamic_manager.export_policies_to_json(['dynamic_security_policy', conditional_id])
    print("Exported policy configuration:")
    print(exported_config[:200] + "..." if len(exported_config) > 200 else exported_config)

# Run the demonstration
policy_management_demo()

Best Practices

Policy Design

  • Use clear, descriptive policy identifiers and descriptions
  • Design policies to be composable and reusable
  • Implement proper validation for policy configurations
  • Use appropriate alert options based on business requirements
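
To make the alert options concrete, the sketch below turns each value into a send/no-send decision. The enum is redefined locally so the example runs on its own, `should_send_alert` is a hypothetical helper, and the interpretation of each option is inferred from its name rather than taken from library documentation.

```python
from enum import Enum


class AlertOptions(Enum):
    """Local copy of the documented enum so the sketch is self-contained."""
    ALWAYS = "ALWAYS"
    ON_DETECTION = "ON_DETECTION"
    NEVER = "NEVER"


def should_send_alert(option: AlertOptions, violation_detected: bool) -> bool:
    """Decide whether to alert, based on the assumed meaning of each option."""
    if option is AlertOptions.ALWAYS:
        return True
    if option is AlertOptions.NEVER:
        return False
    # ON_DETECTION: alert only when a rule reported a violation
    return violation_detected


decisions = {
    (option.value, detected): should_send_alert(option, detected)
    for option in AlertOptions
    for detected in (True, False)
}
print(decisions)
```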

Configuration Management

  • Store policy configurations in version control
  • Use environment-specific policy configurations
  • Implement policy configuration validation
  • Back up policy configurations regularly
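
Configuration validation can be as simple as checking a policy dictionary before it is written to disk or loaded. The sketch below is one possible pre-check. `validate_policy_config` is a hypothetical helper, and the rules it enforces mirror the required fields shown in the input models above, not the library's own validation.

```python
from typing import Any, Dict, List

VALID_ALERT_OPTIONS = {"ALWAYS", "ON_DETECTION", "NEVER"}


def validate_policy_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: List[str] = []

    identifier = config.get("identifier")
    if not isinstance(identifier, str) or not identifier.strip():
        errors.append("identifier must be a non-empty string")

    alert = config.get("shouldSendAlert")
    if alert is not None and (not isinstance(alert, str) or alert not in VALID_ALERT_OPTIONS):
        errors.append(f"shouldSendAlert must be one of {sorted(VALID_ALERT_OPTIONS)}")

    rules = config.get("rules", [])
    if not isinstance(rules, list):
        errors.append("rules must be a list")
    else:
        for index, rule in enumerate(rules):
            if not isinstance(rule, dict) or not rule.get("className"):
                errors.append(f"rules[{index}] is missing className")

    return errors


good = validate_policy_config({"identifier": "data_quality_policy", "rules": [{"className": "X"}]})
bad = validate_policy_config({"identifier": "", "shouldSendAlert": "SOMETIMES", "rules": [{}]})
print(good, len(bad))
```

Collecting all problems rather than raising on the first one gives reviewers a complete report in a single pass.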

Rule Development

  • Create modular, testable rule implementations
  • Use dependency injection for rule dependencies
  • Implement comprehensive error handling in rules
  • Document rule configuration parameters clearly
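
As an illustration of dependency injection in a rule, the sketch below passes the clock in as a constructor argument so the rule can be tested without real time passing. `FreshnessRule` and its `evaluate` method are hypothetical and do not reflect a rule interface defined by the library.

```python
import time
from typing import Any, Callable, Dict


class FreshnessRule:
    """Hypothetical rule that checks data age against a configured limit."""

    def __init__(self, configurations: Dict[str, Any], clock: Callable[[], float] = time.time) -> None:
        self.max_age_seconds = configurations.get("max_age_hours", 24) * 3600
        self.clock = clock  # injected dependency; defaults to the system clock

    def evaluate(self, last_updated_epoch: float) -> bool:
        """Return True when the data is no older than the configured limit."""
        return self.clock() - last_updated_epoch <= self.max_age_seconds


# In a test, inject a fixed clock instead of the system time
rule = FreshnessRule({"max_age_hours": 1}, clock=lambda: 10_000.0)
fresh = rule.evaluate(last_updated_epoch=9_000.0)  # 1,000 seconds old
stale = rule.evaluate(last_updated_epoch=1_000.0)  # 9,000 seconds old
print(fresh, stale)
```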

Performance and Scalability

  • Cache frequently accessed policies
  • Implement efficient policy loading strategies
  • Monitor policy execution performance
  • Use asynchronous execution for complex rule evaluations
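
For asynchronous rule evaluation, `asyncio.gather` runs independent checks concurrently and returns results in submission order. The sketch below is a minimal illustration. `evaluate_rule` simulates work with a sleep, and the rule names are placeholders.

```python
import asyncio
from typing import Dict, List, Tuple


async def evaluate_rule(name: str, delay: float) -> Dict[str, object]:
    """Simulated rule evaluation; the sleep stands in for I/O-bound work."""
    await asyncio.sleep(delay)
    return {"rule_class": name, "passed": True}


async def evaluate_all(rules: List[Tuple[str, float]]) -> List[Dict[str, object]]:
    # gather() runs the coroutines concurrently and preserves input order
    return await asyncio.gather(*(evaluate_rule(name, delay) for name, delay in rules))


results = asyncio.run(evaluate_all([("SlowRule", 0.02), ("FastRule", 0.01)]))
print([r["rule_class"] for r in results])
```

This helps mostly when rules wait on I/O such as remote data sources; CPU-bound rules would need a process pool instead.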

Governance and Compliance

  • Implement policy change approval workflows
  • Maintain audit trails for policy executions
  • Review policy effectiveness regularly
  • Produce compliance reports and documentation

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python
