or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mddecorators.mddiscovery.mdhttp.mdindex.mdrequests.mdrouters.mdscheduling.mdspecs.mdsystem-utils.md

system-utils.mddocs/

0

# System and Utilities

1

2

System health endpoints, network utilities, and various helper functions for microservice operations.

3

4

## Capabilities

5

6

### System Health Service

7

8

Built-in service providing system health endpoints for monitoring and health checks.

9

10

```python { .api }

11

class SystemService:

12

@enroute.rest.command("/system/health", "GET")

13

async def check_health(self, request: Request) -> Response: ...

14

```

15

16

**Usage Examples:**

17

18

```python

19

from minos.networks import SystemService, enroute

20

21

# SystemService is automatically available

22

# Access health check at: GET /system/health

23

24

# Returns response like:

25

# {

26

# "status": "healthy",

27

# "host_ip": "192.168.1.100",

28

# "timestamp": "2023-09-10T08:30:00Z"

29

# }

30

31

# Custom health check extension

32

class CustomHealthService(SystemService):

33

@enroute.rest.command("/system/health", "GET")

34

async def check_health(self, request: Request) -> Response:

35

# Call parent health check

36

base_response = await super().check_health(request)

37

base_health = await base_response.content()

38

39

# Add custom health checks

40

custom_checks = {

41

"database": await self.check_database(),

42

"redis": await self.check_redis(),

43

"external_api": await self.check_external_api()

44

}

45

46

# Combine results

47

health_data = {

48

**base_health,

49

"checks": custom_checks,

50

"overall_status": "healthy" if all(

51

check.get("status") == "healthy"

52

for check in custom_checks.values()

53

) else "unhealthy"

54

}

55

56

status_code = 200 if health_data["overall_status"] == "healthy" else 503

57

return Response(health_data, status=status_code)

58

59

async def check_database(self) -> dict:

60

try:

61

# Database health check logic

62

return {"status": "healthy", "response_time": "5ms"}

63

except Exception as e:

64

return {"status": "unhealthy", "error": str(e)}

65

66

async def check_redis(self) -> dict:

67

try:

68

# Redis health check logic

69

return {"status": "healthy", "response_time": "2ms"}

70

except Exception as e:

71

return {"status": "unhealthy", "error": str(e)}

72

73

async def check_external_api(self) -> dict:

74

try:

75

# External API health check logic

76

return {"status": "healthy", "response_time": "150ms"}

77

except Exception as e:

78

return {"status": "unhealthy", "error": str(e)}

79

```

80

81

### Network Utilities

82

83

Utility functions for network operations and host information.

84

85

```python { .api }

86

def get_host_ip() -> str: ...

87

def get_host_name() -> str: ...

88

def get_ip(name: str) -> str: ...

89

```

90

91

**Usage Examples:**

92

93

```python

94

from minos.networks import get_host_ip, get_host_name, get_ip

95

96

# Get current host information

97

host_name = get_host_name()

98

host_ip = get_host_ip()

99

100

print(f"Host: {host_name}") # e.g., "myserver"

101

print(f"IP: {host_ip}") # e.g., "192.168.1.100"

102

103

# Resolve hostname to IP

104

external_ip = get_ip("google.com")

105

print(f"Google IP: {external_ip}") # e.g., "172.217.164.142"

106

107

# Use in service configuration

108

class NetworkAwareService:

109

def __init__(self):

110

self.host_name = get_host_name()

111

self.host_ip = get_host_ip()

112

113

@enroute.rest.query("/info", method="GET")

114

async def get_service_info(self, request: Request) -> Response:

115

return Response({

116

"service_name": "my-service",

117

"host_name": self.host_name,

118

"host_ip": self.host_ip,

119

"version": "1.0.0"

120

})

121

```

122

123

### Queue Utilities

124

125

Async queue consumption utilities for message processing.

126

127

```python { .api }

128

async def consume_queue(queue, max_count: int) -> None: ...

129

```

130

131

**Usage Examples:**

132

133

```python

134

from minos.networks import consume_queue

135

import asyncio

136

137

# Create an asyncio queue

138

message_queue = asyncio.Queue()

139

140

# Add some messages

141

for i in range(10):

142

await message_queue.put(f"message_{i}")

143

144

# Consume up to 5 messages at once

145

await consume_queue(message_queue, max_count=5)

146

147

# Queue now has 5 messages remaining

148

print(f"Queue size: {message_queue.qsize()}") # 5

149

150

# Consume remaining messages

151

await consume_queue(message_queue, max_count=10)

152

print(f"Queue size: {message_queue.qsize()}") # 0

153

154

# Use in message processing

155

class MessageProcessor:

156

def __init__(self):

157

self.processing_queue = asyncio.Queue()

158

self.batch_size = 10

159

160

async def add_message(self, message):

161

await self.processing_queue.put(message)

162

163

async def process_batch(self):

164

"""Process messages in batches"""

165

if self.processing_queue.qsize() > 0:

166

# Consume up to batch_size messages

167

await consume_queue(self.processing_queue, self.batch_size)

168

print(f"Processed batch of messages")

169

170

@enroute.periodic.event("*/30 * * * * *") # Every 30 seconds

171

async def periodic_batch_processing(self, request) -> Response:

172

await self.process_batch()

173

return Response({"batch_processed": True})

174

```

175

176

## Advanced Usage

177

178

### Comprehensive Health Monitoring

179

180

```python

181

from minos.networks import SystemService, get_host_ip, get_host_name, enroute

182

import psutil

183

import time

184

from datetime import datetime

185

186

class AdvancedHealthService(SystemService):

187

def __init__(self):

188

self.start_time = time.time()

189

self.host_ip = get_host_ip()

190

self.host_name = get_host_name()

191

192

@enroute.rest.query("/system/health", method="GET")

193

async def check_health(self, request: Request) -> Response:

194

return Response(await self.get_health_status())

195

196

@enroute.rest.query("/system/health/detailed", method="GET")

197

async def detailed_health(self, request: Request) -> Response:

198

health_data = await self.get_health_status()

199

200

# Add detailed system metrics

201

health_data.update({

202

"system": {

203

"cpu_percent": psutil.cpu_percent(interval=1),

204

"memory": {

205

"total": psutil.virtual_memory().total,

206

"available": psutil.virtual_memory().available,

207

"percent": psutil.virtual_memory().percent

208

},

209

"disk": {

210

"total": psutil.disk_usage('/').total,

211

"free": psutil.disk_usage('/').free,

212

"percent": psutil.disk_usage('/').percent

213

}

214

},

215

"uptime": time.time() - self.start_time,

216

"connections": len(psutil.net_connections())

217

})

218

219

return Response(health_data)

220

221

async def get_health_status(self) -> dict:

222

"""Get basic health status"""

223

return {

224

"status": "healthy",

225

"timestamp": datetime.utcnow().isoformat(),

226

"host_name": self.host_name,

227

"host_ip": self.host_ip,

228

"service": "minos-microservice",

229

"version": "1.0.0"

230

}

231

232

# Usage

233

health_service = AdvancedHealthService()

234

235

# Available endpoints:

236

# GET /system/health - Basic health check

237

# GET /system/health/detailed - Detailed system metrics

238

```

239

240

### Network Configuration and Discovery

241

242

```python

243

from minos.networks import get_host_ip, get_host_name, DiscoveryConnector

244

import socket

245

246

class NetworkConfigService:

247

def __init__(self):

248

self.host_name = get_host_name()

249

self.host_ip = get_host_ip()

250

self.service_port = self.find_available_port()

251

252

def find_available_port(self, start_port: int = 8080) -> int:

253

"""Find an available port starting from start_port"""

254

for port in range(start_port, start_port + 100):

255

try:

256

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

257

s.bind(('', port))

258

return port

259

except OSError:

260

continue

261

raise RuntimeError("No available ports found")

262

263

async def register_with_discovery(self, service_name: str, endpoints: list):

264

"""Register service with discovery using detected network info"""

265

discovery = DiscoveryConnector(

266

client=InMemoryDiscoveryClient(host="consul", port=8500),

267

name=service_name,

268

endpoints=endpoints,

269

host=self.host_ip,

270

port=self.service_port

271

)

272

273

await discovery.subscribe()

274

print(f"Service registered: {service_name}@{self.host_ip}:{self.service_port}")

275

return discovery

276

277

@enroute.rest.query("/system/network", method="GET")

278

async def get_network_info(self, request: Request) -> Response:

279

"""Get network configuration information"""

280

try:

281

# Get all network interfaces

282

interfaces = {}

283

for interface, addrs in psutil.net_if_addrs().items():

284

interfaces[interface] = [

285

{

286

"family": addr.family,

287

"address": addr.address,

288

"netmask": addr.netmask

289

}

290

for addr in addrs

291

]

292

293

return Response({

294

"host_name": self.host_name,

295

"host_ip": self.host_ip,

296

"service_port": self.service_port,

297

"interfaces": interfaces,

298

"network_stats": {

299

"bytes_sent": psutil.net_io_counters().bytes_sent,

300

"bytes_recv": psutil.net_io_counters().bytes_recv,

301

"packets_sent": psutil.net_io_counters().packets_sent,

302

"packets_recv": psutil.net_io_counters().packets_recv

303

}

304

})

305

except Exception as e:

306

return Response({"error": str(e)}, status=500)

307

308

# Usage

309

network_service = NetworkConfigService()

310

discovery = await network_service.register_with_discovery(

311

"my-service",

312

[{"path": "/api/v1", "method": "GET"}]

313

)

314

```

315

316

### Message Queue Management

317

318

```python

319

from minos.networks import consume_queue, enroute

320

import asyncio

321

from typing import Dict, List

322

323

class QueueManager:

324

def __init__(self):

325

self.queues: Dict[str, asyncio.Queue] = {}

326

self.processing_stats = {}

327

328

def create_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue:

329

"""Create a named queue"""

330

self.queues[name] = asyncio.Queue(maxsize=maxsize)

331

self.processing_stats[name] = {

332

"messages_processed": 0,

333

"last_processed": None

334

}

335

return self.queues[name]

336

337

async def add_message(self, queue_name: str, message):

338

"""Add message to a named queue"""

339

if queue_name in self.queues:

340

await self.queues[queue_name].put(message)

341

else:

342

raise ValueError(f"Queue {queue_name} does not exist")

343

344

async def process_queue_batch(self, queue_name: str, batch_size: int = 10):

345

"""Process messages from a queue in batches"""

346

if queue_name not in self.queues:

347

raise ValueError(f"Queue {queue_name} does not exist")

348

349

queue = self.queues[queue_name]

350

initial_size = queue.qsize()

351

352

if initial_size > 0:

353

await consume_queue(queue, batch_size)

354

processed = min(initial_size, batch_size)

355

356

# Update stats

357

self.processing_stats[queue_name]["messages_processed"] += processed

358

self.processing_stats[queue_name]["last_processed"] = datetime.utcnow()

359

360

return processed

361

return 0

362

363

@enroute.rest.query("/system/queues", method="GET")

364

async def get_queue_status(self, request: Request) -> Response:

365

"""Get status of all managed queues"""

366

status = {}

367

for name, queue in self.queues.items():

368

status[name] = {

369

"size": queue.qsize(),

370

"maxsize": queue.maxsize,

371

"stats": self.processing_stats[name]

372

}

373

374

return Response({"queues": status})

375

376

@enroute.periodic.event("*/10 * * * * *") # Every 10 seconds

377

async def process_all_queues(self, request) -> Response:

378

"""Periodically process all queues"""

379

results = {}

380

for queue_name in self.queues:

381

processed = await self.process_queue_batch(queue_name, batch_size=5)

382

results[queue_name] = processed

383

384

return Response({"processed": results})

385

386

# Usage

387

queue_manager = QueueManager()

388

389

# Create queues

390

user_queue = queue_manager.create_queue("user_events", maxsize=100)

391

order_queue = queue_manager.create_queue("order_events", maxsize=50)

392

393

# Add messages

394

await queue_manager.add_message("user_events", {"event": "user_created", "id": "123"})

395

await queue_manager.add_message("order_events", {"event": "order_placed", "id": "456"})

396

397

# Process queues (happens automatically via periodic task)

398

# Monitor at: GET /system/queues

399

```

400

401

### Exception Handling and Utilities

402

403

```python

404

from minos.networks import (

405

MinosNetworkException, MinosHandlerException,

406

RequestException, ResponseException

407

)

408

409

class ExceptionUtilities:

410

@staticmethod

411

def handle_network_exception(func):

412

"""Decorator for handling network exceptions"""

413

async def wrapper(*args, **kwargs):

414

try:

415

return await func(*args, **kwargs)

416

except MinosNetworkException as e:

417

return Response({"error": f"Network error: {e}"}, status=500)

418

except Exception as e:

419

return Response({"error": f"Unexpected error: {e}"}, status=500)

420

return wrapper

421

422

@staticmethod

423

def create_error_response(error_type: str, message: str, status: int = 400) -> Response:

424

"""Create standardized error response"""

425

return Response({

426

"error": {

427

"type": error_type,

428

"message": message,

429

"timestamp": datetime.utcnow().isoformat()

430

}

431

}, status=status)

432

433

# Usage

434

@ExceptionUtilities.handle_network_exception

435

@enroute.rest.command("/users", method="POST")

436

async def create_user(request: Request) -> Response:

437

try:

438

user_data = await request.content()

439

440

if not user_data.get("email"):

441

return ExceptionUtilities.create_error_response(

442

"validation_error",

443

"Email is required",

444

400

445

)

446

447

# Create user logic that might raise MinosNetworkException

448

new_user = create_user_logic(user_data)

449

return Response(new_user, status=201)

450

451

except RequestException as e:

452

return ExceptionUtilities.create_error_response(

453

"request_error",

454

str(e),

455

e.status

456

)

457

```