tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/aissemble-foundation-core-python@1.12.x

To install, run

npx @tessl/cli install tessl/pypi-aissemble-foundation-core-python@1.12.0


aiSSEMBLE Foundation Core Python

A comprehensive Python foundation package for the aiSSEMBLE platform that provides essential building blocks for machine learning, data engineering, and enterprise-grade applications. This package offers unified APIs for configuration management, cloud storage, metadata tracking, authentication, ML inference, and policy-based governance across distributed systems.

Package Information

  • Package Name: aissemble-foundation-core-python
  • Language: Python
  • Installation: Available through aiSSEMBLE distribution
  • Dependencies: Pydantic, Krausening, LibCloud, Kafka-Python, PyJWT, PyJKS, AIOHttp

Core Imports

# Bill of Materials for ML training
from aissemble_core_bom.training_bom import TrainingBOM

# Configuration classes for Spark and databases  
from aissemble_core_config import SparkRDBMSConfig, SparkElasticsearchConfig, SparkNeo4jConfig, MessagingConfig

# Cloud storage abstractions
from aissemble_core_filestore.file_store_factory import FileStoreFactory

# Metadata management API
from aissemble_core_metadata.metadata_api import MetadataAPI
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService

# Authentication and JWT utilities
from aissembleauth.auth_config import AuthConfig
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

# ML inference client framework
from inference.inference_client import InferenceClient
from inference.inference_config import InferenceConfig
from inference.inference_request import InferenceRequest, InferenceRequestBatch
from inference.inference_result import InferenceResult, InferenceResultBatch
from inference.rest_inference_client import RestInferenceClient

# Policy-based configuration management
from policy_manager import AbstractPolicyManager, DefaultPolicyManager

Basic Usage

# Configure Spark for database connections
from aissemble_core_config import SparkRDBMSConfig

# Initialize database configuration
db_config = SparkRDBMSConfig()
jdbc_url = db_config.jdbc_url()  # Gets JDBC URL from properties
driver = db_config.jdbc_driver()  # Gets driver class name

# Create cloud file store
from aissemble_core_filestore.file_store_factory import FileStoreFactory

file_store = FileStoreFactory.create_file_store("my-s3-store")
# Now use libcloud StorageDriver interface for file operations

# Track metadata for ML workflows
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService

metadata = MetadataModel(
    resource="training-dataset-v1.0", 
    subject="ml-pipeline",
    action="TRAINING_STARTED"
)

metadata_service = HiveMetadataAPIService()
metadata_service.create_metadata(metadata)

# Authenticate and validate JWT tokens
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException

jwt_util = JsonWebTokenUtil()
try:
    parsed_token = jwt_util.parse_token(token_string)
    jwt_util.validate_token(token_string)  # Raises exception if invalid
    print("Token is valid")
except AissembleSecurityException as e:
    print(f"Authentication failed: {e}")

# Perform ML inference
from inference.rest_inference_client import RestInferenceClient
from inference.inference_request import InferenceRequest

client = RestInferenceClient()
request = InferenceRequest(
    source_ip_address="192.168.1.100",
    kind="security-scan",
    category="network-traffic"
)

result = await client.infer(request)  # awaited inside an async function
if result.threat_detected:
    print(f"Threat detected with score: {result.score}")

Architecture

The aiSSEMBLE Foundation Core provides 7 major functional areas that work together to support enterprise ML and data engineering workflows:

Configuration Layer

Unified configuration management for distributed systems including Spark clusters, databases (PostgreSQL, Elasticsearch, Neo4j), and messaging systems (Kafka). Supports property-based configuration with environment overrides.

Storage Abstraction

Cloud-agnostic file storage through LibCloud integration, supporting local filesystem, AWS S3, and other cloud providers with a consistent API interface.

Metadata Management

Comprehensive metadata tracking for ML workflows, data lineage, and audit trails. Supports multiple backends including Kafka-based streaming and logging-based implementations.

Security Framework

Enterprise-grade authentication and authorization using JWT tokens, Java keystore integration, and policy-based access control with pluggable security providers.

ML Inference Platform

Standardized client framework for ML model inference supporting both REST and gRPC protocols, with batch processing capabilities for high-throughput scenarios.

Policy Engine

Policy-based configuration and governance system supporting JSON-defined rules, targets, and alerts with extensible rule evaluation framework.

Training Metadata

Bill of Materials tracking for ML training workflows including dataset information, feature engineering details, model specifications, and MLflow integration.

Capabilities

Training BOM Management

Complete lifecycle tracking for machine learning training processes with structured metadata capture including dataset origins, feature engineering details, model architecture, and MLflow integration for experiment tracking.

class TrainingBOM(BaseModel):
    class DatasetInfo(BaseModel):
        origin: str
        size: int = 0

    class FeatureInfo(BaseModel):
        original_features: List[str] = []
        selected_features: List[str] = []

    class ModelInfo(BaseModel):
        type: str
        architecture: str

    id: str
    start_time: str
    end_time: str
    dataset_info: DatasetInfo
    feature_info: FeatureInfo
    model_info: ModelInfo
    mlflow_params: Dict
    mlflow_metrics: Dict
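The sketch below shows the shape of the record a TrainingBOM captures, mirroring the fields above. A plain dict is used so the example runs without the aiSSEMBLE package installed; the real TrainingBOM is a Pydantic model with these same field names, and all values here (paths, metrics, parameters) are invented for illustration.

```python
import json

# Training BOM record mirroring the TrainingBOM fields above; all
# concrete values are illustrative, not taken from the package.
bom = {
    "id": "run-2024-001",
    "start_time": "2024-01-15T09:00:00Z",
    "end_time": "2024-01-15T11:30:00Z",
    "dataset_info": {"origin": "s3://datasets/churn/v1", "size": 50000},
    "feature_info": {
        "original_features": ["age", "tenure", "plan", "usage_gb"],
        "selected_features": ["tenure", "usage_gb"],
    },
    "model_info": {"type": "classifier", "architecture": "xgboost"},
    "mlflow_params": {"max_depth": "6"},
    "mlflow_metrics": {"auc": "0.91"},
}

# Serialize for storage or transport alongside the trained model artifact.
bom_json = json.dumps(bom, indent=2)
```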

Training BOM

Spark and Database Configuration

Comprehensive configuration management for Spark clusters and database connections including PostgreSQL, Elasticsearch, and Neo4j with property-based settings and environment variable overrides.

class SparkRDBMSConfig:
    def __init__(self) -> None: ...
    def jdbc_url(self) -> str: ...
    def jdbc_driver(self) -> str: ...
    def user(self) -> str: ...
    def password(self) -> str: ...

class SparkElasticsearchConfig:
    def __init__(self) -> None: ...
    def spark_es_nodes(self) -> str: ...
    def spark_es_port(self) -> str: ...
    def get_es_configs(self) -> dict: ...

class SparkNeo4jConfig:
    def __init__(self) -> None: ...
    def url(self) -> str: ...
    def get_spark_options(self) -> Dict[str, str]: ...

class MessagingConfig:
    def __init__(self) -> None: ...
    def server(self) -> str: ...
    def metadata_topic(self) -> str: ...
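These configuration classes read their values from Krausening property files, with environment-specific files layered over base values. A sketch of what such a file might contain for SparkRDBMSConfig — the file name and key names here are illustrative assumptions, not taken from the package:

```properties
# spark-rdbms.properties (hypothetical file name), loaded via Krausening;
# an environment-specific copy of this file overrides these base values
jdbc.url=jdbc:postgresql://db-host:5432/analytics
jdbc.driver=org.postgresql.Driver
user=spark_etl
password=changeme
```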

Configuration Management

Cloud File Storage

Cloud-agnostic file storage abstraction using LibCloud to provide consistent API across local filesystem, AWS S3, and other cloud storage providers with automatic provider detection and configuration.

class FileStoreFactory:
    @staticmethod
    def create_file_store(name: str) -> StorageDriver: ...
    
    @staticmethod
    def create_local_file_store(name: str, filtered, cls) -> StorageDriver: ...
    
    @staticmethod
    def create_s3_file_store(name: str, filtered, provider) -> StorageDriver: ...

File Storage

Metadata API and Services

Comprehensive metadata management system for tracking data lineage, ML workflows, and audit trails with support for multiple storage backends including Kafka streaming and logging-based implementations.

class MetadataAPI(ABC):
    @abstractmethod
    def create_metadata(self, metadata: MetadataModel) -> None: ...
    
    @abstractmethod
    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]: ...

class MetadataModel(BaseModel):
    resource: str = uuid4().hex
    subject: str = ""
    action: str = ""
    timestamp: float = datetime.now().timestamp()
    additionalValues: Dict[str, str] = dict()

class HiveMetadataAPIService(MetadataAPI):
    def __init__(self) -> None: ...
    def create_metadata(self, metadata: MetadataModel) -> None: ...
    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]: ...

Metadata Management

Authentication and JWT Handling

Enterprise-grade security framework with JWT token management, Java keystore integration, and configurable authentication providers supporting both token validation and generation capabilities.

class AuthConfig:
    def __init__(self) -> None: ...
    def public_key_path(self) -> str: ...
    def jks_path(self) -> str: ...
    def jks_password(self) -> str: ...
    def jks_key_alias(self) -> str: ...
    def pdp_host_url(self) -> str: ...
    def is_authorization_enabled(self) -> bool: ...

class JsonWebTokenUtil:
    def __init__(self) -> None: ...
    def parse_token(self, token: str): ...
    def create_token(self): ...
    def validate_token(self, token: str) -> None: ...
    def get_sign_key(self) -> str: ...

class AissembleSecurityException(Exception): ...

Authentication

ML Inference Client

Standardized client framework for machine learning model inference supporting both individual and batch processing with REST and gRPC protocol support for high-performance model serving.

class InferenceClient(ABC):
    def __init__(self) -> None: ...
    
    @abstractmethod
    def infer(self, inference_request: InferenceRequest) -> InferenceResult: ...
    
    @abstractmethod
    def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]: ...

class InferenceConfig:
    def __init__(self) -> None: ...
    def rest_service_url(self) -> str: ...
    def rest_service_port(self) -> str: ...
    def grpc_service_url(self) -> str: ...
    def grpc_service_port(self) -> str: ...

class InferenceRequest:
    def __init__(self, source_ip_address: str = "", created: int = 0, kind: str = "", category: str = "", outcome: str = "") -> None: ...

class InferenceRequestBatch:
    def __init__(self, row_id_key: str, data: list[InferenceRequest]) -> None: ...
    
class InferenceResult:
    def __init__(self, threat_detected: bool = False, score: int = 0) -> None: ...

class InferenceResultBatch:
    def __init__(self, row_id_key: str, result: InferenceResult) -> None: ...

class RestInferenceClient(InferenceClient):
    async def infer(self, inference_request: InferenceRequest) -> InferenceResult: ...
    async def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]: ...
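Since RestInferenceClient's methods are coroutines, callers drive them from an async context. The batch flow can be sketched with a stub client exposing the same async interface — the stub's scoring rule and the request/result dicts are invented stand-ins so the example runs without a live model-serving endpoint:

```python
import asyncio

# Stub with the same async interface as RestInferenceClient, so the batch
# flow can be shown without a live REST endpoint; scoring logic is invented.
class StubInferenceClient:
    async def infer(self, request: dict) -> dict:
        suspicious = request.get("category") == "network-traffic"
        return {"threat_detected": suspicious, "score": 87 if suspicious else 0}

    async def infer_batch(self, requests: list) -> list:
        # Fan individual requests out concurrently.
        return await asyncio.gather(*(self.infer(r) for r in requests))

async def main():
    client = StubInferenceClient()  # in production: RestInferenceClient()
    requests = [
        {"source_ip_address": "192.168.1.100", "kind": "security-scan",
         "category": "network-traffic"},
        {"source_ip_address": "10.0.0.7", "kind": "security-scan",
         "category": "application-log"},
    ]
    return await client.infer_batch(requests)

results = asyncio.run(main())
```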

ML Inference

Policy Management

Policy-based configuration and governance system with JSON-defined rules, configurable targets, and extensible rule evaluation framework supporting complex business logic and compliance requirements.

class AbstractPolicyManager(ABC):
    def __init__(self) -> None: ...
    def getPolicy(self, policyIdentifier: str) -> Policy: ...
    def loadPolicyConfigurations(self, policiesLocation: str) -> None: ...
    
    @property
    def policies(self) -> Dict[str, Policy]: ...

class DefaultPolicyManager(AbstractPolicyManager):
    @staticmethod
    def getInstance() -> DefaultPolicyManager: ...

class AlertOptions:
    ALWAYS: str = "ALWAYS"
    ON_DETECTION: str = "ON_DETECTION"
    NEVER: str = "NEVER"

class Target(BaseModel):
    retrieve_url: Optional[str] = None
    type: Optional[str] = None

class ConfiguredTarget(Target):
    target_configurations: Dict[str, Any]

class ConfiguredRule(BaseModel):
    className: str
    configurations: Optional[Dict[str, Any]] = None
    configuredTargets: Optional[List[ConfiguredTarget]] = []

class Policy(BaseModel):
    alertOptions: AlertOptions = AlertOptions.ON_DETECTION
    identifier: str
    description: Optional[str] = None
    targets: Optional[List[Target]] = []
    rules: List[ConfiguredRule] = []
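A policy document written to match the Policy model fields above might look like the following sketch; the rule class name, configuration keys, and identifier are hypothetical examples, not values shipped with the package.

```python
import json

# Policy document matching the Policy model fields above; the rule
# className and configuration keys are hypothetical examples.
policy = {
    "identifier": "data-quality-check",
    "description": "Flag ingest batches with too many null values",
    "alertOptions": "ON_DETECTION",
    "rules": [
        {
            "className": "com.example.rules.NullRatioRule",  # hypothetical
            "configurations": {"maxNullRatio": 0.05},
        }
    ],
}

# Policies are stored as JSON and would typically be loaded with
# DefaultPolicyManager.getInstance().loadPolicyConfigurations(<policies dir>).
policy_json = json.dumps([policy], indent=2)
```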

Policy Management