The networks core of the Minos Framework providing networking components for reactive microservices
—
System health endpoints, network utilities, and various helper functions for microservice operations.
Built-in service providing system health endpoints for monitoring and health checks.
class SystemService:
@enroute.rest.command("/system/health", "GET")
async def check_health(self, request: Request) -> Response: ...

Usage Examples:
from minos.networks import SystemService, enroute
# SystemService is automatically available
# Access health check at: GET /system/health
# Returns response like:
# {
# "status": "healthy",
# "host_ip": "192.168.1.100",
# "timestamp": "2023-09-10T08:30:00Z"
# }
# Custom health check extension
class CustomHealthService(SystemService):
@enroute.rest.command("/system/health", "GET")
async def check_health(self, request: Request) -> Response:
    """Extend the parent health payload with per-dependency check results.

    The response carries every key of the parent payload plus ``"checks"``
    (one entry per dependency) and ``"overall_status"``. The HTTP status is
    200 when every check reports ``"healthy"`` and 503 otherwise.
    """
    # Start from whatever the parent health check reports
    parent_response = await super().check_health(request)
    parent_payload = await parent_response.content()

    # Probe the dependencies one after another, in a fixed order
    dependency_results = {}
    dependency_results["database"] = await self.check_database()
    dependency_results["redis"] = await self.check_redis()
    dependency_results["external_api"] = await self.check_external_api()

    # A single non-healthy dependency marks the whole service unhealthy
    every_check_ok = True
    for result in dependency_results.values():
        if result.get("status") != "healthy":
            every_check_ok = False
            break

    body = {
        **parent_payload,
        "checks": dependency_results,
        "overall_status": "healthy" if every_check_ok else "unhealthy",
    }
    return Response(body, status=200 if every_check_ok else 503)
async def check_database(self) -> dict:
    """Report the database health as a status dict.

    The probe itself is a placeholder: it yields a fixed "healthy" result,
    and any exception raised inside it is turned into an "unhealthy" result
    carrying the error text.
    """
    try:
        # Database health check logic
        outcome = {"status": "healthy", "response_time": "5ms"}
    except Exception as exc:
        outcome = {"status": "unhealthy", "error": str(exc)}
    return outcome
async def check_redis(self) -> dict:
    """Report the Redis health as a status dict.

    The probe itself is a placeholder: it yields a fixed "healthy" result,
    and any exception raised inside it is turned into an "unhealthy" result
    carrying the error text.
    """
    try:
        # Redis health check logic
        outcome = {"status": "healthy", "response_time": "2ms"}
    except Exception as exc:
        outcome = {"status": "unhealthy", "error": str(exc)}
    return outcome
async def check_external_api(self) -> dict:
try:
# External API health check logic
return {"status": "healthy", "response_time": "150ms"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}

Utility functions for network operations and host information.
def get_host_ip() -> str: ...
def get_host_name() -> str: ...
def get_ip(name: str) -> str: ...

Usage Examples:
from minos.networks import get_host_ip, get_host_name, get_ip
# Describe the machine this code is running on
host_name, host_ip = get_host_name(), get_host_ip()
print(f"Host: {host_name}")  # e.g., "myserver"
print(f"IP: {host_ip}")  # e.g., "192.168.1.100"

# Translate a host name into an IP address
external_ip = get_ip("google.com")
print(f"Google IP: {external_ip}")  # e.g., "172.217.164.142"
# Use in service configuration
class NetworkAwareService:
def __init__(self):
    """Look up the host name and host IP once, at construction time."""
    self.host_name, self.host_ip = get_host_name(), get_host_ip()
@enroute.rest.query("/info", method="GET")
async def get_service_info(self, request: Request) -> Response:
return Response({
"service_name": "my-service",
"host_name": self.host_name,
"host_ip": self.host_ip,
"version": "1.0.0"
})

Async queue consumption utilities for message processing.
async def consume_queue(queue, max_count: int) -> None: ...

Usage Examples:
from minos.networks import consume_queue
import asyncio
async def demo_consume_queue() -> None:
    """Fill a queue with ten messages and drain it in two batches.

    The steps live inside a coroutine because ``await`` is only valid inside
    an ``async def``; a plain script cannot await at module level.
    """
    # Create an asyncio queue
    message_queue = asyncio.Queue()

    # Add some messages
    for i in range(10):
        await message_queue.put(f"message_{i}")

    # Consume up to 5 messages at once
    await consume_queue(message_queue, max_count=5)
    # Queue now has 5 messages remaining
    print(f"Queue size: {message_queue.qsize()}")  # 5

    # Consume remaining messages
    await consume_queue(message_queue, max_count=10)
    print(f"Queue size: {message_queue.qsize()}")  # 0


asyncio.run(demo_consume_queue())
# Use in message processing
class MessageProcessor:
def __init__(self):
    """Start with an empty, unbounded queue and a batch size of ten."""
    self.batch_size = 10
    self.processing_queue = asyncio.Queue()
async def add_message(self, message):
    """Enqueue ``message`` so that a later batch run can pick it up."""
    pending = self.processing_queue
    await pending.put(message)
async def process_batch(self):
    """Process messages in batches.

    Does nothing when the queue is empty; otherwise hands the queue to
    ``consume_queue`` with ``self.batch_size`` as the upper bound and
    prints a confirmation line.
    """
    # Only call consume_queue when at least one message is waiting
    if self.processing_queue.qsize() <= 0:
        return

    # Consume up to batch_size messages
    await consume_queue(self.processing_queue, self.batch_size)
    # Plain literal: the message has no placeholders, so no f-string is needed
    print("Processed batch of messages")
@enroute.periodic.event("*/30 * * * * *") # Every 30 seconds
async def periodic_batch_processing(self, request) -> Response:
await self.process_batch()
return Response({"batch_processed": True})

from minos.networks import SystemService, get_host_ip, get_host_name, enroute
import psutil
import time
from datetime import datetime
class AdvancedHealthService(SystemService):
    """Health service exposing a basic and a detailed health endpoint.

    The host name and host IP are looked up once at construction time, and
    the construction time is remembered so the uptime can be reported.
    """

    def __init__(self):
        # NOTE(review): the parent initializer is not invoked here; confirm
        # that SystemService needs no initialization of its own.
        self.start_time = time.time()
        self.host_ip = get_host_ip()
        self.host_name = get_host_name()

    @enroute.rest.query("/system/health", method="GET")
    async def check_health(self, request: Request) -> Response:
        """Return the basic health status."""
        return Response(await self.get_health_status())

    @enroute.rest.query("/system/health/detailed", method="GET")
    async def detailed_health(self, request: Request) -> Response:
        """Return the basic health status plus CPU, memory, disk and network metrics."""
        # Local import: this example's own import lines do not include asyncio
        import asyncio

        health_data = await self.get_health_status()

        # cpu_percent(interval=1) blocks for a full second while it samples,
        # so it runs in the default executor instead of stalling the event loop.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(
            None, lambda: psutil.cpu_percent(interval=1)
        )

        # One snapshot per resource keeps total/available/percent consistent
        # with each other and avoids querying the OS three times.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Add detailed system metrics
        health_data.update({
            "system": {
                "cpu_percent": cpu_percent,
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent,
                },
                "disk": {
                    "total": disk.total,
                    "free": disk.free,
                    "percent": disk.percent,
                },
            },
            # Seconds elapsed since this service object was constructed
            "uptime": time.time() - self.start_time,
            "connections": len(psutil.net_connections()),
        })
        return Response(health_data)

    async def get_health_status(self) -> dict:
        """Get basic health status.

        Returns a fresh dict on every call, so callers may extend it freely.
        """
        return {
            "status": "healthy",
            # Naive UTC timestamp in ISO 8601 form (no timezone suffix)
            "timestamp": datetime.utcnow().isoformat(),
            "host_name": self.host_name,
            "host_ip": self.host_ip,
            "service": "minos-microservice",
            "version": "1.0.0",
        }
# Usage
health_service = AdvancedHealthService()
# Available endpoints:
# GET /system/health - Basic health check
# GET /system/health/detailed - Detailed system metrics

from minos.networks import get_host_ip, get_host_name, DiscoveryConnector
import socket
class NetworkConfigService:
    """Service that detects its own network settings and can register them with discovery."""

    def __init__(self):
        self.host_name = get_host_name()
        self.host_ip = get_host_ip()
        self.service_port = self.find_available_port()

    def find_available_port(self, start_port: int = 8080) -> int:
        """Find an available port starting from start_port.

        Tries to bind each of the 100 ports beginning at ``start_port`` and
        returns the first one that can be bound.

        Raises:
            RuntimeError: if none of the probed ports can be bound.
        """
        for port in range(start_port, start_port + 100):
            try:
                # The probe socket is closed again when the ``with`` block ends,
                # so the port is only known to have been free at probe time.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(('', port))
                    return port
            except OSError:
                continue
        raise RuntimeError("No available ports found")

    async def register_with_discovery(self, service_name: str, endpoints: list):
        """Register service with discovery using detected network info.

        Returns the ``DiscoveryConnector`` after it has subscribed.
        """
        # InMemoryDiscoveryClient is not covered by this example's own import
        # line, so it is imported here to keep the method self-contained.
        from minos.networks import InMemoryDiscoveryClient

        discovery = DiscoveryConnector(
            client=InMemoryDiscoveryClient(host="consul", port=8500),
            name=service_name,
            endpoints=endpoints,
            host=self.host_ip,
            port=self.service_port,
        )
        await discovery.subscribe()
        print(f"Service registered: {service_name}@{self.host_ip}:{self.service_port}")
        return discovery

    @enroute.rest.query("/system/network", method="GET")
    async def get_network_info(self, request: Request) -> Response:
        """Get network configuration information.

        Any failure while collecting the data is reported as a 500 response
        whose body carries the error text.
        """
        try:
            # Get all network interfaces
            interfaces = {
                interface: [
                    {
                        "family": addr.family,
                        "address": addr.address,
                        "netmask": addr.netmask,
                    }
                    for addr in addrs
                ]
                for interface, addrs in psutil.net_if_addrs().items()
            }

            # Read the counters once so the four figures belong to the same sample
            io_counters = psutil.net_io_counters()

            return Response({
                "host_name": self.host_name,
                "host_ip": self.host_ip,
                "service_port": self.service_port,
                "interfaces": interfaces,
                "network_stats": {
                    "bytes_sent": io_counters.bytes_sent,
                    "bytes_recv": io_counters.bytes_recv,
                    "packets_sent": io_counters.packets_sent,
                    "packets_recv": io_counters.packets_recv,
                },
            })
        except Exception as e:
            return Response({"error": str(e)}, status=500)
# Usage
network_service = NetworkConfigService()
discovery = await network_service.register_with_discovery(
"my-service",
[{"path": "/api/v1", "method": "GET"}]
)

from minos.networks import consume_queue, enroute
import asyncio
from typing import Dict, List
class QueueManager:
    """Registry of named ``asyncio.Queue`` objects with per-queue processing statistics."""

    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        # Per queue name: {"messages_processed": <int>, "last_processed": <datetime or None>}
        self.processing_stats = {}

    def create_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue:
        """Create a named queue and initialize its statistics.

        NOTE(review): a queue already registered under ``name`` is replaced
        and its statistics are reset; confirm that this is intended.
        """
        queue = asyncio.Queue(maxsize=maxsize)
        self.queues[name] = queue
        self.processing_stats[name] = {
            "messages_processed": 0,
            "last_processed": None,
        }
        return queue

    async def add_message(self, queue_name: str, message):
        """Add message to a named queue.

        Raises:
            ValueError: if no queue is registered under ``queue_name``.
        """
        queue = self.queues.get(queue_name)
        if queue is None:
            raise ValueError(f"Queue {queue_name} does not exist")
        await queue.put(message)

    async def process_queue_batch(self, queue_name: str, batch_size: int = 10):
        """Process messages from a queue in batches.

        Returns the number of messages counted as processed, or 0 when the
        queue was empty.

        Raises:
            ValueError: if no queue is registered under ``queue_name``.
        """
        # Local import: this example's own import lines do not include datetime
        from datetime import datetime

        queue = self.queues.get(queue_name)
        if queue is None:
            raise ValueError(f"Queue {queue_name} does not exist")

        initial_size = queue.qsize()
        if initial_size <= 0:
            return 0

        await consume_queue(queue, batch_size)
        # NOTE(review): assumes consume_queue removed every message that was
        # waiting, up to batch_size; verify against its implementation.
        processed = min(initial_size, batch_size)

        # Update stats
        stats = self.processing_stats[queue_name]
        stats["messages_processed"] += processed
        stats["last_processed"] = datetime.utcnow()
        return processed

    @enroute.rest.query("/system/queues", method="GET")
    async def get_queue_status(self, request: Request) -> Response:
        """Get status of all managed queues."""
        status = {
            name: {
                "size": queue.qsize(),
                "maxsize": queue.maxsize,
                # Copy, so the response cannot alias the live stats dict
                "stats": dict(self.processing_stats[name]),
            }
            for name, queue in self.queues.items()
        }
        return Response({"queues": status})

    @enroute.periodic.event("*/10 * * * * *")  # Every 10 seconds
    async def process_all_queues(self, request) -> Response:
        """Periodically process all queues."""
        results = {}
        # Iterate over a snapshot of the names: the loop awaits between steps,
        # and a queue created meanwhile would otherwise make the dict iteration
        # fail with "dictionary changed size during iteration".
        for queue_name in list(self.queues):
            results[queue_name] = await self.process_queue_batch(queue_name, batch_size=5)
        return Response({"processed": results})
# Usage
async def demo_queue_manager() -> None:
    """Create two bounded queues and put one event on each.

    The steps live inside a coroutine because ``await`` is only valid inside
    an ``async def``; a plain script cannot await at module level.
    """
    queue_manager = QueueManager()

    # Create queues (create_queue also returns the new asyncio.Queue)
    queue_manager.create_queue("user_events", maxsize=100)
    queue_manager.create_queue("order_events", maxsize=50)

    # Add messages
    await queue_manager.add_message("user_events", {"event": "user_created", "id": "123"})
    await queue_manager.add_message("order_events", {"event": "order_placed", "id": "456"})

    # Process queues (happens automatically via periodic task)


asyncio.run(demo_queue_manager())
# Monitor at: GET /system/queues

from minos.networks import (
MinosNetworkException, MinosHandlerException,
RequestException, ResponseException
)
class ExceptionUtilities:
    """Helpers that turn exceptions and validation failures into error responses."""

    @staticmethod
    def handle_network_exception(func):
        """Decorator for handling network exceptions.

        Wraps the coroutine function ``func`` so that a raised
        ``MinosNetworkException`` becomes a 500 response with a
        "Network error" message and any other exception becomes a 500
        response with an "Unexpected error" message.
        """
        # Local import keeps this example self-contained
        import functools

        # functools.wraps copies the name, docstring and attributes of ``func``
        # onto the wrapper, so the decorated handler stays identifiable.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except MinosNetworkException as e:
                return Response({"error": f"Network error: {e}"}, status=500)
            except Exception as e:
                # Last-resort boundary: report the failure instead of raising
                return Response({"error": f"Unexpected error: {e}"}, status=500)

        return wrapper

    @staticmethod
    def create_error_response(error_type: str, message: str, status: int = 400) -> Response:
        """Create standardized error response.

        The body has a single ``"error"`` entry holding the given type and
        message plus a naive-UTC ISO 8601 timestamp; ``status`` is passed on
        as the response status.
        """
        # Local import: this example's own import lines do not include datetime
        from datetime import datetime

        error_body = {
            "type": error_type,
            "message": message,
            "timestamp": datetime.utcnow().isoformat(),
        }
        return Response({"error": error_body}, status=status)
# Usage
@ExceptionUtilities.handle_network_exception
@enroute.rest.command("/users", method="POST")
async def create_user(request: Request) -> Response:
try:
user_data = await request.content()
if not user_data.get("email"):
return ExceptionUtilities.create_error_response(
"validation_error",
"Email is required",
400
)
# Create user logic that might raise MinosNetworkException
new_user = create_user_logic(user_data)
return Response(new_user, status=201)
except RequestException as e:
return ExceptionUtilities.create_error_response(
"request_error",
str(e),
e.status
)

Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks