tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.

Metadata Management

A comprehensive metadata management system for tracking data lineage, ML workflows, and audit trails, with support for multiple storage backends. The framework defines standardized interfaces for capturing, storing, and querying metadata across distributed data processing pipelines, and ships with Kafka-streaming and logging-based implementations.

Capabilities

Metadata API Interface

Abstract base interface defining standardized metadata operations for creating and retrieving metadata across different storage backends and implementation strategies.

from abc import ABC, abstractmethod
from typing import Any, Dict, List

from aissemble_core_metadata.metadata_model import MetadataModel

class MetadataAPI(ABC):
    """
    API for a metadata service.
    """
    
    @abstractmethod
    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Method to create metadata.
        
        Parameters:
        - metadata: MetadataModel - Metadata object to create
        """
        ...
    
    @abstractmethod
    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]:
        """
        Method to get metadata from search criteria.
        
        Parameters:
        - search_params: Dict[str, Any] - Search parameters for metadata query
        
        Returns:
        List[MetadataModel] - List of matching metadata objects
        """
        ...
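
Any storage backend can participate by implementing these two methods. As a minimal sketch, assuming the interface is importable from aissemble_core_metadata.metadata_api (following the module naming used elsewhere in these docs), a hypothetical in-memory backend for unit tests might look like this:

from typing import Any, Dict, List

from aissemble_core_metadata.metadata_api import MetadataAPI
from aissemble_core_metadata.metadata_model import MetadataModel

class InMemoryMetadataAPIService(MetadataAPI):
    """Hypothetical in-memory backend, handy for unit tests."""

    def __init__(self) -> None:
        self._records: List[MetadataModel] = []

    def create_metadata(self, metadata: MetadataModel) -> None:
        self._records.append(metadata)

    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]:
        # Return records whose attributes equal every supplied search parameter
        return [
            record
            for record in self._records
            if all(
                getattr(record, key, None) == value
                for key, value in search_params.items()
            )
        ]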

Metadata Data Model

Standardized data model for capturing comprehensive metadata information including resource identifiers, subjects, actions, timestamps, and extensible key-value pairs for additional context.

from datetime import datetime
from typing import Dict
from uuid import uuid4

from pydantic import BaseModel

class MetadataModel(BaseModel):
    """
    Class that represents a common metadata model.
    
    Attributes:
    - resource: str - The identifier of the data (default: random UUID)
    - subject: str - The thing acting on the data (default: empty string)
    - action: str - The action being taken (default: empty string)
    - timestamp: float - Time when action occurred as Unix timestamp (default: current timestamp)
    - additionalValues: Dict[str, str] - Additional key-value pairs (default: empty dict)
    """
    # Note: the resource and timestamp defaults are evaluated once, when the
    # class is defined, so instances created without explicit values share them;
    # pass values explicitly when a per-event UUID or timestamp is required.
    resource: str = uuid4().hex
    subject: str = ""
    action: str = ""
    timestamp: float = datetime.now().timestamp()
    additionalValues: Dict[str, str] = dict()
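
Because the resource and timestamp defaults are bound once at class definition, code that needs a fresh identifier or a per-event timestamp should pass them explicitly, as the usage examples below do. A minimal sketch:

from datetime import datetime
from uuid import uuid4

from aissemble_core_metadata.metadata_model import MetadataModel

# Explicit values guarantee a unique resource id and an event-time timestamp
event = MetadataModel(
    resource=uuid4().hex,
    subject="ingest-job",
    action="DATA_INGESTED",
    timestamp=datetime.now().timestamp(),
    additionalValues={"records": "1024"},
)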

Hive Metadata Service

Kafka-backed metadata service for high-throughput metadata streaming, integrated with MessagingConfig for distributed data processing workflows. Only metadata creation is supported; get_metadata is a stub that returns an empty list.

class HiveMetadataAPIService(MetadataAPI):
    """
    Metadata service that publishes metadata records to a Kafka topic.
    
    Class Attributes:
    - logger - LogManager instance for HiveMetadataAPIService
    """
    
    def __init__(self) -> None:
        """Initialize with MessagingConfig and KafkaProducer"""
        ...
    
    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Creates metadata by sending to Kafka.
        
        Parameters:
        - metadata: MetadataModel - Metadata to send to Kafka topic
        """
        ...
    
    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]:
        """
        Returns empty list (not implemented).
        
        Parameters:
        - search_params: Dict[str, Any] - Search parameters (unused)
        
        Returns:
        List[MetadataModel] - Empty list (retrieval not implemented)
        """
        ...

Logging Metadata Service

Development and testing metadata service implementation using logging output for metadata tracking, designed for non-production environments and debugging workflows.

class LoggingMetadataAPIService(MetadataAPI):
    """
    Class to handle basic logging of metadata. 
    Intended for testing purposes and not suited for production.
    
    Class Attributes:
    - logger - LogManager instance for LoggingMetadataAPIService
    """
    
    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Logs metadata information.
        
        Parameters:
        - metadata: MetadataModel - Metadata to log
        """
        ...
    
    def get_metadata(self, search_params: Dict[str, Any]) -> List[MetadataModel]:
        """
        Returns empty list (not implemented).
        
        Parameters:
        - search_params: Dict[str, Any] - Search parameters (unused)
        
        Returns:
        List[MetadataModel] - Empty list (retrieval not implemented)
        """
        ...
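
For local development, the logging implementation is a drop-in stand-in for the Kafka-backed service:

from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

service = LoggingMetadataAPIService()

# The metadata record is written to the log output instead of a Kafka topic
service.create_metadata(
    MetadataModel(resource="local-test", subject="dev", action="SMOKE_TEST")
)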

Usage Examples

Basic Metadata Tracking

from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime

# Create metadata for data processing event
metadata = MetadataModel(
    resource="customer-transactions-2024-09",
    subject="etl-pipeline-001",
    action="DATA_PROCESSED",
    timestamp=datetime.now().timestamp(),
    additionalValues={
        "records_processed": "1500000",
        "processing_duration": "45_minutes",
        "data_quality_score": "0.95"
    }
)

# Initialize Hive metadata service (uses Kafka)
metadata_service = HiveMetadataAPIService()

# Send metadata to Kafka topic
metadata_service.create_metadata(metadata)
print(f"Metadata sent for resource: {metadata.resource}")

ML Training Metadata Tracking

from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import uuid

class MLTrainingMetadataTracker:
    """Track comprehensive metadata for ML training workflows"""
    
    def __init__(self):
        self.metadata_service = HiveMetadataAPIService()
        self.training_session_id = str(uuid.uuid4())
    
    def track_training_start(self, model_name: str, dataset_path: str):
        """Track training session initiation"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_STARTED",
            additionalValues={
                "model_name": model_name,
                "dataset_path": dataset_path,
                "training_environment": "production",
                "framework": "scikit-learn"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training started for {model_name}")
    
    def track_data_loading(self, records_count: int, features_count: int):
        """Track data loading phase"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="data-loader",
            action="DATA_LOADED",
            additionalValues={
                "records_count": str(records_count),
                "features_count": str(features_count),
                "data_validation": "passed",
                "missing_values_handled": "true"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_feature_engineering(self, original_features: list, selected_features: list):
        """Track feature engineering process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="feature-engineer",
            action="FEATURES_ENGINEERED",
            additionalValues={
                "original_feature_count": str(len(original_features)),
                "selected_feature_count": str(len(selected_features)),
                "feature_selection_method": "recursive_feature_elimination",
                "dimensionality_reduction": "applied"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_model_training(self, algorithm: str, hyperparameters: dict):
        """Track model training process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-trainer",
            action="MODEL_TRAINED",
            additionalValues={
                "algorithm": algorithm,
                **{f"param_{k}": str(v) for k, v in hyperparameters.items()},
                "cross_validation_folds": "5",
                "training_time": "25_minutes"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_model_evaluation(self, metrics: dict):
        """Track model evaluation results"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-evaluator",
            action="MODEL_EVALUATED",
            additionalValues={
                **{f"metric_{k}": str(v) for k, v in metrics.items()},
                "evaluation_dataset": "holdout_test_set",
                "evaluation_method": "stratified_split"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_training_completion(self, model_path: str, status: str):
        """Track training completion"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_COMPLETED",
            additionalValues={
                "final_status": status,
                "model_artifact_path": model_path,
                "total_training_time": "2_hours_15_minutes",
                "model_size_mb": "12.5"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training completed with status: {status}")

# Usage example
tracker = MLTrainingMetadataTracker()

# Track complete ML training workflow
tracker.track_training_start("fraud_detection_v2", "s3://data/fraud_training.parquet")
tracker.track_data_loading(records_count=1000000, features_count=47)
tracker.track_feature_engineering(
    original_features=["age", "income", "transaction_amount", "merchant", "location"],
    selected_features=["age", "income", "transaction_amount"]
)
tracker.track_model_training(
    algorithm="RandomForestClassifier",
    hyperparameters={"n_estimators": 100, "max_depth": 10, "min_samples_split": 2}
)
tracker.track_model_evaluation({
    "accuracy": 0.94,
    "precision": 0.91,
    "recall": 0.96,
    "f1_score": 0.93
})
tracker.track_training_completion("s3://models/fraud_detection_v2.pkl", "SUCCESS")

Data Pipeline Lineage Tracking

from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class DataLineageTracker:
    """Track data lineage across processing pipeline stages"""
    
    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.metadata_service = HiveMetadataAPIService()
        self.stage_counter = 0
    
    def track_data_ingestion(self, source_path: str, records_ingested: int):
        """Track data ingestion from external source"""
        self.stage_counter += 1
        
        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-ingestion",
            action="DATA_INGESTED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "source_path": source_path,
                "records_ingested": str(records_ingested),
                "ingestion_method": "spark_batch",
                "data_format": "parquet"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_data_transformation(self, transformation_type: str, input_records: int, output_records: int):
        """Track data transformation operations"""
        self.stage_counter += 1
        
        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-transformer",
            action="DATA_TRANSFORMED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "transformation_type": transformation_type,
                "input_records": str(input_records),
                "output_records": str(output_records),
                "transformation_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_data_quality_check(self, quality_rules: dict, quality_score: float):
        """Track data quality validation"""
        self.stage_counter += 1
        
        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="quality-checker",
            action="QUALITY_VALIDATED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "quality_score": str(quality_score),
                "quality_rules": json.dumps(quality_rules),
                "validation_passed": str(quality_score >= 0.8)
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_data_output(self, output_path: str, records_written: int):
        """Track data output to destination"""
        self.stage_counter += 1
        
        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-writer",
            action="DATA_WRITTEN",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "output_path": output_path,
                "records_written": str(records_written),
                "write_format": "delta",
                "partition_strategy": "date_based"
            }
        )
        self.metadata_service.create_metadata(metadata)
    
    def track_pipeline_completion(self, status: str, total_runtime: str):
        """Track overall pipeline completion"""
        metadata = MetadataModel(
            resource=self.pipeline_id,
            subject="pipeline-orchestrator",
            action="PIPELINE_COMPLETED",
            additionalValues={
                "final_status": status,
                "total_stages": str(self.stage_counter),
                "total_runtime": total_runtime,
                "completion_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)

# Usage example
lineage_tracker = DataLineageTracker("customer_analytics_pipeline_20240905")

# Track complete data pipeline
lineage_tracker.track_data_ingestion("s3://raw-data/customers.parquet", 2500000)
lineage_tracker.track_data_transformation("deduplication", 2500000, 2450000)
lineage_tracker.track_data_transformation("enrichment", 2450000, 2450000)
lineage_tracker.track_data_quality_check(
    quality_rules={"completeness": 0.95, "validity": 0.98, "consistency": 0.92},
    quality_score=0.95
)
lineage_tracker.track_data_output("s3://processed-data/customer_analytics.delta", 2450000)
lineage_tracker.track_pipeline_completion("SUCCESS", "1_hour_23_minutes")

Development vs Production Metadata Services

from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
import os

class MetadataServiceFactory:
    """Factory for creating appropriate metadata service based on environment"""
    
    @staticmethod
    def create_metadata_service():
        """Create metadata service based on environment"""
        environment = os.getenv("ENVIRONMENT", "development")
        
        if environment == "production":
            print("Using Hive metadata service (Kafka)")
            return HiveMetadataAPIService()
        else:
            print("Using logging metadata service (Console)")
            return LoggingMetadataAPIService()

class UniversalMetadataTracker:
    """Metadata tracker that works across development and production"""
    
    def __init__(self):
        self.metadata_service = MetadataServiceFactory.create_metadata_service()
    
    def track_event(self, resource: str, subject: str, action: str, **additional_values):
        """Track any type of event with flexible additional values"""
        metadata = MetadataModel(
            resource=resource,
            subject=subject,
            action=action,
            additionalValues={str(k): str(v) for k, v in additional_values.items()}
        )
        
        self.metadata_service.create_metadata(metadata)
        return metadata
    
    def track_batch_events(self, events: list):
        """Track multiple events in sequence"""
        tracked_events = []
        
        for event in events:
            metadata = self.track_event(**event)
            tracked_events.append(metadata)
        
        return tracked_events

# Usage example
tracker = UniversalMetadataTracker()

# Single event tracking
tracker.track_event(
    resource="user_behavior_analysis",
    subject="analytics_engine",
    action="ANALYSIS_COMPLETED",
    user_count=150000,
    analysis_type="behavioral_segmentation",
    execution_time="45_minutes"
)

# Batch event tracking
batch_events = [
    {
        "resource": "daily_report_001",
        "subject": "report_generator",
        "action": "REPORT_STARTED",
        "report_type": "sales_summary"
    },
    {
        "resource": "daily_report_001", 
        "subject": "data_aggregator",
        "action": "DATA_AGGREGATED",
        "records_processed": 500000
    },
    {
        "resource": "daily_report_001",
        "subject": "report_generator", 
        "action": "REPORT_COMPLETED",
        "output_path": "s3://reports/daily_sales_20240905.pdf"
    }
]

tracked = tracker.track_batch_events(batch_events)
print(f"Tracked {len(tracked)} events")

Custom Metadata Extensions

from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class EnhancedMetadataModel(MetadataModel):
    """Extended metadata model with domain-specific fields"""
        
    def add_ml_context(self, model_version: str, experiment_id: str, dataset_version: str):
        """Add ML-specific context"""
        self.additionalValues.update({
            "ml_model_version": model_version,
            "ml_experiment_id": experiment_id,
            "ml_dataset_version": dataset_version,
            "ml_context_added": str(datetime.now().timestamp())
        })
        return self
    
    def add_data_context(self, schema_version: str, partition_info: dict, quality_metrics: dict):
        """Add data processing context"""
        self.additionalValues.update({
            "data_schema_version": schema_version,
            "data_partition_info": json.dumps(partition_info),
            "data_quality_metrics": json.dumps(quality_metrics),
            "data_context_added": str(datetime.now().timestamp())
        })
        return self
    
    def add_infrastructure_context(self, cluster_id: str, node_count: int, resource_usage: dict):
        """Add infrastructure context"""
        self.additionalValues.update({
            "infra_cluster_id": cluster_id,
            "infra_node_count": str(node_count),
            "infra_resource_usage": json.dumps(resource_usage),
            "infra_context_added": str(datetime.now().timestamp())
        })
        return self

class DomainSpecificMetadataService:
    """Service for domain-specific metadata operations"""
    
    def __init__(self):
        self.base_service = HiveMetadataAPIService()
    
    def track_ml_inference(self, model_id: str, request_count: int, latency_ms: float):
        """Track ML inference events"""
        metadata = EnhancedMetadataModel(
            resource=f"inference_{model_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            subject="ml_inference_service",
            action="INFERENCE_EXECUTED"
        ).add_ml_context(
            model_version="v2.1.0",
            experiment_id="exp_001",
            dataset_version="v1.5"
        ).add_infrastructure_context(
            cluster_id="ml-cluster-prod",
            node_count=5,
            resource_usage={"cpu_percent": 75, "memory_gb": 32}
        )
        
        metadata.additionalValues.update({
            "request_count": str(request_count),
            "average_latency_ms": str(latency_ms),
            "throughput_rps": str(request_count / (latency_ms / 1000))
        })
        
        self.base_service.create_metadata(metadata)
        return metadata
    
    def track_data_pipeline_stage(self, pipeline_id: str, stage_name: str, 
                                 input_size: int, output_size: int):
        """Track data pipeline stage execution"""
        metadata = EnhancedMetadataModel(
            resource=f"{pipeline_id}_{stage_name}",
            subject="data_pipeline",
            action="STAGE_EXECUTED"
        ).add_data_context(
            schema_version="v1.0",
            partition_info={"partition_by": "date", "partition_count": 30},
            quality_metrics={"completeness": 0.98, "accuracy": 0.95}
        )
        
        metadata.additionalValues.update({
            "stage_name": stage_name,
            "input_record_count": str(input_size),
            "output_record_count": str(output_size),
            "data_reduction_ratio": str(output_size / input_size if input_size > 0 else 0)
        })
        
        self.base_service.create_metadata(metadata)
        return metadata

# Usage example
domain_service = DomainSpecificMetadataService()

# Track ML inference
inference_metadata = domain_service.track_ml_inference(
    model_id="fraud_detection_model",
    request_count=1000,
    latency_ms=45.0
)

# Track data pipeline stage
pipeline_metadata = domain_service.track_data_pipeline_stage(
    pipeline_id="customer_segmentation",
    stage_name="feature_engineering",
    input_size=1000000,
    output_size=950000
)

print(f"Tracked inference: {inference_metadata.resource}")
print(f"Tracked pipeline stage: {pipeline_metadata.resource}")

Best Practices

Metadata Design

  • Use consistent resource naming conventions
  • Include comprehensive context in additionalValues
  • Track both successful and failed operations (see the sketch after this list)
  • Implement structured logging for debugging
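
As a sketch of the failure-tracking point (the try/except wrapper is illustrative, not library API), failed operations can reuse the same resource naming scheme as successes:

from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

service = LoggingMetadataAPIService()

def process_partition(partition_id: str) -> None:
    try:
        # ... actual processing work ...
        service.create_metadata(MetadataModel(
            resource=f"orders_{partition_id}",
            subject="etl-pipeline",
            action="PARTITION_PROCESSED",
        ))
    except Exception as error:
        # Failures share the resource name, so success and failure events correlate
        service.create_metadata(MetadataModel(
            resource=f"orders_{partition_id}",
            subject="etl-pipeline",
            action="PARTITION_FAILED",
            additionalValues={"error": str(error)},
        ))
        raise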

Service Selection

  • Use LoggingMetadataAPIService for development and testing
  • Use HiveMetadataAPIService for production environments
  • Consider custom implementations for specific requirements
  • Plan for metadata service migration strategies

Performance Considerations

  • Batch metadata operations when possible (one approach is sketched below)
  • Use asynchronous patterns for high-throughput scenarios
  • Monitor Kafka topic health and throughput
  • Implement metadata retention policies
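
One batching approach, sketched with a hypothetical BufferedMetadataWriter (the library's services send one record per create_metadata call; the Kafka producer also batches internally):

from typing import List

from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

class BufferedMetadataWriter:
    """Hypothetical buffer that flushes metadata records in batches."""

    def __init__(self, batch_size: int = 100) -> None:
        self._service = HiveMetadataAPIService()
        self._buffer: List[MetadataModel] = []
        self._batch_size = batch_size

    def write(self, metadata: MetadataModel) -> None:
        self._buffer.append(metadata)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Amortizes per-call overhead at the call site; each record is still
        # sent through create_metadata
        for record in self._buffer:
            self._service.create_metadata(record)
        self._buffer.clear()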

Governance and Compliance

  • Establish metadata standards across teams
  • Run regular metadata quality audits
  • Implement data lineage tracking
  • Maintain metadata schema versioning (see the sketch below)
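
A lightweight way to version metadata (an illustrative team convention, not a library feature) is to stamp every record with a schema version key:

from aissemble_core_metadata.metadata_model import MetadataModel

METADATA_SCHEMA_VERSION = "1.2.0"  # hypothetical team-wide constant

def with_schema_version(metadata: MetadataModel) -> MetadataModel:
    # Stamp each record so downstream consumers can evolve their parsers safely
    metadata.additionalValues["metadata_schema_version"] = METADATA_SCHEMA_VERSION
    return metadata

record = with_schema_version(
    MetadataModel(
        resource="daily_report_001",
        subject="report_generator",
        action="REPORT_STARTED",
    )
)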

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python
