Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.
—
Standardized client framework for machine learning model inference supporting both individual and batch processing with REST and gRPC protocol support. The inference system provides high-performance model serving capabilities with comprehensive request/response modeling and configurable service endpoints for scalable ML operations.
Abstract base interface defining standardized inference operations for both single predictions and batch processing with pluggable implementation support for different service protocols.
class InferenceClient(ABC):
    """
    Abstract interface for inference clients.

    Concrete implementations (e.g. REST- or gRPC-based) supply the
    transport-specific logic for single and batch inference calls.

    Attributes:
        _config: InferenceConfig - Inference configuration loaded at construction.
    """

    def __init__(self) -> None:
        """Initialize the client with an InferenceConfig."""
        ...

    @classmethod
    def __subclasshook__(cls, subclass):
        """Support subclass checks for implementations of this interface."""
        ...

    @abstractmethod
    def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """
        Invoke inference for a single request.

        Parameters:
            inference_request: InferenceRequest - Request containing inference data.

        Returns:
            InferenceResult - Inference prediction result.
        """
        ...

    @abstractmethod
    def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]:
        """
        Invoke inference for a batch of requests.

        Parameters:
            inference_request_batch: InferenceRequestBatch - Batch of inference requests.

        Returns:
            list[InferenceResultBatch] - List of batch inference results.
        """
        ...

Configuration management for inference service endpoints supporting both REST and gRPC protocols with configurable URLs and ports for flexible deployment scenarios.
class InferenceConfig:
    """
    Configuration for inference service endpoints.

    Values are loaded from an ``inference.properties`` file, with
    localhost defaults for both REST and gRPC services.
    """

    def __init__(self) -> None:
        """Load configuration from inference.properties."""
        ...

    def rest_service_url(self) -> str:
        """Return the URL of the inference REST service (default: "http://localhost")."""
        ...

    def rest_service_port(self) -> str:
        """Return the port of the inference REST service (default: "7080")."""
        ...

    def grpc_service_url(self) -> str:
        """Return the URL of the inference gRPC service (default: "http://localhost")."""
        ...

    def grpc_service_port(self) -> str:
        """Return the port of the inference gRPC service (default: "7081")."""
        ...

Comprehensive data models for capturing inference request information including source metadata, request categorization, and extensible attributes for diverse ML use cases.
class InferenceRequest:
    """
    Contains the details necessary for inference to be invoked.

    Properties (with getters/setters):
        source_ip_address: str - Source IP address of the event being scored.
        created: int - Creation timestamp (callers in this file pass epoch seconds).
        kind: str - Request kind.
        category: str - Request category.
        outcome: str - Request outcome.
    """

    def __init__(self, source_ip_address: str = "", created: int = 0, kind: str = "", category: str = "", outcome: str = "") -> None:
        """
        Constructor with default values.

        Parameters:
            source_ip_address: str - Source IP address (default: "").
            created: int - Creation timestamp (default: 0).
            kind: str - Request kind (default: "").
            category: str - Request category (default: "").
            outcome: str - Request outcome (default: "").
        """
        ...
class InferenceRequestBatch:
    """
    Contains the details necessary for inference to be invoked on a batch.

    Properties (with getters/setters):
        row_id_key: str - Key used to correlate batch rows with their results.
        data: list[InferenceRequest] - List of inference requests in the batch.
    """

    def __init__(self, row_id_key: str, data: list[InferenceRequest]) -> None:
        """
        Constructor.

        Parameters:
            row_id_key: str - Row identifier key.
            data: list[InferenceRequest] - List of inference requests.
        """
        ...

Structured result models for capturing inference predictions including threat detection indicators, confidence scores, and batch processing support with row-level result tracking.
class InferenceResult:
    """
    Contains details about the results of an inference request.

    Properties (with getters/setters):
        threat_detected: bool - Whether a threat was detected.
        score: int - Inference score.
    """

    def __init__(self, threat_detected: bool = False, score: int = 0) -> None:
        """
        Constructor with default values.

        Parameters:
            threat_detected: bool - Whether a threat was detected (default: False).
            score: int - Inference score (default: 0).
        """
        ...
class InferenceResultBatch:
    """
    Represents a single result of a batch inference.

    Properties (with getters/setters):
        row_id_key: str - Row ID key identifying the batch row this result belongs to.
        result: InferenceResult - Inference result for the row.
    """

    def __init__(self, row_id_key: str, result: InferenceResult) -> None:
        """
        Constructor.

        Parameters:
            row_id_key: str - Row identifier key.
            result: InferenceResult - Inference result for this row.
        """
        ...

Production-ready REST-based inference client implementation with async HTTP operations, JSON serialization, and comprehensive error handling for scalable model serving.
class RestInferenceClient(InferenceClient):
    """
    REST-based implementation of InferenceClient.

    Both operations are coroutines and must be awaited.
    """

    async def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """
        Perform a single inference asynchronously.

        Parameters:
            inference_request: InferenceRequest - Request data for inference.

        Returns:
            InferenceResult - Prediction result.
        """
        ...

    async def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> list[InferenceResultBatch]:
        """
        Perform batch inference asynchronously.

        Parameters:
            inference_request_batch: InferenceRequestBatch - Batch of requests.

        Returns:
            list[InferenceResultBatch] - List of batch results.
        """
        ...

from inference.rest_inference_client import RestInferenceClient
from inference.inference_request import InferenceRequest
from inference.inference_result import InferenceResult
import asyncio
from datetime import datetime
async def basic_inference_example():
    """Run one request through the REST inference client and report the outcome."""
    client = RestInferenceClient()

    # Describe the traffic to analyze; outcome is left blank for the
    # inference service to fill in.
    scan_request = InferenceRequest(
        source_ip_address="192.168.1.100",
        created=int(datetime.now().timestamp()),
        kind="security_scan",
        category="network_traffic",
        outcome=""  # Will be populated by inference
    )

    try:
        prediction = await client.infer(scan_request)
        print(f"Inference completed:")
        print(f" Threat detected: {prediction.threat_detected}")
        print(f" Confidence score: {prediction.score}")

        if not prediction.threat_detected:
            print("✅ No threats detected - traffic appears safe")
        else:
            print("⚠️ Security threat identified - taking protective action")
    except Exception as e:
        print(f"Inference failed: {e}")
# Run the example
asyncio.run(basic_inference_example())

from inference.rest_inference_client import RestInferenceClient
from inference.inference_request import InferenceRequest, InferenceRequestBatch
from inference.inference_result import InferenceResultBatch
import asyncio
import pandas as pd
from datetime import datetime
class BatchInferenceProcessor:
    """Process large datasets using batch inference."""

    def __init__(self):
        # Shared REST client reused for both batch and streaming calls.
        self.client = RestInferenceClient()

    async def process_security_logs(self, log_data: pd.DataFrame) -> pd.DataFrame:
        """
        Process security logs through batch inference.

        Parameters:
            log_data: pd.DataFrame - Rows with optional 'source_ip' and 'log_type' columns.

        Returns:
            pd.DataFrame - Per-row results; empty on failure or empty input.
        """
        # Convert DataFrame rows to inference requests.
        requests = [
            InferenceRequest(
                source_ip_address=row.get('source_ip', ''),
                created=int(datetime.now().timestamp()),
                kind="log_analysis",
                category=row.get('log_type', 'general'),
                outcome=""
            )
            for _, row in log_data.iterrows()
        ]

        batch_request = InferenceRequestBatch(
            row_id_key="log_id",
            data=requests
        )

        try:
            print(f"Processing {len(requests)} log entries...")
            batch_results = await self.client.infer_batch(batch_request)

            # Pair each result with its originating request by position.
            results_data = [
                {
                    'log_index': i,
                    'threat_detected': batch_result.result.threat_detected,
                    'threat_score': batch_result.result.score,
                    'source_ip': requests[i].source_ip_address,
                    'log_category': requests[i].category
                }
                for i, batch_result in enumerate(batch_results)
            ]
            results_df = pd.DataFrame(results_data)

            # Bug fix: with empty input the original raised KeyError /
            # ZeroDivisionError here, which was misreported as a batch
            # inference failure. Guard both aggregations.
            threat_count = results_df['threat_detected'].sum() if not results_df.empty else 0
            threat_pct = (threat_count / len(results_df) * 100) if len(results_df) else 0.0

            print(f"Batch processing complete:")
            print(f" Total logs processed: {len(results_df)}")
            print(f" Threats detected: {threat_count}")
            print(f" Threat percentage: {threat_pct:.2f}%")

            return results_df
        except Exception as e:
            print(f"Batch inference failed: {e}")
            return pd.DataFrame()

    async def real_time_stream_processing(self, stream_data):
        """
        Process streaming data points with per-item inference.

        Parameters:
            stream_data: iterable of dicts with 'ip' and 'type' keys.

        Returns:
            list - Alert dicts for high-risk detections (score > 80).
        """
        threat_alerts = []
        for data_point in stream_data:
            request = InferenceRequest(
                # Default to '' so a missing 'ip' key does not store None
                # in a str-typed field.
                source_ip_address=data_point.get('ip', ''),
                created=int(datetime.now().timestamp()),
                kind="real_time_scan",
                category=data_point.get('type', 'unknown')
            )
            try:
                result = await self.client.infer(request)
                # Handle high-risk threats immediately
                if result.threat_detected and result.score > 80:
                    alert = {
                        'timestamp': datetime.now().isoformat(),
                        'source_ip': request.source_ip_address,
                        'threat_score': result.score,
                        'category': request.category,
                        'severity': 'HIGH' if result.score > 90 else 'MEDIUM'
                    }
                    threat_alerts.append(alert)
                    print(f"🚨 HIGH THREAT ALERT: {alert}")
            except Exception as e:
                print(f"Real-time inference error: {e}")
        return threat_alerts
# Usage example
async def run_batch_processing():
    """Drive the batch processor over sample log data and a simulated stream."""
    processor = BatchInferenceProcessor()

    # Sample log data
    sample_logs = pd.DataFrame({
        'source_ip': ['192.168.1.100', '10.0.0.50', '172.16.0.25', '203.0.113.10'],
        'log_type': ['access_log', 'error_log', 'security_log', 'access_log'],
        'timestamp': [datetime.now().isoformat()] * 4
    })

    # Batch path
    batch_df = await processor.process_security_logs(sample_logs)
    print("Batch results:")
    print(batch_df)

    # Streaming path with simulated events
    stream_data = [
        {'ip': '192.168.1.200', 'type': 'suspicious_activity'},
        {'ip': '10.0.0.75', 'type': 'normal_traffic'},
        {'ip': '172.16.0.100', 'type': 'potential_intrusion'}
    ]
    alerts = await processor.real_time_stream_processing(stream_data)
    print(f"Generated {len(alerts)} threat alerts")
# Run the batch processing example
asyncio.run(run_batch_processing())

from inference.rest_inference_client import RestInferenceClient
from inference.inference_config import InferenceConfig
from inference.inference_request import InferenceRequest
import asyncio
import aiohttp
from typing import List, Dict, Any
import time
from dataclasses import dataclass
@dataclass
class PerformanceMetrics:
    """Accumulated latency and success/failure counters for inference calls."""
    total_requests: int = 0
    successful_inferences: int = 0
    failed_inferences: int = 0
    total_latency: float = 0.0
    min_latency: float = float('inf')
    max_latency: float = 0.0

    @property
    def average_latency(self) -> float:
        """Mean latency per request; 0.0 when no requests have been recorded."""
        if self.total_requests > 0:
            return self.total_latency / self.total_requests
        return 0.0

    @property
    def success_rate(self) -> float:
        """Fraction of requests that succeeded; 0.0 when no requests recorded."""
        if self.total_requests > 0:
            return self.successful_inferences / self.total_requests
        return 0.0
class HighPerformanceInferenceClient:
    """
    High-performance inference client with connection pooling and metrics.

    Use as an async context manager so the pooled aiohttp session is
    created on entry and closed on exit.
    """

    def __init__(self, max_concurrent_requests: int = 50):
        """
        Parameters:
            max_concurrent_requests: int - Upper bound on in-flight requests.
        """
        self.config = InferenceConfig()
        self.max_concurrent_requests = max_concurrent_requests
        self.metrics = PerformanceMetrics()
        # Semaphore caps concurrency independently of the HTTP pool limits.
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def __aenter__(self):
        """Async context manager entry: build a pooled aiohttp session."""
        connector = aiohttp.TCPConnector(
            limit=100,  # Total connection pool size
            limit_per_host=50,  # Connections per host
            ttl_dns_cache=300,  # DNS cache TTL
            use_dns_cache=True,
        )
        timeout = aiohttp.ClientTimeout(
            total=30,  # Total timeout
            connect=5,  # Connection timeout
            sock_read=10  # Socket read timeout
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release pooled connections."""
        await self.session.close()

    async def infer_with_metrics(self, request: InferenceRequest) -> Dict[str, Any]:
        """
        Perform inference with performance tracking.

        Returns:
            dict with keys 'success', 'latency', and either 'result'
            (parsed JSON body) or 'error' (stringified exception).
        """
        async with self.semaphore:  # Limit concurrent requests
            start_time = time.time()
            try:
                # Use direct HTTP call for performance
                url = f"{self.config.rest_service_url()}:{self.config.rest_service_port()}/infer"
                request_data = {
                    'source_ip_address': request.source_ip_address,
                    'created': request.created,
                    'kind': request.kind,
                    'category': request.category,
                    'outcome': request.outcome
                }

                async with self.session.post(url, json=request_data) as response:
                    response.raise_for_status()
                    result_data = await response.json()

                latency = time.time() - start_time

                # Update success metrics; NOTE min/max latency only track
                # successful calls (matches original behavior).
                self.metrics.total_requests += 1
                self.metrics.successful_inferences += 1
                self.metrics.total_latency += latency
                self.metrics.min_latency = min(self.metrics.min_latency, latency)
                self.metrics.max_latency = max(self.metrics.max_latency, latency)

                return {
                    'success': True,
                    'result': result_data,
                    'latency': latency
                }
            except Exception as e:
                latency = time.time() - start_time

                # Update error metrics
                self.metrics.total_requests += 1
                self.metrics.failed_inferences += 1
                self.metrics.total_latency += latency

                return {
                    'success': False,
                    'error': str(e),
                    'latency': latency
                }

    async def batch_infer_concurrent(self, requests: List[InferenceRequest]) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently; exceptions are returned in-place."""
        tasks = [self.infer_with_metrics(request) for request in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance report from accumulated metrics."""
        # Bug fix: throughput is total requests divided by total recorded
        # latency. The previous expression divided by the *average* latency
        # (requests / (latency / requests)), overstating throughput by a
        # factor of total_requests; it also divided by zero when no latency
        # had been recorded.
        requests_per_second = (
            self.metrics.total_requests / self.metrics.total_latency
            if self.metrics.total_latency > 0 else 0
        )
        return {
            'total_requests': self.metrics.total_requests,
            'successful_requests': self.metrics.successful_inferences,
            'failed_requests': self.metrics.failed_inferences,
            'success_rate_percent': self.metrics.success_rate * 100,
            'average_latency_ms': self.metrics.average_latency * 1000,
            'min_latency_ms': self.metrics.min_latency * 1000 if self.metrics.min_latency != float('inf') else 0,
            'max_latency_ms': self.metrics.max_latency * 1000,
            'requests_per_second': requests_per_second
        }
async def performance_benchmark():
    """Benchmark inference performance with different load patterns."""
    async with HighPerformanceInferenceClient(max_concurrent_requests=25) as client:
        # Build 100 synthetic requests spread over 50 IPs and 5 categories.
        test_requests = [
            InferenceRequest(
                source_ip_address=f"192.168.1.{100 + i % 50}",
                created=int(time.time()),
                kind="performance_test",
                category=f"test_category_{i % 5}",
                outcome=""
            )
            for i in range(100)
        ]

        print("Starting performance benchmark...")
        start_time = time.time()
        outcomes = await client.batch_infer_concurrent(test_requests)
        elapsed = time.time() - start_time

        # Partition outcomes by their success flag.
        ok = [r for r in outcomes if isinstance(r, dict) and r.get('success')]
        failed = [r for r in outcomes if isinstance(r, dict) and not r.get('success')]

        print(f"\nPerformance Benchmark Results:")
        print(f"Total processing time: {elapsed:.2f} seconds")
        print(f"Successful inferences: {len(ok)}")
        print(f"Failed inferences: {len(failed)}")
        print(f"Overall throughput: {len(outcomes)/elapsed:.2f} requests/second")

        # Detailed per-metric report from the client's accumulated stats.
        report = client.get_performance_report()
        print(f"\nDetailed Performance Metrics:")
        for metric, value in report.items():
            print(f" {metric}: {value:.2f}")
# Run performance benchmark
asyncio.run(performance_benchmark())

from inference.inference_client import InferenceClient
import asyncio
import json
import time
from typing import List

import grpc

from inference.inference_config import InferenceConfig
from inference.inference_request import InferenceRequest, InferenceRequestBatch
from inference.inference_result import InferenceResult, InferenceResultBatch
class GrpcInferenceClient(InferenceClient):
    """Custom gRPC-based inference client implementation."""

    def __init__(self):
        super().__init__()
        self.config = InferenceConfig()
        self.channel = None
        self.stub = None

    async def connect(self):
        """Establish gRPC connection"""
        server_address = f"{self.config.grpc_service_url()}:{self.config.grpc_service_port()}"

        # Keepalive options for a long-lived inference channel.
        channel_options = [
            ('grpc.keepalive_time_ms', 30000),
            ('grpc.keepalive_timeout_ms', 5000),
            ('grpc.keepalive_permit_without_calls', True),
            ('grpc.http2.max_pings_without_data', 0),
            ('grpc.http2.min_time_between_pings_ms', 10000),
        ]
        self.channel = grpc.aio.insecure_channel(server_address, options=channel_options)

        # Create stub (this would use generated gRPC stub in real implementation)
        # self.stub = inference_service_pb2_grpc.InferenceServiceStub(self.channel)
        print(f"Connected to gRPC inference service at {server_address}")

    async def disconnect(self):
        """Close gRPC connection"""
        if self.channel:
            await self.channel.close()

    async def infer(self, inference_request: InferenceRequest) -> InferenceResult:
        """Perform single inference via gRPC"""
        if not self.channel:
            await self.connect()
        try:
            # Convert to gRPC request format (pseudo-code)
            payload = {
                'source_ip': inference_request.source_ip_address,
                'timestamp': inference_request.created,
                'request_type': inference_request.kind,
                'category': inference_request.category
            }
            # Make gRPC call (pseudo-code)
            # response = await self.stub.Infer(payload)

            # Simulate gRPC response for example
            simulated = {
                'threat_detected': True,  # Simulate threat detection
                'confidence_score': 85  # Simulate confidence score
            }
            return InferenceResult(
                threat_detected=simulated['threat_detected'],
                score=simulated['confidence_score']
            )
        except Exception as e:
            print(f"gRPC inference failed: {e}")
            raise

    async def infer_batch(self, inference_request_batch: InferenceRequestBatch) -> List[InferenceResultBatch]:
        """Perform batch inference via gRPC"""
        if not self.channel:
            await self.connect()
        try:
            # Convert batch to gRPC format
            grpc_requests = [
                {
                    'id': f"{inference_request_batch.row_id_key}_{idx}",
                    'source_ip': req.source_ip_address,
                    'timestamp': req.created,
                    'request_type': req.kind,
                    'category': req.category
                }
                for idx, req in enumerate(inference_request_batch.data)
            ]
            # Make batch gRPC call (pseudo-code)
            # batch_response = await self.stub.InferBatch({'requests': grpc_requests})

            # Simulate batch response
            return [
                InferenceResultBatch(
                    row_id_key=entry['id'],
                    result=InferenceResult(
                        threat_detected=idx % 3 == 0,  # Simulate some threats
                        score=70 + (idx * 5) % 30  # Simulate varying scores
                    )
                )
                for idx, entry in enumerate(grpc_requests)
            ]
        except Exception as e:
            print(f"gRPC batch inference failed: {e}")
            raise
class InferenceClientFactory:
    """Factory for creating different types of inference clients"""

    @staticmethod
    def create_client(client_type: str = "rest") -> InferenceClient:
        """Create inference client based on type ("rest" or "grpc")."""
        kind = client_type.lower()
        if kind == "grpc":
            return GrpcInferenceClient()
        if kind == "rest":
            from inference.rest_inference_client import RestInferenceClient
            return RestInferenceClient()
        raise ValueError(f"Unsupported client type: {client_type}")

    @staticmethod
    def create_best_available_client() -> InferenceClient:
        """Create the best available client based on service availability"""
        config = InferenceConfig()  # Loaded for parity with the original's availability probe

        # Try gRPC first for better performance
        try:
            client = GrpcInferenceClient()
            # Test connection (simplified)
            print("Using gRPC inference client")
            return client
        except Exception:
            print("gRPC not available, falling back to REST")

        # Fallback to REST
        try:
            from inference.rest_inference_client import RestInferenceClient
            client = RestInferenceClient()
            print("Using REST inference client")
            return client
        except Exception as e:
            raise RuntimeError(f"No inference clients available: {e}")
# Usage example
async def multi_client_example():
    """
    Demonstrate and compare the available client types.

    Creates REST, gRPC, and "best available" clients, runs the same
    request through each, and reports per-client latency and results.
    """
    # Bug fix: this example section never imports 'time' at module level,
    # so time.time() below raised NameError; import locally to stay
    # self-contained.
    import time

    # Create different client types
    rest_client = InferenceClientFactory.create_client("rest")
    grpc_client = InferenceClientFactory.create_client("grpc")
    best_client = InferenceClientFactory.create_best_available_client()

    # Test request shared by all clients
    request = InferenceRequest(
        source_ip_address="10.0.0.100",
        created=int(time.time()),
        kind="multi_client_test",
        category="performance_comparison"
    )

    # Compare performance between clients
    clients = [
        ("REST", rest_client),
        ("gRPC", grpc_client),
        ("Best Available", best_client)
    ]

    for client_name, client in clients:
        try:
            start_time = time.time()
            result = await client.infer(request)
            latency = time.time() - start_time

            print(f"{client_name} Client:")
            print(f" Latency: {latency*1000:.2f} ms")
            print(f" Threat detected: {result.threat_detected}")
            print(f" Score: {result.score}")
            print()
        except Exception as e:
            print(f"{client_name} Client failed: {e}")

        # Clean up gRPC connection
        if hasattr(client, 'disconnect'):
            await client.disconnect()
# Run the multi-client example
asyncio.run(multi_client_example())

Install with Tessl CLI
npx tessl i tessl/pypi-aissemble-foundation-core-python