Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.
—
Policy-based configuration and governance system with JSON-defined rules, configurable targets, and extensible rule evaluation framework. The policy management system supports complex business logic and compliance requirements with flexible alert configurations and comprehensive policy lifecycle management.
Core policy management framework providing JSON-based configuration loading, policy validation, and rule configuration with extensible architecture for custom policy implementations.
class AbstractPolicyManager(ABC):
"""
Abstract class containing logic for reading and parsing policies and rules from JSON configuration.
Class Attributes:
- __logger - LogManager instance for AbstractPolicyManager
- __policyLocation = "POLICY_LOCATION" - Environment variable name for policy location
Properties:
- policies: Dict[str, Policy] - Returns policies dictionary
"""
def __init__(self) -> None:
"""Initialize policy manager and load configurations"""
...
def getPoliciesLocation(self) -> str:
"""Get policies location from config or environment"""
...
def loadPolicyConfigurations(self, policiesLocation: str) -> None:
"""Load policy configurations from JSON files"""
...
def loadJsonFilesInDirectory(self, policyConfigurationsLocation: str) -> None:
"""Load all JSON files in directory"""
...
def loadJsonFile(self, filePath: str, fileName: str) -> None:
"""Load individual JSON file"""
...
def validateAndAddPolicies(self, policies: List[PolicyInput]) -> None:
"""Validate and add list of policies"""
...
def validateAndAddPolicy(self, policyInput: PolicyInput) -> None:
"""Validate and add single policy"""
...
def validateAndConfigureRule(self, ruleInput: PolicyRuleInput, targets: List[Target]) -> ConfiguredRule:
"""Validate and configure policy rule"""
...
def getPolicy(self, policyIdentifier: str) -> Policy:
"""Get policy by identifier"""
...
def getDeserializationClass(self) -> any:
"""Get class for JSON deserialization (can be overridden)"""
...
def createPolicy(self, policyIdentifier: str) -> Policy:
"""Create policy instance (can be overridden)"""
...
def setAdditionalConfigurations(self, policy: Policy, input: PolicyInput) -> None:
"""Set additional configurations (can be overridden)"""
...Singleton implementation of the policy management framework providing centralized policy access and configuration management across application components.
class DefaultPolicyManager(AbstractPolicyManager):
"""
Default singleton policy manager implementation.
Class Attributes:
- __instance = None - Singleton instance
"""
def __init__(self) -> None:
"""Initialize singleton instance"""
...
@staticmethod
def getInstance() -> DefaultPolicyManager:
"""Get singleton instance"""
...Configuration management for policy system settings including policy location and default values with property-based configuration support.
class PolicyConfiguration:
"""
Used to configure policy location and defaults.
"""
def __init__(self) -> None:
"""Initialize with policy-configuration.properties"""
...
def policiesLocation(self) -> str:
"""Returns policies location from configuration"""
...Comprehensive data models for policy definition including alert options, targets, rules, and extensible configuration support with deprecated field handling for backward compatibility.
class AlertOptions(Enum):
"""
Enum to determine when alerts will be sent for policies.
Values:
- ALWAYS = "ALWAYS"
- ON_DETECTION = "ON_DETECTION"
- NEVER = "NEVER"
"""
ALWAYS = "ALWAYS"
ON_DETECTION = "ON_DETECTION"
NEVER = "NEVER"
class Target(BaseModel):
"""
Contains target information for the policy.
Attributes:
- retrieve_url: Optional[str] = None - Target retrieval URL
- type: Optional[str] = None - Target type
"""
retrieve_url: Optional[str] = None
type: Optional[str] = None
class ConfiguredTarget(Target):
"""
Contains target information with configurations needed by the rule.
Attributes:
- target_configurations: Dict[str, Any] - Target configurations dictionary
- Inherits: retrieve_url and type from Target
"""
target_configurations: Dict[str, Any]
class ConfiguredRule(BaseModel):
"""
Represents a rule read by the policy manager with class and configuration information.
Class Attributes:
- __logger - LogManager instance for ConfiguredRule
- __deprecated_set_methods - Dictionary for deprecated method mapping
Attributes:
- className: str - Rule class name
- configurations: Optional[Dict[str, Any]] = None - Rule configurations
- configuredTargets: Optional[List[ConfiguredTarget]] = [] - List of configured targets
Properties:
- targetConfigurations: ConfiguredTarget - Deprecated property (use configuredTargets)
"""
className: str
configurations: Optional[Dict[str, Any]] = None
configuredTargets: Optional[List[ConfiguredTarget]] = []
def set_deprecated_targetConfigurations(self, new_value: ConfiguredTarget):
"""Deprecated setter method"""
...
def __setattr__(self, key, val):
"""Custom attribute setter for deprecated attributes"""
...
class Policy(BaseModel):
"""
Maps a rule or set of rules with identifier for policy execution.
Class Attributes:
- __logger - LogManager instance for Policy
- __deprecated_set_methods - Dictionary for deprecated method mapping
Attributes:
- alertOptions: AlertOptions = AlertOptions.ON_DETECTION - Alert options (default: ON_DETECTION)
- identifier: str - Policy identifier
- description: Optional[str] = None - Policy description
- targets: Optional[List[Target]] = [] - List of targets
- rules: List[ConfiguredRule] = [] - List of configured rules
Properties:
- target: Target - Deprecated property (use targets)
"""
alertOptions: AlertOptions = AlertOptions.ON_DETECTION
identifier: str
description: Optional[str] = None
targets: Optional[List[Target]] = []
rules: List[ConfiguredRule] = []
def set_deprecated_target(self, new_value: Target):
"""Deprecated setter method"""
...
def __setattr__(self, key, val):
"""Custom attribute setter for deprecated attributes"""
...JSON deserialization models for loading policy configurations from files with support for both current and deprecated field structures for backward compatibility.
class PolicyRuleInput(BaseModel):
"""
Represents policy rule data read from JSON file.
Attributes:
- className: str - Class name for the rule (required)
- configurations: Optional[Dict[str, Any]] = None - Rule configuration
- targetConfigurations: Optional[Dict[str, Any]] = None - Target configurations
"""
className: str
configurations: Optional[Dict[str, Any]] = None
targetConfigurations: Optional[Dict[str, Any]] = None
class PolicyInput(BaseModel):
"""
Represents policy information read from JSON file.
Class Attributes:
- __logger - LogManager instance for PolicyInput
Attributes:
- identifier: str - Policy identifier (required)
- description: Optional[str] = None - Policy description
- targets: Optional[List[Target]] = None - List of targets
- shouldSendAlert: Optional[AlertOptions] = None - Alert configuration
- rules: List[PolicyRuleInput] = [] - List of policy rules
- target: Optional[Target] = None - Deprecated single target
"""
identifier: str
description: Optional[str] = None
targets: Optional[List[Target]] = None
shouldSendAlert: Optional[AlertOptions] = None
rules: List[PolicyRuleInput] = []
target: Optional[Target] = None
def getAnyTargets(self) -> List[Target]:
"""Returns targets list, checking both new and deprecated attributes"""
...Result tracking for policy execution with comprehensive metadata including policy identification, description, and execution timestamps for audit and monitoring purposes.
class PolicyInvocationResult(BaseModel):
"""
Represents the results of a policy invocation.
Attributes:
- policyName: str - Name of the policy
- policyDescription: Optional[str] = None - Policy description
- timestamp: str - Invocation timestamp
"""
policyName: str
policyDescription: Optional[str] = None
timestamp: strfrom policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, AlertOptions
import json
import os
# Initialize policy manager (singleton)
policy_manager = DefaultPolicyManager.getInstance()
# Get available policies
available_policies = policy_manager.policies
print(f"Loaded {len(available_policies)} policies:")
for policy_id, policy in available_policies.items():
print(f" - {policy_id}: {policy.description}")
# Get specific policy
try:
security_policy = policy_manager.getPolicy("data_security_policy")
print(f"Security policy loaded: {security_policy.identifier}")
print(f"Rules count: {len(security_policy.rules)}")
print(f"Alert setting: {security_policy.alertOptions}")
except Exception as e:
print(f"Policy not found: {e}")from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.policy.json.policy_input import PolicyInput, PolicyRuleInput
import json
import tempfile
import os
def create_sample_policy_configuration():
"""Create sample JSON policy configuration"""
# Define policy configuration
policy_config = {
"identifier": "data_quality_policy",
"description": "Ensures data quality standards across all datasets",
"shouldSendAlert": "ON_DETECTION",
"targets": [
{
"retrieve_url": "s3://data-lake/raw-data/",
"type": "data_source"
},
{
"retrieve_url": "s3://data-lake/processed-data/",
"type": "processed_data"
}
],
"rules": [
{
"className": "com.example.rules.CompletenessRule",
"configurations": {
"minimum_completeness": 0.95,
"required_fields": ["id", "timestamp", "value"]
},
"targetConfigurations": {
"batch_size": 10000,
"sampling_rate": 0.1
}
},
{
"className": "com.example.rules.AccuracyRule",
"configurations": {
"accuracy_threshold": 0.98,
"validation_method": "statistical"
}
}
]
}
# Create temporary policy file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(policy_config, f, indent=2)
return f.name
def load_custom_policies():
"""Load custom policies from JSON configuration"""
# Create sample configuration
policy_file = create_sample_policy_configuration()
try:
# Get policy manager
manager = DefaultPolicyManager.getInstance()
# Load the custom policy file
manager.loadJsonFile(os.path.dirname(policy_file), os.path.basename(policy_file))
# Retrieve the loaded policy
policy = manager.getPolicy("data_quality_policy")
print(f"Loaded policy: {policy.identifier}")
print(f"Description: {policy.description}")
print(f"Alert options: {policy.alertOptions}")
print(f"Number of targets: {len(policy.targets)}")
print(f"Number of rules: {len(policy.rules)}")
# Print rule details
for i, rule in enumerate(policy.rules):
print(f"\nRule {i+1}:")
print(f" Class: {rule.className}")
print(f" Configurations: {rule.configurations}")
if rule.configuredTargets:
print(f" Target configurations: {len(rule.configuredTargets)}")
return policy
finally:
# Clean up temporary file
if os.path.exists(policy_file):
os.unlink(policy_file)
# Usage example
custom_policy = load_custom_policies()from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule
from policy_manager.policy.result.policy_invocation_result import PolicyInvocationResult
from datetime import datetime
from typing import Dict, List, Any
import logging
class PolicyExecutionEngine:
"""Engine for executing policies and tracking results"""
def __init__(self):
self.policy_manager = DefaultPolicyManager.getInstance()
self.execution_results = []
self.logger = logging.getLogger(__name__)
def execute_policy(self, policy_identifier: str, target_data: Any, context: Dict[str, Any] = None) -> PolicyInvocationResult:
"""Execute a specific policy against target data"""
try:
# Get policy configuration
policy = self.policy_manager.getPolicy(policy_identifier)
# Create invocation result
result = PolicyInvocationResult(
policyName=policy.identifier,
policyDescription=policy.description,
timestamp=datetime.now().isoformat()
)
print(f"Executing policy: {policy.identifier}")
# Execute each rule in the policy
rule_results = []
for rule in policy.rules:
rule_result = self._execute_rule(rule, target_data, policy.targets, context)
rule_results.append(rule_result)
print(f" Rule {rule.className}: {'PASSED' if rule_result['passed'] else 'FAILED'}")
if not rule_result['passed']:
print(f" Reason: {rule_result.get('reason', 'Unknown')}")
# Determine overall policy result
policy_passed = all(result['passed'] for result in rule_results)
# Handle alerting based on policy configuration
if not policy_passed and policy.alertOptions != AlertOptions.NEVER:
self._send_alert(policy, rule_results, context)
elif policy.alertOptions == AlertOptions.ALWAYS:
self._send_alert(policy, rule_results, context)
# Store result
result.rule_results = rule_results # Add rule results to invocation result
result.overall_status = "PASSED" if policy_passed else "FAILED"
self.execution_results.append(result)
return result
except Exception as e:
self.logger.error(f"Policy execution failed: {e}")
raise
def _execute_rule(self, rule: ConfiguredRule, target_data: Any, targets: List, context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute individual rule (this would integrate with actual rule implementations)"""
# This is a simplified rule execution simulation
# In practice, this would dynamically load and execute the rule class
rule_result = {
'rule_class': rule.className,
'passed': True,
'details': {},
'execution_time': datetime.now().isoformat()
}
# Simulate rule-specific logic based on class name
if "CompletenessRule" in rule.className:
rule_result.update(self._execute_completeness_rule(rule, target_data))
elif "AccuracyRule" in rule.className:
rule_result.update(self._execute_accuracy_rule(rule, target_data))
elif "DataFreshnessRule" in rule.className:
rule_result.update(self._execute_freshness_rule(rule, target_data))
else:
# Generic rule execution
rule_result['details']['message'] = f"Executed {rule.className} successfully"
return rule_result
def _execute_completeness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
"""Simulate completeness rule execution"""
config = rule.configurations or {}
min_completeness = config.get('minimum_completeness', 0.95)
# Simulate completeness check (in practice, this would analyze actual data)
simulated_completeness = 0.97 # Assume 97% completeness
passed = simulated_completeness >= min_completeness
return {
'passed': passed,
'details': {
'completeness_score': simulated_completeness,
'threshold': min_completeness,
'message': f"Data completeness: {simulated_completeness:.2%}"
},
'reason': None if passed else f"Completeness {simulated_completeness:.2%} below threshold {min_completeness:.2%}"
}
def _execute_accuracy_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
"""Simulate accuracy rule execution"""
config = rule.configurations or {}
accuracy_threshold = config.get('accuracy_threshold', 0.98)
# Simulate accuracy check
simulated_accuracy = 0.96 # Assume 96% accuracy
passed = simulated_accuracy >= accuracy_threshold
return {
'passed': passed,
'details': {
'accuracy_score': simulated_accuracy,
'threshold': accuracy_threshold,
'validation_method': config.get('validation_method', 'unknown')
},
'reason': None if passed else f"Accuracy {simulated_accuracy:.2%} below threshold {accuracy_threshold:.2%}"
}
def _execute_freshness_rule(self, rule: ConfiguredRule, data: Any) -> Dict[str, Any]:
"""Simulate data freshness rule execution"""
config = rule.configurations or {}
max_age_hours = config.get('max_age_hours', 24)
# Simulate freshness check
simulated_age_hours = 30 # Assume data is 30 hours old
passed = simulated_age_hours <= max_age_hours
return {
'passed': passed,
'details': {
'data_age_hours': simulated_age_hours,
'max_allowed_hours': max_age_hours
},
'reason': None if passed else f"Data age {simulated_age_hours}h exceeds limit {max_age_hours}h"
}
def _send_alert(self, policy: Policy, rule_results: List[Dict], context: Dict[str, Any]):
"""Send policy violation alert"""
failed_rules = [r for r in rule_results if not r['passed']]
alert_message = f"Policy Violation: {policy.identifier}\n"
alert_message += f"Description: {policy.description}\n"
alert_message += f"Failed Rules: {len(failed_rules)}/{len(rule_results)}\n\n"
for failed_rule in failed_rules:
alert_message += f"- {failed_rule['rule_class']}: {failed_rule['reason']}\n"
print(f"🚨 POLICY ALERT 🚨")
print(alert_message)
# In practice, this would integrate with alerting systems
# (email, Slack, PagerDuty, etc.)
def get_execution_summary(self) -> Dict[str, Any]:
"""Get summary of all policy executions"""
total_executions = len(self.execution_results)
passed_executions = sum(1 for result in self.execution_results if getattr(result, 'overall_status', 'FAILED') == 'PASSED')
return {
'total_executions': total_executions,
'passed_executions': passed_executions,
'failed_executions': total_executions - passed_executions,
'success_rate': (passed_executions / total_executions * 100) if total_executions > 0 else 0
}
# Usage example
execution_engine = PolicyExecutionEngine()
# Execute policies against sample data
sample_data = {
'dataset_id': 'customer_transactions_2024',
'record_count': 1500000,
'last_updated': '2024-09-05T10:30:00Z'
}
# Execute different policies
try:
# Data quality policy
quality_result = execution_engine.execute_policy(
policy_identifier='data_quality_policy',
target_data=sample_data,
context={'environment': 'production', 'pipeline_id': 'etl_001'}
)
print(f"Quality policy result: {getattr(quality_result, 'overall_status', 'Unknown')}")
# Get execution summary
summary = execution_engine.get_execution_summary()
print(f"\nExecution Summary:")
print(f" Total executions: {summary['total_executions']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
except Exception as e:
print(f"Policy execution failed: {e}")from policy_manager import DefaultPolicyManager
from policy_manager.policy.policy import Policy, ConfiguredRule, Target, AlertOptions
from policy_manager.configuration.policy_configuration import PolicyConfiguration
import json
import os
from typing import Dict, List
from datetime import datetime
class DynamicPolicyManager:
"""Advanced policy management with dynamic configuration updates"""
def __init__(self):
self.base_manager = DefaultPolicyManager.getInstance()
self.config = PolicyConfiguration()
self.policy_cache = {}
self.change_listeners = []
def register_change_listener(self, callback):
"""Register callback for policy configuration changes"""
self.change_listeners.append(callback)
def notify_change_listeners(self, policy_id: str, change_type: str):
"""Notify registered listeners of policy changes"""
for callback in self.change_listeners:
try:
callback(policy_id, change_type)
except Exception as e:
print(f"Error notifying change listener: {e}")
def create_policy_runtime(self, policy_config: Dict) -> Policy:
"""Create policy dynamically from configuration dictionary"""
# Create targets
targets = []
if 'targets' in policy_config:
for target_config in policy_config['targets']:
target = Target(
retrieve_url=target_config.get('retrieve_url'),
type=target_config.get('type')
)
targets.append(target)
# Create rules
rules = []
if 'rules' in policy_config:
for rule_config in policy_config['rules']:
rule = ConfiguredRule(
className=rule_config['className'],
configurations=rule_config.get('configurations'),
configuredTargets=[] # Would be populated based on targetConfigurations
)
rules.append(rule)
# Create policy
policy = Policy(
identifier=policy_config['identifier'],
description=policy_config.get('description'),
alertOptions=AlertOptions(policy_config.get('shouldSendAlert', 'ON_DETECTION')),
targets=targets,
rules=rules
)
return policy
def update_policy_configuration(self, policy_id: str, updates: Dict):
"""Update existing policy configuration"""
try:
# Get existing policy
existing_policy = self.base_manager.getPolicy(policy_id)
# Create updated configuration
updated_config = {
'identifier': existing_policy.identifier,
'description': existing_policy.description,
'shouldSendAlert': existing_policy.alertOptions.value,
'targets': [
{'retrieve_url': t.retrieve_url, 'type': t.type}
for t in existing_policy.targets
] if existing_policy.targets else [],
'rules': [
{
'className': r.className,
'configurations': r.configurations,
'targetConfigurations': {} # Simplified
}
for r in existing_policy.rules
]
}
# Apply updates
updated_config.update(updates)
# Create new policy
new_policy = self.create_policy_runtime(updated_config)
# Update in manager (simplified - would need proper integration)
self.policy_cache[policy_id] = new_policy
# Notify listeners
self.notify_change_listeners(policy_id, 'UPDATED')
print(f"Policy {policy_id} updated successfully")
return new_policy
except Exception as e:
print(f"Failed to update policy {policy_id}: {e}")
raise
def create_conditional_policy(self, base_policy_id: str, conditions: Dict) -> str:
"""Create conditional policy based on existing policy"""
base_policy = self.base_manager.getPolicy(base_policy_id)
# Generate conditional policy ID
condition_hash = hash(json.dumps(conditions, sort_keys=True))
conditional_id = f"{base_policy_id}_conditional_{abs(condition_hash)}"
# Create conditional rules
conditional_rules = []
for rule in base_policy.rules:
# Add conditions to rule configuration
enhanced_config = rule.configurations.copy() if rule.configurations else {}
enhanced_config['conditions'] = conditions
conditional_rule = ConfiguredRule(
className=rule.className,
configurations=enhanced_config,
configuredTargets=rule.configuredTargets
)
conditional_rules.append(conditional_rule)
# Create conditional policy
conditional_policy = Policy(
identifier=conditional_id,
description=f"Conditional variant of {base_policy.description}",
alertOptions=base_policy.alertOptions,
targets=base_policy.targets,
rules=conditional_rules
)
# Store in cache
self.policy_cache[conditional_id] = conditional_policy
# Notify listeners
self.notify_change_listeners(conditional_id, 'CREATED')
return conditional_id
def get_policy_with_fallback(self, policy_id: str) -> Policy:
"""Get policy with cache fallback"""
# Try cache first
if policy_id in self.policy_cache:
return self.policy_cache[policy_id]
# Fallback to base manager
try:
return self.base_manager.getPolicy(policy_id)
except Exception:
# Create default policy if not found
return self._create_default_policy(policy_id)
def _create_default_policy(self, policy_id: str) -> Policy:
"""Create default policy for unknown identifiers"""
default_policy = Policy(
identifier=policy_id,
description=f"Default policy for {policy_id}",
alertOptions=AlertOptions.NEVER,
targets=[],
rules=[]
)
self.policy_cache[policy_id] = default_policy
return default_policy
def export_policies_to_json(self, policy_ids: List[str] = None) -> str:
"""Export policies to JSON configuration"""
policies_to_export = policy_ids or list(self.base_manager.policies.keys())
export_data = []
for policy_id in policies_to_export:
try:
policy = self.get_policy_with_fallback(policy_id)
policy_data = {
'identifier': policy.identifier,
'description': policy.description,
'shouldSendAlert': policy.alertOptions.value,
'targets': [
{
'retrieve_url': target.retrieve_url,
'type': target.type
}
for target in (policy.targets or [])
],
'rules': [
{
'className': rule.className,
'configurations': rule.configurations or {},
'targetConfigurations': {} # Simplified
}
for rule in policy.rules
]
}
export_data.append(policy_data)
except Exception as e:
print(f"Error exporting policy {policy_id}: {e}")
return json.dumps(export_data, indent=2)
# Usage example
def policy_management_demo():
"""Demonstrate dynamic policy management capabilities"""
dynamic_manager = DynamicPolicyManager()
# Register change listener
def policy_change_handler(policy_id: str, change_type: str):
print(f"📋 Policy Change: {policy_id} - {change_type}")
dynamic_manager.register_change_listener(policy_change_handler)
# Create new policy at runtime
new_policy_config = {
'identifier': 'dynamic_security_policy',
'description': 'Runtime created security policy',
'shouldSendAlert': 'ALWAYS',
'targets': [
{'retrieve_url': 'https://api.security.com/scan', 'type': 'security_endpoint'}
],
'rules': [
{
'className': 'com.example.rules.ThreatDetectionRule',
'configurations': {
'sensitivity': 'high',
'scan_types': ['malware', 'intrusion', 'anomaly']
}
}
]
}
new_policy = dynamic_manager.create_policy_runtime(new_policy_config)
print(f"Created policy: {new_policy.identifier}")
# Update existing policy
updates = {
'description': 'Updated security policy with enhanced monitoring',
'rules': [
{
'className': 'com.example.rules.EnhancedThreatDetectionRule',
'configurations': {
'sensitivity': 'maximum',
'scan_types': ['malware', 'intrusion', 'anomaly', 'behavioral'],
'real_time_monitoring': True
}
}
]
}
try:
updated_policy = dynamic_manager.update_policy_configuration('dynamic_security_policy', updates)
print(f"Updated policy: {updated_policy.identifier}")
except Exception as e:
print(f"Update failed: {e}")
# Create conditional policy
conditions = {
'environment': 'production',
'data_classification': 'sensitive',
'time_window': '09:00-17:00'
}
conditional_id = dynamic_manager.create_conditional_policy('dynamic_security_policy', conditions)
print(f"Created conditional policy: {conditional_id}")
# Export policies
exported_config = dynamic_manager.export_policies_to_json(['dynamic_security_policy', conditional_id])
print("Exported policy configuration:")
print(exported_config[:200] + "..." if len(exported_config) > 200 else exported_config)
# Run the demonstration
policy_management_demo()Install with Tessl CLI
npx tessl i tessl/pypi-aissemble-foundation-core-python