
tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.


docs/inference.md

ML Inference Client

Standardized client framework for machine learning model inference, supporting both individual and batch processing over REST and gRPC. The framework pairs request/response data models with configurable service endpoints, so protocol implementations can be swapped without changing calling code.

Capabilities

Inference Client Interface

Abstract base interface defining standardized inference operations for single predictions and batch processing, with pluggable implementations for different service protocols.

class InferenceClient(ABC):
    """
    Interface for inference client.
    
    Attributes:
    - _config: InferenceConfig - Inference configuration
    """
    
    def __init__(self) -> None:
        """Initialize with InferenceConfig"""
        ...
    
    @classmethod
    def __subclasshook__(cls, subclass):
        """Class method for subclass checking"""
        ...
    
    @abstractmethod
    def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """
        Abstract method to invoke inference.
        
        Parameters:
        - inference_request: InferenceRequest - Request containing inference data
        
        Returns:
        InferenceResult - Inference prediction result
        """
        ...
    
    @abstractmethod
    def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]:
        """
        Abstract method to invoke batch inference.
        
        Parameters:
        - inference_request_batch: InferenceRequestBatch - Batch of inference requests
        
        Returns:
        list[InferenceResultBatch] - List of batch inference results
        """
        ...

Inference Configuration

Configuration management for inference service endpoints, with configurable URLs and ports for both REST and gRPC protocols.

class InferenceConfig:
    """
    Configurations for inference.
    """
    
    def __init__(self) -> None:
        """Initialize with inference.properties"""
        ...
    
    def rest_service_url(self) -> str:
        """Returns URL of inference REST service (default: "http://localhost")"""
        ...
    
    def rest_service_port(self) -> str:
        """Returns port of inference REST service (default: "7080")"""
        ...
    
    def grpc_service_url(self) -> str:
        """Returns URL of inference gRPC service (default: "http://localhost")"""
        ...
    
    def grpc_service_port(self) -> str:
        """Returns port of inference gRPC service (default: "7081")"""
        ...
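As a quick illustration, the endpoint getters can be composed into full service addresses. The `build_endpoint` helper below is illustrative only, not part of the package; it assumes the getters return the defaults documented above.

```python
def build_endpoint(url: str, port: str, path: str = "/infer") -> str:
    """Join a service URL, port, and path into a full endpoint address."""
    return f"{url}:{port}{path}"

# Using the documented defaults:
rest_endpoint = build_endpoint("http://localhost", "7080")      # REST service
grpc_target = build_endpoint("http://localhost", "7081", "")    # gRPC service (no path)
```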

Inference Request Models

Data models capturing inference request information, including source metadata and request categorization, for both single and batch invocations.

class InferenceRequest:
    """
    Contains details necessary for inference to be invoked.
    
    Properties (with getters/setters):
    - source_ip_address: str - Source IP address
    - created: int - Creation timestamp  
    - kind: str - Request kind
    - category: str - Request category
    - outcome: str - Request outcome
    """
    
    def __init__(self, source_ip_address: str = "", created: int = 0, kind: str = "", category: str = "", outcome: str = "") -> None:
        """
        Constructor with default values.
        
        Parameters:
        - source_ip_address: str - Source IP address (default: "")
        - created: int - Creation timestamp (default: 0)
        - kind: str - Request kind (default: "")
        - category: str - Request category (default: "")
        - outcome: str - Request outcome (default: "")
        """
        ...

class InferenceRequestBatch:
    """
    Contains details necessary for inference to be invoked on a batch.
    
    Properties (with getters/setters):
    - row_id_key: str - Row ID key
    - data: list[InferenceRequest] - List of inference requests
    """
    
    def __init__(self, row_id_key: str, data: list[InferenceRequest]) -> None:
        """
        Constructor.
        
        Parameters:
        - row_id_key: str - Row identifier key
        - data: list[InferenceRequest] - List of inference requests
        """
        ...

Inference Result Models

Result models capturing inference predictions, including threat detection indicators and confidence scores, with row-level tracking for batch processing.

class InferenceResult:
    """
    Contains details about the results of an inference request.
    
    Properties (with getters/setters):
    - threat_detected: bool - Whether threat was detected
    - score: int - Inference score
    """
    
    def __init__(self, threat_detected: bool = False, score: int = 0) -> None:
        """
        Constructor with default values.
        
        Parameters:
        - threat_detected: bool - Whether threat was detected (default: False)
        - score: int - Inference score (default: 0)
        """
        ...

class InferenceResultBatch:
    """
    Represents a single result of a batch inference.
    
    Properties (with getters/setters):
    - row_id_key: str - Row ID key
    - result: InferenceResult - Inference result
    """
    
    def __init__(self, row_id_key: str, result: InferenceResult) -> None:
        """
        Constructor.
        
        Parameters:
        - row_id_key: str - Row identifier key
        - result: InferenceResult - Inference result for this row
        """
        ...

REST Inference Client

REST-based implementation of InferenceClient using asynchronous HTTP operations and JSON serialization.

class RestInferenceClient(InferenceClient):
    """
    REST-based implementation of InferenceClient.
    """
    
    async def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """
        Async method for single inference.
        
        Parameters:
        - inference_request: InferenceRequest - Request data for inference
        
        Returns:
        InferenceResult - Prediction result
        """
        ...
    
    async def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]:
        """
        Async method for batch inference.
        
        Parameters:
        - inference_request_batch: InferenceRequestBatch - Batch of requests
        
        Returns:
        list[InferenceResultBatch] - List of batch results
        """
        ...

Usage Examples

Basic Inference Operations

from inference.rest_inference_client import RestInferenceClient
from inference.inference_request import InferenceRequest
from inference.inference_result import InferenceResult
import asyncio
from datetime import datetime

async def basic_inference_example():
    """Demonstrate basic inference operations"""
    
    # Initialize REST inference client
    client = RestInferenceClient()
    
    # Create inference request
    request = InferenceRequest(
        source_ip_address="192.168.1.100",
        created=int(datetime.now().timestamp()),
        kind="security_scan",
        category="network_traffic",
        outcome=""  # Will be populated by inference
    )
    
    try:
        # Perform single inference
        result = await client.infer(request)
        
        print(f"Inference completed:")
        print(f"  Threat detected: {result.threat_detected}")
        print(f"  Confidence score: {result.score}")
        
        if result.threat_detected:
            print("⚠️  Security threat identified - taking protective action")
        else:
            print("✅ No threats detected - traffic appears safe")
            
    except Exception as e:
        print(f"Inference failed: {e}")

# Run the example
asyncio.run(basic_inference_example())

Batch Inference Processing

from inference.rest_inference_client import RestInferenceClient
from inference.inference_request import InferenceRequest, InferenceRequestBatch
from inference.inference_result import InferenceResultBatch
import asyncio
import pandas as pd
from datetime import datetime

class BatchInferenceProcessor:
    """Process large datasets using batch inference"""
    
    def __init__(self):
        self.client = RestInferenceClient()
    
    async def process_security_logs(self, log_data: pd.DataFrame) -> pd.DataFrame:
        """Process security logs through batch inference"""
        
        # Convert DataFrame to inference requests
        requests = []
        for idx, row in log_data.iterrows():
            request = InferenceRequest(
                source_ip_address=row.get('source_ip', ''),
                created=int(datetime.now().timestamp()),
                kind="log_analysis",
                category=row.get('log_type', 'general'),
                outcome=""
            )
            requests.append(request)
        
        # Create batch request
        batch_request = InferenceRequestBatch(
            row_id_key="log_id",
            data=requests
        )
        
        try:
            # Perform batch inference
            print(f"Processing {len(requests)} log entries...")
            batch_results = await self.client.infer_batch(batch_request)
            
            # Convert results back to DataFrame
            results_data = []
            for i, batch_result in enumerate(batch_results):
                results_data.append({
                    'log_index': i,
                    'threat_detected': batch_result.result.threat_detected,
                    'threat_score': batch_result.result.score,
                    'source_ip': requests[i].source_ip_address,
                    'log_category': requests[i].category
                })
            
            results_df = pd.DataFrame(results_data)
            
            # Print summary
            threat_count = int(results_df['threat_detected'].sum())
            total = len(results_df)
            print(f"Batch processing complete:")
            print(f"  Total logs processed: {total}")
            print(f"  Threats detected: {threat_count}")
            if total:
                print(f"  Threat percentage: {threat_count / total * 100:.2f}%")
            
            return results_df
            
        except Exception as e:
            print(f"Batch inference failed: {e}")
            return pd.DataFrame()
    
    async def real_time_stream_processing(self, stream_data):
        """Process streaming data with real-time inference"""
        threat_alerts = []
        
        for data_point in stream_data:
            request = InferenceRequest(
                source_ip_address=data_point.get('ip', ''),
                created=int(datetime.now().timestamp()),
                kind="real_time_scan",
                category=data_point.get('type', 'unknown')
            )
            
            try:
                result = await self.client.infer(request)
                
                # Handle high-risk threats immediately
                if result.threat_detected and result.score > 80:
                    alert = {
                        'timestamp': datetime.now().isoformat(),
                        'source_ip': request.source_ip_address,
                        'threat_score': result.score,
                        'category': request.category,
                        'severity': 'HIGH' if result.score > 90 else 'MEDIUM'
                    }
                    threat_alerts.append(alert)
                    print(f"🚨 HIGH THREAT ALERT: {alert}")
                
            except Exception as e:
                print(f"Real-time inference error: {e}")
        
        return threat_alerts

# Usage example
async def run_batch_processing():
    processor = BatchInferenceProcessor()
    
    # Sample log data
    sample_logs = pd.DataFrame({
        'source_ip': ['192.168.1.100', '10.0.0.50', '172.16.0.25', '203.0.113.10'],
        'log_type': ['access_log', 'error_log', 'security_log', 'access_log'],
        'timestamp': [datetime.now().isoformat()] * 4
    })
    
    # Process batch
    results = await processor.process_security_logs(sample_logs)
    print("Batch results:")
    print(results)
    
    # Process real-time stream
    stream_data = [
        {'ip': '192.168.1.200', 'type': 'suspicious_activity'},
        {'ip': '10.0.0.75', 'type': 'normal_traffic'},
        {'ip': '172.16.0.100', 'type': 'potential_intrusion'}
    ]
    
    alerts = await processor.real_time_stream_processing(stream_data)
    print(f"Generated {len(alerts)} threat alerts")

# Run the batch processing example
asyncio.run(run_batch_processing())

High-Performance Inference Pipeline

from inference.rest_inference_client import RestInferenceClient
from inference.inference_config import InferenceConfig
from inference.inference_request import InferenceRequest
import asyncio
import aiohttp
from typing import List, Dict, Any
import time
from dataclasses import dataclass

@dataclass
class PerformanceMetrics:
    """Track inference performance metrics"""
    total_requests: int = 0
    successful_inferences: int = 0
    failed_inferences: int = 0
    total_latency: float = 0.0
    min_latency: float = float('inf')
    max_latency: float = 0.0
    
    @property
    def average_latency(self) -> float:
        return self.total_latency / self.total_requests if self.total_requests > 0 else 0.0
    
    @property
    def success_rate(self) -> float:
        return self.successful_inferences / self.total_requests if self.total_requests > 0 else 0.0

class HighPerformanceInferenceClient:
    """High-performance inference client with connection pooling and metrics"""
    
    def __init__(self, max_concurrent_requests: int = 50):
        self.config = InferenceConfig()
        self.max_concurrent_requests = max_concurrent_requests
        self.metrics = PerformanceMetrics()
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async def __aenter__(self):
        """Async context manager entry"""
        connector = aiohttp.TCPConnector(
            limit=100,  # Total connection pool size
            limit_per_host=50,  # Connections per host
            ttl_dns_cache=300,  # DNS cache TTL
            use_dns_cache=True,
        )
        
        timeout = aiohttp.ClientTimeout(
            total=30,  # Total timeout
            connect=5,  # Connection timeout
            sock_read=10  # Socket read timeout
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        await self.session.close()
    
    async def infer_with_metrics(self, request: InferenceRequest) -> Dict[str, Any]:
        """Perform inference with performance tracking"""
        async with self.semaphore:  # Limit concurrent requests
            start_time = time.time()
            
            try:
                # Use direct HTTP call for performance
                url = f"{self.config.rest_service_url()}:{self.config.rest_service_port()}/infer"
                
                request_data = {
                    'source_ip_address': request.source_ip_address,
                    'created': request.created,
                    'kind': request.kind,
                    'category': request.category,
                    'outcome': request.outcome
                }
                
                async with self.session.post(url, json=request_data) as response:
                    response.raise_for_status()
                    result_data = await response.json()
                
                # Calculate latency
                latency = time.time() - start_time
                
                # Update metrics
                self.metrics.total_requests += 1
                self.metrics.successful_inferences += 1
                self.metrics.total_latency += latency
                self.metrics.min_latency = min(self.metrics.min_latency, latency)
                self.metrics.max_latency = max(self.metrics.max_latency, latency)
                
                return {
                    'success': True,
                    'result': result_data,
                    'latency': latency
                }
                
            except Exception as e:
                latency = time.time() - start_time
                
                # Update error metrics
                self.metrics.total_requests += 1
                self.metrics.failed_inferences += 1
                self.metrics.total_latency += latency
                
                return {
                    'success': False,
                    'error': str(e),
                    'latency': latency
                }
    
    async def batch_infer_concurrent(self, requests: List[InferenceRequest]) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently"""
        tasks = [self.infer_with_metrics(request) for request in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance report"""
        return {
            'total_requests': self.metrics.total_requests,
            'successful_requests': self.metrics.successful_inferences,
            'failed_requests': self.metrics.failed_inferences,
            'success_rate_percent': self.metrics.success_rate * 100,
            'average_latency_ms': self.metrics.average_latency * 1000,
            'min_latency_ms': self.metrics.min_latency * 1000 if self.metrics.min_latency != float('inf') else 0,
            'max_latency_ms': self.metrics.max_latency * 1000,
            'requests_per_second': self.metrics.total_requests / self.metrics.total_latency if self.metrics.total_latency > 0 else 0  # based on summed per-request latency; wall-clock throughput under concurrency will be higher
        }

async def performance_benchmark():
    """Benchmark inference performance with different load patterns"""
    
    async with HighPerformanceInferenceClient(max_concurrent_requests=25) as client:
        
        # Generate test requests
        test_requests = []
        for i in range(100):
            request = InferenceRequest(
                source_ip_address=f"192.168.1.{100 + i % 50}",
                created=int(time.time()),
                kind="performance_test",
                category=f"test_category_{i % 5}",
                outcome=""
            )
            test_requests.append(request)
        
        print("Starting performance benchmark...")
        start_time = time.time()
        
        # Process all requests concurrently
        results = await client.batch_infer_concurrent(test_requests)
        
        total_time = time.time() - start_time
        
        # Analyze results
        successful_results = [r for r in results if isinstance(r, dict) and r.get('success')]
        failed_results = [r for r in results if isinstance(r, dict) and not r.get('success')]
        
        print(f"\nPerformance Benchmark Results:")
        print(f"Total processing time: {total_time:.2f} seconds")
        print(f"Successful inferences: {len(successful_results)}")
        print(f"Failed inferences: {len(failed_results)}")
        print(f"Overall throughput: {len(results)/total_time:.2f} requests/second")
        
        # Get detailed performance report
        report = client.get_performance_report()
        print(f"\nDetailed Performance Metrics:")
        for metric, value in report.items():
            print(f"  {metric}: {value:.2f}")

# Run performance benchmark
asyncio.run(performance_benchmark())

Custom Inference Client Implementation

from inference.inference_client import InferenceClient
from inference.inference_config import InferenceConfig
from inference.inference_request import InferenceRequest, InferenceRequestBatch
from inference.inference_result import InferenceResult, InferenceResultBatch
import asyncio
import time
import grpc
from typing import List

class GrpcInferenceClient(InferenceClient):
    """Custom gRPC-based inference client implementation"""
    
    def __init__(self):
        super().__init__()
        self.config = InferenceConfig()
        self.channel = None
        self.stub = None
    
    async def connect(self):
        """Establish gRPC connection"""
        # gRPC targets take "host:port" without a URL scheme, so strip any "http://" prefix
        server_address = f"{self.config.grpc_service_url().removeprefix('http://')}:{self.config.grpc_service_port()}"
        
        # Create gRPC channel with configuration
        channel_options = [
            ('grpc.keepalive_time_ms', 30000),
            ('grpc.keepalive_timeout_ms', 5000),
            ('grpc.keepalive_permit_without_calls', True),
            ('grpc.http2.max_pings_without_data', 0),
            ('grpc.http2.min_time_between_pings_ms', 10000),
        ]
        
        self.channel = grpc.aio.insecure_channel(server_address, options=channel_options)
        
        # Create stub (this would use generated gRPC stub in real implementation)
        # self.stub = inference_service_pb2_grpc.InferenceServiceStub(self.channel)
        
        print(f"Connected to gRPC inference service at {server_address}")
    
    async def disconnect(self):
        """Close gRPC connection"""
        if self.channel:
            await self.channel.close()
    
    async def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """Perform single inference via gRPC"""
        if not self.channel:
            await self.connect()
        
        try:
            # Convert to gRPC request format (pseudo-code)
            grpc_request = {
                'source_ip': inference_request.source_ip_address,
                'timestamp': inference_request.created,
                'request_type': inference_request.kind,
                'category': inference_request.category
            }
            
            # Make gRPC call (pseudo-code)
            # response = await self.stub.Infer(grpc_request)
            
            # Simulate gRPC response for example
            response = {
                'threat_detected': True,  # Simulate threat detection
                'confidence_score': 85    # Simulate confidence score
            }
            
            # Convert gRPC response to InferenceResult
            result = InferenceResult(
                threat_detected=response['threat_detected'],
                score=response['confidence_score']
            )
            
            return result
            
        except Exception as e:
            print(f"gRPC inference failed: {e}")
            raise
    
    async def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> List[InferenceResultBatch]:
        """Perform batch inference via gRPC"""
        if not self.channel:
            await self.connect()
        
        try:
            # Convert batch to gRPC format
            grpc_requests = []
            for i, request in enumerate(inference_request_batch.data):
                grpc_request = {
                    'id': f"{inference_request_batch.row_id_key}_{i}",
                    'source_ip': request.source_ip_address,
                    'timestamp': request.created,
                    'request_type': request.kind,
                    'category': request.category
                }
                grpc_requests.append(grpc_request)
            
            # Make batch gRPC call (pseudo-code)
            # batch_response = await self.stub.InferBatch({'requests': grpc_requests})
            
            # Simulate batch response
            batch_results = []
            for i, request in enumerate(grpc_requests):
                result = InferenceResultBatch(
                    row_id_key=request['id'],
                    result=InferenceResult(
                        threat_detected=i % 3 == 0,  # Simulate some threats
                        score=70 + (i * 5) % 30      # Simulate varying scores
                    )
                )
                batch_results.append(result)
            
            return batch_results
            
        except Exception as e:
            print(f"gRPC batch inference failed: {e}")
            raise

class InferenceClientFactory:
    """Factory for creating different types of inference clients"""
    
    @staticmethod
    def create_client(client_type: str = "rest") -> InferenceClient:
        """Create inference client based on type"""
        if client_type.lower() == "rest":
            from inference.rest_inference_client import RestInferenceClient
            return RestInferenceClient()
        elif client_type.lower() == "grpc":
            return GrpcInferenceClient()
        else:
            raise ValueError(f"Unsupported client type: {client_type}")
    
    @staticmethod
    def create_best_available_client() -> InferenceClient:
        """Create the best available client based on service availability"""
        config = InferenceConfig()
        
        # Try gRPC first for better performance
        try:
            grpc_client = GrpcInferenceClient()
            # Test connection (simplified)
            print("Using gRPC inference client")
            return grpc_client
        except Exception:
            print("gRPC not available, falling back to REST")
        
        # Fallback to REST
        try:
            from inference.rest_inference_client import RestInferenceClient
            rest_client = RestInferenceClient()
            print("Using REST inference client")
            return rest_client
        except Exception as e:
            raise RuntimeError(f"No inference clients available: {e}")

# Usage example
async def multi_client_example():
    """Demonstrate multiple client types"""
    
    # Create different client types
    rest_client = InferenceClientFactory.create_client("rest")
    grpc_client = InferenceClientFactory.create_client("grpc")
    best_client = InferenceClientFactory.create_best_available_client()
    
    # Test request
    request = InferenceRequest(
        source_ip_address="10.0.0.100",
        created=int(time.time()),
        kind="multi_client_test",
        category="performance_comparison"
    )
    
    # Compare performance between clients
    clients = [
        ("REST", rest_client),
        ("gRPC", grpc_client),
        ("Best Available", best_client)
    ]
    
    for client_name, client in clients:
        try:
            start_time = time.time()
            result = await client.infer(request)
            latency = time.time() - start_time
            
            print(f"{client_name} Client:")
            print(f"  Latency: {latency*1000:.2f} ms")
            print(f"  Threat detected: {result.threat_detected}")
            print(f"  Score: {result.score}")
            print()
            
        except Exception as e:
            print(f"{client_name} Client failed: {e}")
        
        # Clean up gRPC connection
        if hasattr(client, 'disconnect'):
            await client.disconnect()

# Run the multi-client example
asyncio.run(multi_client_example())

Best Practices

Client Configuration

  • Use appropriate client types based on performance requirements (gRPC for high-throughput, REST for simplicity)
  • Configure connection pooling and timeouts appropriately
  • Implement health checks for service availability
  • Use circuit breakers for fault tolerance
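One way to sketch the health-check bullet, assuming the service exposes a conventional `/health` endpoint (that path is a hypothetical convention, not part of this package):

```python
import urllib.request
import urllib.error

def service_healthy(base_url: str, port: str, timeout: float = 2.0) -> bool:
    """Probe a hypothetical /health endpoint; any error or timeout counts as unhealthy."""
    try:
        with urllib.request.urlopen(f"{base_url}:{port}/health", timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        return False
```

A client factory could call this before choosing between the gRPC and REST endpoints.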

Performance Optimization

  • Implement concurrent request processing for batch operations
  • Use connection pooling to minimize connection overhead
  • Monitor latency and throughput metrics
  • Implement caching for frequently requested predictions
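The caching bullet can be sketched with `functools.lru_cache`, keyed on the hashable fields of a request. The scoring function here is a stand-in for a real model call:

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def cached_inference(source_ip: str, kind: str, category: str) -> int:
    """Stand-in for an expensive model call; identical requests hit the cache."""
    # In practice this would invoke client.infer(...) and return the score.
    return sum(ord(c) for c in source_ip + kind + category) % 100

cached_inference("10.0.0.1", "scan", "traffic")  # computed
cached_inference("10.0.0.1", "scan", "traffic")  # served from cache
```

Note that caching is only appropriate when the model is deterministic and requests with identical fields genuinely warrant identical predictions.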

Error Handling and Reliability

  • Implement retry logic with exponential backoff
  • Use timeouts to prevent hanging requests
  • Log detailed error information for debugging
  • Implement graceful degradation for service failures
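A minimal retry wrapper with exponential backoff and jitter, suitable for wrapping any client's `infer` coroutine (the wrapper itself is a sketch, not a package API):

```python
import asyncio
import random

async def infer_with_retry(call, request, retries: int = 3, base_delay: float = 0.5):
    """Retry an async inference call, doubling the delay after each failure."""
    for attempt in range(retries + 1):
        try:
            return await call(request)
        except Exception:
            if attempt == retries:
                raise  # out of retries; surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

Usage would look like `result = await infer_with_retry(client.infer, request)`.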

Security Considerations

  • Use HTTPS/TLS for REST endpoints
  • Implement authentication for inference services
  • Validate and sanitize input data
  • Monitor for suspicious inference patterns
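Input validation for the `source_ip_address` field can lean on the standard `ipaddress` module; this check is a sketch of the "validate and sanitize" bullet:

```python
import ipaddress

def valid_source_ip(value: str) -> bool:
    """Accept only syntactically valid IPv4/IPv6 addresses."""
    try:
        ipaddress.ip_address(value)
        return True
    except ValueError:
        return False
```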

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python
