0
# Microservices Framework
1
2
Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.
3
4
## Capabilities
5
6
### Service Creation and Management
7
8
Create and manage microservices with automatic discovery and monitoring.
9
10
```python { .api }
11
async def add_service(
12
nc: NATS,
13
config: ServiceConfig = None,
14
**kwargs
15
) -> Service:
16
"""
17
Create and start a microservice.
18
19
Parameters:
20
- nc: NATS connection
21
- config: Complete service configuration
22
- **kwargs: Individual configuration parameters
23
24
Returns:
25
Running service instance
26
"""
27
28
class Service:
29
async def start(self) -> None:
30
"""Start the service and begin handling requests."""
31
32
async def stop(self) -> None:
33
"""Stop the service gracefully."""
34
35
async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
36
"""
37
Add new endpoint to service.
38
39
Parameters:
40
- config: Endpoint configuration
41
42
Returns:
43
Endpoint instance
44
"""
45
46
async def reset(self) -> None:
47
"""Reset service statistics."""
48
```
49
50
#### Usage Examples
51
52
```python
53
import asyncio
54
import nats
55
from nats.micro import add_service, ServiceConfig, EndpointConfig
56
57
async def main():
58
nc = await nats.connect()
59
60
# Create service configuration
61
config = ServiceConfig(
62
name="user-service",
63
version="1.2.0",
64
description="User management service",
65
metadata={
66
"team": "backend",
67
"repository": "https://github.com/company/user-service"
68
}
69
)
70
71
# Create and start service
72
service = await add_service(nc, config=config)
73
await service.start()
74
75
# Add endpoints dynamically
76
await service.add_endpoint(EndpointConfig(
77
name="create-user",
78
subject="users.create",
79
handler=create_user_handler
80
))
81
82
print(f"Service {config.name} started")
83
84
# Keep service running
85
try:
86
await asyncio.sleep(3600) # Run for 1 hour
87
finally:
88
await service.stop()
89
```
90
91
### Request Handling
92
93
Handle incoming service requests with automatic routing and error handling.
94
95
```python { .api }
96
class Request:
97
"""Service request wrapper."""
98
subject: str
99
headers: Dict[str, str]
100
data: bytes
101
102
async def respond(
103
self,
104
data: bytes = b"",
105
headers: Dict[str, str] = None
106
) -> None:
107
"""
108
Send successful response.
109
110
Parameters:
111
- data: Response data
112
- headers: Response headers
113
"""
114
115
async def respond_error(
116
self,
117
code: str,
118
description: str,
119
data: bytes = b"",
120
headers: Dict[str, str] = None
121
) -> None:
122
"""
123
Send error response.
124
125
Parameters:
126
- code: Error code
127
- description: Error description
128
- data: Error details
129
- headers: Response headers
130
"""
131
132
# Handler type
133
Handler = Callable[[Request], Awaitable[None]]
134
135
class ServiceError(Exception):
136
"""Service error with code and description."""
137
def __init__(self, code: str, description: str):
138
self.code = code
139
self.description = description
140
super().__init__(f"{code}: {description}")
141
```
142
143
#### Usage Examples
144
145
```python
146
import json
147
from nats.micro import Request, ServiceError
148
149
async def create_user_handler(request: Request):
150
try:
151
# Parse request data
152
user_data = json.loads(request.data.decode())
153
154
# Validate input
155
if not user_data.get("email"):
156
await request.respond_error(
157
code="INVALID_INPUT",
158
description="Email is required",
159
data=b'{"field": "email"}'
160
)
161
return
162
163
# Create user (business logic)
164
user = await create_user(user_data)
165
166
# Send success response
167
response_data = json.dumps({
168
"id": user.id,
169
"email": user.email,
170
"created_at": user.created_at.isoformat()
171
}).encode()
172
173
await request.respond(
174
data=response_data,
175
headers={"Content-Type": "application/json"}
176
)
177
178
except ValidationError as e:
179
await request.respond_error(
180
code="VALIDATION_ERROR",
181
description=str(e)
182
)
183
except Exception as e:
184
await request.respond_error(
185
code="INTERNAL_ERROR",
186
description="Internal server error"
187
)
188
189
async def get_user_handler(request: Request):
190
# Extract user ID from subject or headers
191
user_id = request.headers.get("User-ID")
192
if not user_id:
193
await request.respond_error(
194
code="MISSING_USER_ID",
195
description="User-ID header required"
196
)
197
return
198
199
try:
200
user = await get_user_by_id(user_id)
201
response_data = json.dumps(user.to_dict()).encode()
202
await request.respond(data=response_data)
203
except UserNotFound:
204
await request.respond_error(
205
code="USER_NOT_FOUND",
206
description=f"User {user_id} not found"
207
)
208
```
209
210
### Endpoint Configuration
211
212
Configure service endpoints with subjects, handlers, and metadata.
213
214
```python { .api }
215
@dataclass
216
class EndpointConfig:
217
"""Endpoint configuration."""
218
name: str
219
subject: str
220
handler: Handler
221
metadata: Dict[str, str] = None
222
queue_group: str = "q"
223
224
@dataclass
225
class GroupConfig:
226
"""Endpoint group configuration."""
227
name: str
228
queue_group: str = "q"
229
230
class Group:
231
"""Endpoint group for organizing related endpoints."""
232
async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
233
"""Add endpoint to group."""
234
235
class Endpoint:
236
"""Individual service endpoint."""
237
pass
238
```
239
240
#### Usage Examples
241
242
```python
243
# Create endpoints with different patterns
244
endpoints = [
245
EndpointConfig(
246
name="create-user",
247
subject="users.create",
248
handler=create_user_handler,
249
metadata={"description": "Create new user account"}
250
),
251
EndpointConfig(
252
name="get-user",
253
subject="users.get",
254
handler=get_user_handler,
255
metadata={"description": "Retrieve user by ID"}
256
),
257
EndpointConfig(
258
name="update-user",
259
subject="users.update",
260
handler=update_user_handler,
261
metadata={"description": "Update user information"}
262
),
263
EndpointConfig(
264
name="delete-user",
265
subject="users.delete",
266
handler=delete_user_handler,
267
metadata={"description": "Delete user account"}
268
)
269
]
270
271
# Add all endpoints to service
272
for endpoint_config in endpoints:
273
await service.add_endpoint(endpoint_config)
274
275
# Group related endpoints
276
group_config = GroupConfig(
277
name="user-management",
278
queue_group="user-workers"
279
)
280
281
group = await service.add_group(group_config)
282
await group.add_endpoint(EndpointConfig(
283
name="bulk-import",
284
subject="users.bulk.import",
285
handler=bulk_import_handler
286
))
287
```
288
289
### Service Discovery and Monitoring
290
291
Built-in service discovery with health checks and statistics.
292
293
```python { .api }
294
@dataclass
295
class ServiceInfo:
296
"""Service information for discovery."""
297
name: str
298
id: str
299
version: str
300
description: str
301
endpoints: List[EndpointInfo]
302
metadata: Dict[str, str]
303
304
@dataclass
305
class ServiceStats:
306
"""Service statistics."""
307
name: str
308
id: str
309
started: datetime
310
endpoints: List[EndpointStats]
311
312
@dataclass
313
class ServicePing:
314
"""Service ping response."""
315
name: str
316
id: str
317
version: str
318
metadata: Dict[str, str]
319
320
@dataclass
321
class EndpointInfo:
322
"""Endpoint information."""
323
name: str
324
subject: str
325
metadata: Dict[str, str]
326
327
@dataclass
328
class EndpointStats:
329
"""Endpoint statistics."""
330
name: str
331
subject: str
332
num_requests: int
333
num_errors: int
334
last_error: str
335
processing_time: float
336
average_processing_time: float
337
```
338
339
#### Usage Examples
340
341
```python
342
# Service automatically responds to discovery requests
343
# These are handled internally by the framework
344
345
# Get service information (as a client)
346
service_info = await nc.request("$SRV.INFO", timeout=5.0)
347
info = json.loads(service_info.data.decode())
348
print(f"Available services: {[s['name'] for s in info]}")
349
350
# Ping specific service
351
ping_response = await nc.request("$SRV.PING.user-service", timeout=2.0)
352
ping = json.loads(ping_response.data.decode())
353
print(f"Service {ping['name']} version {ping['version']} is alive")
354
355
# Get service statistics
356
stats_response = await nc.request("$SRV.STATS.user-service", timeout=2.0)
357
stats = json.loads(stats_response.data.decode())
358
print(f"Total requests: {sum(e['num_requests'] for e in stats['endpoints'])}")
359
```
360
361
### Load Balancing and Scaling
362
363
Automatic load balancing across multiple service instances.
364
365
```python { .api }
366
# Constants for service framework
367
DEFAULT_QUEUE_GROUP = "q"
368
DEFAULT_PREFIX = "$SRV"
369
370
# Service control verbs
371
class ServiceVerb:
372
PING = "PING"
373
STATS = "STATS"
374
INFO = "INFO"
375
```
376
377
#### Usage Examples
378
379
```python
380
# Multiple service instances automatically load balance
381
async def start_service_instance(instance_id: str):
382
config = ServiceConfig(
383
name="user-service",
384
version="1.2.0",
385
metadata={"instance_id": instance_id}
386
)
387
388
service = await add_service(nc, config=config)
389
await service.start()
390
391
# Add same endpoints - they'll automatically load balance
392
await service.add_endpoint(EndpointConfig(
393
name="create-user",
394
subject="users.create",
395
handler=create_user_handler
396
))
397
398
return service
399
400
# Start multiple instances
401
instances = await asyncio.gather(*[
402
start_service_instance(f"instance-{i}")
403
for i in range(3)
404
])
405
406
print("Started 3 service instances with automatic load balancing")
407
```
408
409
## Service Configuration
410
411
```python { .api }
412
@dataclass
413
class ServiceConfig:
414
"""Complete service configuration."""
415
name: str
416
version: str = "0.0.1"
417
description: str = ""
418
metadata: Dict[str, str] = None
419
queue_group: str = DEFAULT_QUEUE_GROUP
420
421
# Control subjects configuration
422
stats_handler: Handler = None
423
info_handler: Handler = None
424
ping_handler: Handler = None
425
```
426
427
## Advanced Features
428
429
### Custom Control Handlers
430
431
Override default service control behavior.
432
433
```python
434
async def custom_stats_handler(request: Request):
435
# Custom statistics response
436
stats = {
437
"service": "user-service",
438
"uptime": time.time() - start_time,
439
"custom_metrics": {
440
"cache_hit_rate": 0.95,
441
"db_connections": 10
442
}
443
}
444
await request.respond(json.dumps(stats).encode())
445
446
# Use custom handler
447
config = ServiceConfig(
448
name="user-service",
449
version="1.2.0",
450
stats_handler=custom_stats_handler
451
)
452
453
service = await add_service(nc, config=config)
454
```
455
456
### Error Handling Patterns
457
458
Consistent error handling across endpoints.
459
460
```python
461
# Common error codes
462
class ErrorCodes:
463
INVALID_INPUT = "INVALID_INPUT"
464
NOT_FOUND = "NOT_FOUND"
465
UNAUTHORIZED = "UNAUTHORIZED"
466
INTERNAL_ERROR = "INTERNAL_ERROR"
467
RATE_LIMITED = "RATE_LIMITED"
468
469
# Error handling middleware
470
async def with_error_handling(handler: Handler) -> Handler:
471
async def wrapper(request: Request):
472
try:
473
await handler(request)
474
except ValidationError as e:
475
await request.respond_error(
476
ErrorCodes.INVALID_INPUT,
477
str(e)
478
)
479
except NotFoundError as e:
480
await request.respond_error(
481
ErrorCodes.NOT_FOUND,
482
str(e)
483
)
484
except Exception as e:
485
logger.error(f"Unexpected error: {e}")
486
await request.respond_error(
487
ErrorCodes.INTERNAL_ERROR,
488
"Internal server error"
489
)
490
return wrapper
491
492
# Apply to endpoints
493
await service.add_endpoint(EndpointConfig(
494
name="create-user",
495
subject="users.create",
496
handler=with_error_handling(create_user_handler)
497
))
498
```
499
500
## Constants
501
502
```python { .api }
503
# Default configuration
504
DEFAULT_QUEUE_GROUP = "q"
505
DEFAULT_PREFIX = "$SRV"
506
507
# Response types
508
INFO_RESPONSE_TYPE = "io.nats.micro.v1.info_response"
509
PING_RESPONSE_TYPE = "io.nats.micro.v1.ping_response"
510
STATS_RESPONSE_TYPE = "io.nats.micro.v1.stats_response"
511
512
# Headers
513
ERROR_HEADER = "Nats-Service-Error"
514
ERROR_CODE_HEADER = "Nats-Service-Error-Code"
515
```