0
# System and Utilities
1
2
System health endpoints, network utilities, and various helper functions for microservice operations.
3
4
## Capabilities
5
6
### System Health Service
7
8
Built-in service providing system health endpoints for monitoring and health checks.
9
10
```python { .api }
11
class SystemService:
12
@enroute.rest.command("/system/health", "GET")
13
async def check_health(self, request: Request) -> Response: ...
14
```
15
16
**Usage Examples:**
17
18
```python
19
from minos.networks import SystemService, enroute
20
21
# SystemService is automatically available
22
# Access health check at: GET /system/health
23
24
# Returns response like:
25
# {
26
# "status": "healthy",
27
# "host_ip": "192.168.1.100",
28
# "timestamp": "2023-09-10T08:30:00Z"
29
# }
30
31
# Custom health check extension
32
class CustomHealthService(SystemService):
    """Health service that extends the built-in check with dependency probes."""

    @enroute.rest.command("/system/health", "GET")
    async def check_health(self, request: Request) -> Response:
        """Merge the parent health payload with the per-dependency results.

        Responds with 200 when every probe reports ``"healthy"`` and with
        503 otherwise.
        """
        # Call parent health check and start from its payload.
        parent_response = await super().check_health(request)
        payload = {**await parent_response.content()}

        # Add custom health checks, awaited one after another in this order.
        probe_results = {}
        for label, probe in (
            ("database", self.check_database),
            ("redis", self.check_redis),
            ("external_api", self.check_external_api),
        ):
            probe_results[label] = await probe()

        # Combine results: one failing probe makes the whole service unhealthy.
        every_probe_ok = True
        for outcome in probe_results.values():
            if outcome.get("status") != "healthy":
                every_probe_ok = False
                break

        payload["checks"] = probe_results
        payload["overall_status"] = "healthy" if every_probe_ok else "unhealthy"

        if payload["overall_status"] == "healthy":
            status_code = 200
        else:
            status_code = 503
        return Response(payload, status=status_code)

    async def check_database(self) -> dict:
        """Report database health as a ``{"status": ...}`` dictionary."""
        try:
            # Database health check logic
            outcome = {"status": "healthy", "response_time": "5ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome

    async def check_redis(self) -> dict:
        """Report Redis health as a ``{"status": ...}`` dictionary."""
        try:
            # Redis health check logic
            outcome = {"status": "healthy", "response_time": "2ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome

    async def check_external_api(self) -> dict:
        """Report external API health as a ``{"status": ...}`` dictionary."""
        try:
            # External API health check logic
            outcome = {"status": "healthy", "response_time": "150ms"}
        except Exception as exc:
            outcome = {"status": "unhealthy", "error": str(exc)}
        return outcome
79
```
80
81
### Network Utilities
82
83
Utility functions for network operations and host information.
84
85
```python { .api }
86
def get_host_ip() -> str: ...
87
def get_host_name() -> str: ...
88
def get_ip(name: str) -> str: ...
89
```
90
91
**Usage Examples:**
92
93
```python
94
from minos.networks import get_host_ip, get_host_name, get_ip
95
96
# Get current host information
97
host_name = get_host_name()
98
host_ip = get_host_ip()
99
100
print(f"Host: {host_name}") # e.g., "myserver"
101
print(f"IP: {host_ip}") # e.g., "192.168.1.100"
102
103
# Resolve hostname to IP
104
external_ip = get_ip("google.com")
105
print(f"Google IP: {external_ip}") # e.g., "172.217.164.142"
106
107
# Use in service configuration
108
class NetworkAwareService:
    """Service that resolves its host name and IP once and serves them at /info."""

    def __init__(self):
        # Resolved at construction time; every /info response reuses these values.
        self.host_name, self.host_ip = get_host_name(), get_host_ip()

    @enroute.rest.query("/info", method="GET")
    async def get_service_info(self, request: Request) -> Response:
        """Return the static service identity plus the cached host details."""
        info = {
            "service_name": "my-service",
            "host_name": self.host_name,
            "host_ip": self.host_ip,
            "version": "1.0.0",
        }
        return Response(info)
121
```
122
123
### Queue Utilities
124
125
Async queue consumption utilities for message processing.
126
127
```python { .api }
128
async def consume_queue(queue, max_count: int) -> None: ...
129
```
130
131
**Usage Examples:**
132
133
```python
134
from minos.networks import consume_queue
135
import asyncio
136
137
# Create an asyncio queue
138
message_queue = asyncio.Queue()
139
140
# Add some messages
141
for i in range(10):
142
await message_queue.put(f"message_{i}")
143
144
# Consume up to 5 messages at once
145
await consume_queue(message_queue, max_count=5)
146
147
# Queue now has 5 messages remaining
148
print(f"Queue size: {message_queue.qsize()}") # 5
149
150
# Consume remaining messages
151
await consume_queue(message_queue, max_count=10)
152
print(f"Queue size: {message_queue.qsize()}") # 0
153
154
# Use in message processing
155
class MessageProcessor:
    """Buffers messages in an ``asyncio.Queue`` and drains them in batches."""

    def __init__(self):
        self.batch_size = 10
        self.processing_queue = asyncio.Queue()

    async def add_message(self, message):
        """Enqueue ``message`` for a later batch."""
        await self.processing_queue.put(message)

    async def process_batch(self):
        """Process messages in batches"""
        # Nothing queued: skip the consume call entirely.
        if not self.processing_queue.qsize():
            return

        # Consume up to batch_size messages
        await consume_queue(self.processing_queue, self.batch_size)
        print("Processed batch of messages")

    @enroute.periodic.event("*/30 * * * * *")  # Every 30 seconds
    async def periodic_batch_processing(self, request) -> Response:
        """Periodic entry point: drain one batch and acknowledge."""
        await self.process_batch()
        acknowledgement = {"batch_processed": True}
        return Response(acknowledgement)
174
```
175
176
## Advanced Usage
177
178
### Comprehensive Health Monitoring
179
180
```python
181
from minos.networks import SystemService, get_host_ip, get_host_name, enroute
182
import psutil
183
import time
184
from datetime import datetime
185
186
class AdvancedHealthService(SystemService):
    """Health service exposing a basic and a detailed health endpoint."""

    def __init__(self):
        # Recorded once so "uptime" can be derived on each detailed request.
        self.start_time = time.time()
        self.host_ip = get_host_ip()
        self.host_name = get_host_name()

    @enroute.rest.query("/system/health", method="GET")
    async def check_health(self, request: Request) -> Response:
        """Return the basic health payload built by ``get_health_status``."""
        return Response(await self.get_health_status())

    @enroute.rest.query("/system/health/detailed", method="GET")
    async def detailed_health(self, request: Request) -> Response:
        """Return the basic payload extended with CPU, memory, disk and network data."""
        import asyncio  # local import: this snippet's import block does not provide it

        health_data = await self.get_health_status()

        # cpu_percent(interval=1) sleeps for the whole interval. Run it in the
        # default executor so the event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)

        # Sample each resource once so the figures reported together come
        # from the same snapshot.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage("/")

        # Add detailed system metrics
        health_data.update({
            "system": {
                "cpu_percent": cpu_percent,
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent,
                },
                "disk": {
                    "total": disk.total,
                    "free": disk.free,
                    "percent": disk.percent,
                },
            },
            "uptime": time.time() - self.start_time,
            "connections": len(psutil.net_connections()),
        })

        return Response(health_data)

    async def get_health_status(self) -> dict:
        """Get basic health status"""
        from datetime import timezone  # local import: only ``datetime`` is imported above

        return {
            "status": "healthy",
            # Timezone-aware UTC timestamp; ``datetime.utcnow()`` is naive and deprecated.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "host_name": self.host_name,
            "host_ip": self.host_ip,
            "service": "minos-microservice",
            "version": "1.0.0",
        }
231
232
# Usage
233
health_service = AdvancedHealthService()
234
235
# Available endpoints:
236
# GET /system/health - Basic health check
237
# GET /system/health/detailed - Detailed system metrics
238
```
239
240
### Network Configuration and Discovery
241
242
```python
243
from minos.networks import get_host_ip, get_host_name, DiscoveryConnector
244
import socket
245
246
class NetworkConfigService:
    """Detects host name, IP and a free port, and exposes network information."""

    def __init__(self):
        self.host_name = get_host_name()
        self.host_ip = get_host_ip()
        self.service_port = self.find_available_port()

    def find_available_port(self, start_port: int = 8080, max_attempts: int = 100) -> int:
        """Find an available port starting from start_port

        Args:
            start_port: First port number to try.
            max_attempts: How many consecutive ports to try (default 100).

        Returns:
            The first port in the range that could be bound.

        Raises:
            RuntimeError: If none of the candidate ports could be bound.
        """
        # Stop at 65535: bind() rejects larger values with OverflowError, not OSError.
        last_port = min(start_port + max_attempts, 65536)
        for port in range(start_port, last_port):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(("", port))
            except OSError:
                # Port is taken or not permitted; try the next one.
                continue
            # The probe socket is closed again here, so the port is only known
            # to have been free at the time of the check.
            return port
        raise RuntimeError("No available ports found")

    async def register_with_discovery(self, service_name: str, endpoints: list):
        """Register service with discovery using detected network info

        Returns:
            The ``DiscoveryConnector`` that was subscribed.
        """
        # Local import: the surrounding snippet imports DiscoveryConnector but
        # not the client class this method instantiates.
        from minos.networks import InMemoryDiscoveryClient

        discovery = DiscoveryConnector(
            client=InMemoryDiscoveryClient(host="consul", port=8500),
            name=service_name,
            endpoints=endpoints,
            host=self.host_ip,
            port=self.service_port,
        )

        await discovery.subscribe()
        print(f"Service registered: {service_name}@{self.host_ip}:{self.service_port}")
        return discovery

    @enroute.rest.query("/system/network", method="GET")
    async def get_network_info(self, request: Request) -> Response:
        """Get network configuration information"""
        import psutil  # local import: not imported by the surrounding snippet

        try:
            # Get all network interfaces
            interfaces = {
                interface: [
                    {
                        "family": addr.family,
                        "address": addr.address,
                        "netmask": addr.netmask,
                    }
                    for addr in addrs
                ]
                for interface, addrs in psutil.net_if_addrs().items()
            }

            # One snapshot, so the four counters describe the same instant.
            io_counters = psutil.net_io_counters()

            return Response({
                "host_name": self.host_name,
                "host_ip": self.host_ip,
                "service_port": self.service_port,
                "interfaces": interfaces,
                "network_stats": {
                    "bytes_sent": io_counters.bytes_sent,
                    "bytes_recv": io_counters.bytes_recv,
                    "packets_sent": io_counters.packets_sent,
                    "packets_recv": io_counters.packets_recv,
                },
            })
        except Exception as e:
            # Handler boundary: report the failure to the client instead of raising.
            return Response({"error": str(e)}, status=500)
307
308
# Usage
309
network_service = NetworkConfigService()
310
discovery = await network_service.register_with_discovery(
311
"my-service",
312
[{"path": "/api/v1", "method": "GET"}]
313
)
314
```
315
316
### Message Queue Management
317
318
```python
319
from minos.networks import consume_queue, enroute
320
import asyncio
321
from typing import Dict, List
322
323
class QueueManager:
    """Registry of named ``asyncio.Queue`` objects with per-queue processing stats."""

    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.processing_stats = {}

    def create_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue:
        """Create a named queue

        Registering a name that already exists replaces that queue and resets
        its stats.
        """
        self.queues[name] = asyncio.Queue(maxsize=maxsize)
        self.processing_stats[name] = {
            "messages_processed": 0,
            "last_processed": None,
        }
        return self.queues[name]

    async def add_message(self, queue_name: str, message):
        """Add message to a named queue

        Raises:
            ValueError: If no queue is registered under ``queue_name``.
        """
        if queue_name not in self.queues:
            raise ValueError(f"Queue {queue_name} does not exist")
        await self.queues[queue_name].put(message)

    async def process_queue_batch(self, queue_name: str, batch_size: int = 10):
        """Process messages from a queue in batches

        Returns:
            The number of messages counted as processed: the smaller of the
            queue size before consuming and ``batch_size``, or 0 when the
            queue was empty.

        Raises:
            ValueError: If no queue is registered under ``queue_name``.
        """
        # Local import: the surrounding snippet never imports ``datetime``.
        from datetime import datetime, timezone

        if queue_name not in self.queues:
            raise ValueError(f"Queue {queue_name} does not exist")

        queue = self.queues[queue_name]
        initial_size = queue.qsize()
        if initial_size == 0:
            return 0

        await consume_queue(queue, batch_size)
        processed = min(initial_size, batch_size)

        # Update stats
        stats = self.processing_stats[queue_name]
        stats["messages_processed"] += processed
        # Timezone-aware UTC; ``datetime.utcnow()`` is naive and deprecated.
        stats["last_processed"] = datetime.now(timezone.utc)

        return processed

    @enroute.rest.query("/system/queues", method="GET")
    async def get_queue_status(self, request: Request) -> Response:
        """Get status of all managed queues"""
        status = {
            name: {
                "size": queue.qsize(),
                "maxsize": queue.maxsize,
                "stats": self.processing_stats[name],
            }
            for name, queue in self.queues.items()
        }
        return Response({"queues": status})

    @enroute.periodic.event("*/10 * * * * *")  # Every 10 seconds
    async def process_all_queues(self, request) -> Response:
        """Periodically process all queues"""
        results = {}
        # Iterate a snapshot of the names: a queue created while a batch is
        # awaited must not invalidate the dict iteration.
        for queue_name in list(self.queues):
            results[queue_name] = await self.process_queue_batch(queue_name, batch_size=5)

        return Response({"processed": results})
385
386
# Usage
387
queue_manager = QueueManager()
388
389
# Create queues
390
user_queue = queue_manager.create_queue("user_events", maxsize=100)
391
order_queue = queue_manager.create_queue("order_events", maxsize=50)
392
393
# Add messages
394
await queue_manager.add_message("user_events", {"event": "user_created", "id": "123"})
395
await queue_manager.add_message("order_events", {"event": "order_placed", "id": "456"})
396
397
# Process queues (happens automatically via periodic task)
398
# Monitor at: GET /system/queues
399
```
400
401
### Exception Handling and Utilities
402
403
```python
404
from minos.networks import (
405
MinosNetworkException, MinosHandlerException,
406
RequestException, ResponseException
407
)
408
409
class ExceptionUtilities:
    """Helpers for turning exceptions into standardized error responses."""

    @staticmethod
    def handle_network_exception(func):
        """Decorator for handling network exceptions

        Wraps the coroutine function ``func`` so that any exception it raises
        becomes a 500 ``Response``. ``MinosNetworkException`` is reported as a
        network error and everything else as an unexpected error.
        """
        import functools  # local import: not imported by the surrounding snippet

        # functools.wraps keeps the wrapped function's name, docstring and
        # attributes visible on the wrapper, including any metadata other
        # decorators attached to it.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except MinosNetworkException as e:
                return Response({"error": f"Network error: {e}"}, status=500)
            except Exception as e:
                # Handler boundary: report the failure instead of propagating it.
                return Response({"error": f"Unexpected error: {e}"}, status=500)

        return wrapper

    @staticmethod
    def create_error_response(error_type: str, message: str, status: int = 400) -> Response:
        """Create standardized error response

        Args:
            error_type: Short machine-readable error category.
            message: Human-readable description of the error.
            status: Status passed to the ``Response`` (default 400).

        Returns:
            A ``Response`` whose body is ``{"error": {type, message, timestamp}}``.
        """
        # Local import: the surrounding snippet never imports ``datetime``.
        from datetime import datetime, timezone

        return Response({
            "error": {
                "type": error_type,
                "message": message,
                # Timezone-aware UTC; ``datetime.utcnow()`` is naive and deprecated.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        }, status=status)
432
433
# Usage
434
# Route registration is applied last (outermost) so that the callable handed to
# enroute is the exception-wrapped coroutine. In the reverse order, anything the
# enroute decorator attaches to the function sits underneath a wrapper.
# NOTE(review): assumes enroute registers whatever callable it decorates — confirm.
@enroute.rest.command("/users", method="POST")
@ExceptionUtilities.handle_network_exception
async def create_user(request: Request) -> Response:
    """Create a user from the request body.

    Returns 201 with the new user, or a standardized error response when the
    email is missing or reading the request raises ``RequestException``.
    Other exceptions are left to the ``handle_network_exception`` decorator.
    """
    try:
        user_data = await request.content()

        if not user_data.get("email"):
            return ExceptionUtilities.create_error_response(
                "validation_error",
                "Email is required",
                400,
            )

        # Create user logic that might raise MinosNetworkException
        new_user = create_user_logic(user_data)
        return Response(new_user, status=201)

    except RequestException as e:
        # NOTE(review): RequestException is not shown here to define ``status``.
        # Fall back to 400 rather than raising AttributeError inside the handler.
        return ExceptionUtilities.create_error_response(
            "request_error",
            str(e),
            getattr(e, "status", 400),
        )
457
```