CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework, providing networking components for reactive microservices.

Pending
Overview
Eval results
Files

docs/system-utils.md

System and Utilities

System health endpoints, network utilities, and various helper functions for microservice operations.

Capabilities

System Health Service

Built-in service providing system health endpoints for monitoring and health checks.

# Signature overview of the built-in health service; the method body is
# elided (``...``) in this reference.
class SystemService:
    # Registered as a REST command handler for ``GET /system/health``.
    @enroute.rest.command("/system/health", "GET")
    async def check_health(self, request: Request) -> Response: ...

Usage Examples:

from minos.networks import SystemService, enroute

# SystemService is automatically available
# Access health check at: GET /system/health

# Returns response like:
# {
#   "status": "healthy",
#   "host_ip": "192.168.1.100", 
#   "timestamp": "2023-09-10T08:30:00Z"
# }

# Custom health check extension
class CustomHealthService(SystemService):
    """Health service that extends the base report with dependency checks."""

    @enroute.rest.command("/system/health", "GET")
    async def check_health(self, request: Request) -> Response:
        """Merge the parent health payload with per-dependency results.

        Responds with status 200 when every dependency check reports
        ``"healthy"`` and with 503 otherwise.
        """
        # Start from whatever the parent implementation reports.
        parent_response = await super().check_health(request)
        parent_payload = await parent_response.content()

        # Run the dependency probes one after another.
        dependency_results = {
            "database": await self.check_database(),
            "redis": await self.check_redis(),
            "external_api": await self.check_external_api(),
        }

        # A single non-healthy probe marks the whole service unhealthy.
        any_failed = any(
            result.get("status") != "healthy"
            for result in dependency_results.values()
        )

        health_data = {**parent_payload}
        health_data["checks"] = dependency_results
        health_data["overall_status"] = "unhealthy" if any_failed else "healthy"

        return Response(health_data, status=503 if any_failed else 200)

    async def check_database(self) -> dict:
        """Probe the database (placeholder logic) and report its status."""
        try:
            # Database health check logic
            outcome = {"status": "healthy", "response_time": "5ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome

    async def check_redis(self) -> dict:
        """Probe Redis (placeholder logic) and report its status."""
        try:
            # Redis health check logic
            outcome = {"status": "healthy", "response_time": "2ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome

    async def check_external_api(self) -> dict:
        """Probe the external API (placeholder logic) and report its status."""
        try:
            # External API health check logic
            outcome = {"status": "healthy", "response_time": "150ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome

Network Utilities

Utility functions for network operations and host information.

# Signature overview only; the bodies are elided (``...``) in this reference.
# Returns the current host's IP address as a string.
def get_host_ip() -> str: ...
# Returns the current host's name as a string.
def get_host_name() -> str: ...
# Resolves ``name`` (a hostname) to an IP address string.
def get_ip(name: str) -> str: ...

Usage Examples:

from minos.networks import get_host_ip, get_host_name, get_ip

# Look up the name and IP address of the machine this code runs on
host_name = get_host_name()
host_ip = get_host_ip()

# The values in the trailing comments are illustrative; actual output depends on the host
print(f"Host: {host_name}")      # e.g., "myserver"
print(f"IP: {host_ip}")          # e.g., "192.168.1.100"

# Resolve an arbitrary hostname to an IP address
external_ip = get_ip("google.com")
print(f"Google IP: {external_ip}")  # e.g., "172.217.164.142"

# Use in service configuration
class NetworkAwareService:
    """Service that records the host name and IP once, at construction."""

    def __init__(self):
        # Looked up a single time; the handler below reuses the stored values.
        self.host_name = get_host_name()
        self.host_ip = get_host_ip()

    @enroute.rest.query("/info", method="GET")
    async def get_service_info(self, request: Request) -> Response:
        """Report static service metadata together with the host details."""
        service_info = {
            "service_name": "my-service",
            "host_name": self.host_name,
            "host_ip": self.host_ip,
            "version": "1.0.0",
        }
        return Response(service_info)

Queue Utilities

Async queue consumption utilities for message processing.

# Signature overview only; the body is elided (``...``) in this reference.
# Per the usage examples in this document, removes up to ``max_count`` items
# from ``queue`` and returns nothing.
async def consume_queue(queue, max_count: int) -> None: ...

Usage Examples:

from minos.networks import consume_queue
import asyncio

# NOTE: the bare ``await`` statements below need a coroutine context (an
# ``async def`` body or an asyncio-aware REPL); they are not valid at the top
# level of a regular script.

# Create an asyncio queue
message_queue = asyncio.Queue()

# Add some messages (ten in total)
for i in range(10):
    await message_queue.put(f"message_{i}")

# Consume up to 5 messages at once
await consume_queue(message_queue, max_count=5)

# Queue now has 5 messages remaining
print(f"Queue size: {message_queue.qsize()}")  # 5

# Consume remaining messages (max_count is larger than what is left)
await consume_queue(message_queue, max_count=10)
print(f"Queue size: {message_queue.qsize()}")  # 0

# Use in message processing
class MessageProcessor:
    """Buffers incoming messages and drains them in fixed-size batches."""

    def __init__(self):
        self.batch_size = 10
        self.processing_queue = asyncio.Queue()

    async def add_message(self, message):
        """Enqueue ``message`` for a later batch."""
        await self.processing_queue.put(message)

    async def process_batch(self):
        """Process messages in batches"""
        # Nothing queued: skip the consume call entirely.
        if self.processing_queue.qsize() <= 0:
            return

        # Drain at most ``batch_size`` messages in one go.
        await consume_queue(self.processing_queue, self.batch_size)
        print(f"Processed batch of messages")

    @enroute.periodic.event("*/30 * * * * *")  # Every 30 seconds
    async def periodic_batch_processing(self, request) -> Response:
        """Scheduled entry point: process one batch and acknowledge."""
        await self.process_batch()
        acknowledgement = {"batch_processed": True}
        return Response(acknowledgement)

Advanced Usage

Comprehensive Health Monitoring

from minos.networks import SystemService, get_host_ip, get_host_name, enroute
import psutil
import time
from datetime import datetime

class AdvancedHealthService(SystemService):
    """Health service exposing a basic and a detailed health endpoint.

    The basic endpoint returns static identification data plus a timestamp;
    the detailed endpoint additionally reports CPU, memory, disk, uptime and
    connection metrics gathered through ``psutil``.
    """

    def __init__(self):
        # Captured once at construction; uptime is derived from start_time.
        self.start_time = time.time()
        self.host_ip = get_host_ip()
        self.host_name = get_host_name()

    @enroute.rest.query("/system/health", method="GET")
    async def check_health(self, request: Request) -> Response:
        """Return the basic health payload."""
        return Response(await self.get_health_status())

    @enroute.rest.query("/system/health/detailed", method="GET")
    async def detailed_health(self, request: Request) -> Response:
        """Return the basic health payload extended with system metrics."""
        # Local import: this snippet's import header does not include asyncio.
        import asyncio

        health_data = await self.get_health_status()

        # cpu_percent(interval=1) sleeps for the whole interval. Calling it
        # directly would stall the event loop (and every other request) for a
        # second, so the measurement runs in the default executor instead.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)

        # Sample memory and disk once each so the reported fields belong to
        # the same snapshot.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Add detailed system metrics
        health_data.update({
            "system": {
                "cpu_percent": cpu_percent,
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent
                },
                "disk": {
                    "total": disk.total,
                    "free": disk.free,
                    "percent": disk.percent
                }
            },
            "uptime": time.time() - self.start_time,
            "connections": len(psutil.net_connections())
        })

        return Response(health_data)

    async def get_health_status(self) -> dict:
        """Get basic health status"""
        return {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "host_name": self.host_name,
            "host_ip": self.host_ip,
            "service": "minos-microservice",
            "version": "1.0.0"
        }

# Usage
health_service = AdvancedHealthService()

# Available endpoints:
# GET /system/health          - Basic health check
# GET /system/health/detailed - Detailed system metrics

Network Configuration and Discovery

from minos.networks import get_host_ip, get_host_name, DiscoveryConnector
import socket

class NetworkConfigService:
    def __init__(self):
        self.host_name = get_host_name()
        self.host_ip = get_host_ip()
        self.service_port = self.find_available_port()
    
    def find_available_port(self, start_port: int = 8080) -> int:
        """Find an available port starting from start_port"""
        for port in range(start_port, start_port + 100):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(('', port))
                    return port
            except OSError:
                continue
        raise RuntimeError("No available ports found")
    
    async def register_with_discovery(self, service_name: str, endpoints: list):
        """Register service with discovery using detected network info"""
        discovery = DiscoveryConnector(
            client=InMemoryDiscoveryClient(host="consul", port=8500),
            name=service_name,
            endpoints=endpoints,
            host=self.host_ip,
            port=self.service_port
        )
        
        await discovery.subscribe()
        print(f"Service registered: {service_name}@{self.host_ip}:{self.service_port}")
        return discovery
    
    @enroute.rest.query("/system/network", method="GET")
    async def get_network_info(self, request: Request) -> Response:
        """Get network configuration information"""
        try:
            # Get all network interfaces
            interfaces = {}
            for interface, addrs in psutil.net_if_addrs().items():
                interfaces[interface] = [
                    {
                        "family": addr.family,
                        "address": addr.address,
                        "netmask": addr.netmask
                    }
                    for addr in addrs
                ]
            
            return Response({
                "host_name": self.host_name,
                "host_ip": self.host_ip,
                "service_port": self.service_port,
                "interfaces": interfaces,
                "network_stats": {
                    "bytes_sent": psutil.net_io_counters().bytes_sent,
                    "bytes_recv": psutil.net_io_counters().bytes_recv,
                    "packets_sent": psutil.net_io_counters().packets_sent,
                    "packets_recv": psutil.net_io_counters().packets_recv
                }
            })
        except Exception as e:
            return Response({"error": str(e)}, status=500)

# Usage
network_service = NetworkConfigService()
discovery = await network_service.register_with_discovery(
    "my-service", 
    [{"path": "/api/v1", "method": "GET"}]
)

Message Queue Management

from minos.networks import consume_queue, enroute
import asyncio
from typing import Dict, List

class QueueManager:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.processing_stats = {}
    
    def create_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue:
        """Create a named queue"""
        self.queues[name] = asyncio.Queue(maxsize=maxsize)
        self.processing_stats[name] = {
            "messages_processed": 0,
            "last_processed": None
        }
        return self.queues[name]
    
    async def add_message(self, queue_name: str, message):
        """Add message to a named queue"""
        if queue_name in self.queues:
            await self.queues[queue_name].put(message)
        else:
            raise ValueError(f"Queue {queue_name} does not exist")
    
    async def process_queue_batch(self, queue_name: str, batch_size: int = 10):
        """Process messages from a queue in batches"""
        if queue_name not in self.queues:
            raise ValueError(f"Queue {queue_name} does not exist")
        
        queue = self.queues[queue_name]
        initial_size = queue.qsize()
        
        if initial_size > 0:
            await consume_queue(queue, batch_size)
            processed = min(initial_size, batch_size)
            
            # Update stats
            self.processing_stats[queue_name]["messages_processed"] += processed
            self.processing_stats[queue_name]["last_processed"] = datetime.utcnow()
            
            return processed
        return 0
    
    @enroute.rest.query("/system/queues", method="GET")
    async def get_queue_status(self, request: Request) -> Response:
        """Get status of all managed queues"""
        status = {}
        for name, queue in self.queues.items():
            status[name] = {
                "size": queue.qsize(),
                "maxsize": queue.maxsize,
                "stats": self.processing_stats[name]
            }
        
        return Response({"queues": status})
    
    @enroute.periodic.event("*/10 * * * * *")  # Every 10 seconds
    async def process_all_queues(self, request) -> Response:
        """Periodically process all queues"""
        results = {}
        for queue_name in self.queues:
            processed = await self.process_queue_batch(queue_name, batch_size=5)
            results[queue_name] = processed
        
        return Response({"processed": results})

# Usage
queue_manager = QueueManager()

# Create queues
user_queue = queue_manager.create_queue("user_events", maxsize=100)
order_queue = queue_manager.create_queue("order_events", maxsize=50)

# Add messages
await queue_manager.add_message("user_events", {"event": "user_created", "id": "123"})
await queue_manager.add_message("order_events", {"event": "order_placed", "id": "456"})

# Process queues (happens automatically via periodic task)
# Monitor at: GET /system/queues

Exception Handling and Utilities

from minos.networks import (
    MinosNetworkException, MinosHandlerException, 
    RequestException, ResponseException
)

class ExceptionUtilities:
    """Helpers for converting exceptions into error responses."""

    @staticmethod
    def handle_network_exception(func):
        """Decorator for handling network exceptions

        Wraps the coroutine function ``func`` so that any exception it raises
        becomes a 500 response. ``MinosNetworkException`` is reported as a
        network error; everything else as an unexpected error.
        """
        # Local import: functools is not part of this snippet's import header.
        import functools

        # Preserve the wrapped handler's name, docstring and attributes so
        # tooling that inspects the function still sees the original metadata.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except MinosNetworkException as e:
                return Response({"error": f"Network error: {e}"}, status=500)
            except Exception as e:
                # Handler boundary: turn any other failure into a 500 response.
                return Response({"error": f"Unexpected error: {e}"}, status=500)
        return wrapper

    @staticmethod
    def create_error_response(error_type: str, message: str, status: int = 400) -> Response:
        """Create standardized error response

        Args:
            error_type: Short identifier for the error, stored under ``"type"``.
            message: Human-readable description.
            status: Status passed to the response (defaults to 400).
        """
        # Local import: datetime is used below but is not part of this
        # snippet's import header.
        from datetime import datetime

        return Response({
            "error": {
                "type": error_type,
                "message": message,
                "timestamp": datetime.utcnow().isoformat()
            }
        }, status=status)

# Usage
# The route decorator is applied last (outermost) so that it receives the
# exception-handling wrapper. In the reverse order the name ``create_user``
# would be bound to a plain wrapper that the route decorator never saw.
@enroute.rest.command("/users", method="POST")
@ExceptionUtilities.handle_network_exception
async def create_user(request: Request) -> Response:
    """Create a user from the request body.

    Returns a 201 response with the new user, or a standardized error
    response when the email is missing or the request cannot be read.
    """
    try:
        user_data = await request.content()

        if not user_data.get("email"):
            return ExceptionUtilities.create_error_response(
                "validation_error",
                "Email is required",
                400
            )

        # Create user logic that might raise MinosNetworkException
        # (``create_user_logic`` is a placeholder for application code).
        new_user = create_user_logic(user_data)
        return Response(new_user, status=201)

    except RequestException as e:
        # Nothing shown here establishes that RequestException carries a
        # ``status`` attribute, so fall back to 400 instead of raising
        # AttributeError inside the error path.
        return ExceptionUtilities.create_error_response(
            "request_error", 
            str(e),
            getattr(e, "status", 400)
        )

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-networks

docs

brokers.md

decorators.md

discovery.md

http.md

index.md

requests.md

routers.md

scheduling.md

specs.md

system-utils.md

tile.json