0
# Service Discovery
1
2
Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.
3
4
## Capabilities
5
6
### Discovery Client Interface
7
8
Abstract client interface for service discovery operations.
9
10
```python { .api }
11
class DiscoveryClient:
12
def __init__(self, host: str, port: int): ...
13
route: str # http://host:port
14
@classmethod
15
def _from_config(cls, config: Config, **kwargs) -> DiscoveryClient: ...
16
async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict[str, str]], *args, **kwargs) -> None: ... # Abstract
17
async def unsubscribe(self, name: str, *args, **kwargs) -> None: ... # Abstract
18
19
class InMemoryDiscoveryClient(DiscoveryClient):
20
is_subscribed: bool
21
async def subscribe(self, *args, **kwargs) -> None: ...
22
async def unsubscribe(self, *args, **kwargs) -> None: ...
23
```
24
25
**Usage Examples:**
26
27
```python
28
from minos.networks import InMemoryDiscoveryClient
29
30
# Create discovery client
31
client = InMemoryDiscoveryClient(host="localhost", port=8500)
32
33
# Register service
34
await client.subscribe(
35
host="localhost",
36
port=8080,
37
name="user-service",
38
endpoints=[
39
{"path": "/users", "method": "GET"},
40
{"path": "/users", "method": "POST"}
41
]
42
)
43
44
print(f"Service registered: {client.is_subscribed}")
45
46
# Unregister service
47
await client.unsubscribe("user-service")
48
```
49
50
### Discovery Connector
51
52
Manages service registration with discovery service and handles lifecycle.
53
54
```python { .api }
55
class DiscoveryConnector:
56
def __init__(self, client: DiscoveryClient, name: str, endpoints: list[dict[str, Any]], host: str, port: Optional[int] = None): ...
57
@classmethod
58
def _from_config(cls, config: Config, **kwargs) -> DiscoveryConnector: ...
59
@classmethod
60
def _client_from_config(cls, config: Config) -> DiscoveryClient: ...
61
@classmethod
62
def _client_cls_from_config(cls, discovery_config: dict[str, Any]) -> type[DiscoveryClient]: ...
63
@classmethod
64
def _port_from_config(cls, config: Config) -> Optional[int]: ...
65
@classmethod
66
def _endpoints_from_config(cls, config: Config) -> list[dict[str, Any]]: ...
67
async def subscribe(self) -> None: ...
68
async def unsubscribe(self) -> None: ...
69
```
70
71
**Usage Examples:**
72
73
```python
74
from minos.networks import DiscoveryConnector, InMemoryDiscoveryClient
75
from minos.common import Config
76
77
# Create client
78
client = InMemoryDiscoveryClient(host="consul", port=8500)
79
80
# Create connector
81
connector = DiscoveryConnector(
82
client=client,
83
name="payment-service",
84
endpoints=[
85
{"path": "/payments", "method": "POST"},
86
{"path": "/payments/{id}", "method": "GET"}
87
],
88
host="localhost",
89
port=8080
90
)
91
92
# Register service
93
await connector.subscribe()
94
95
# Service is now discoverable
96
# Unregister when shutting down
97
await connector.unsubscribe()
98
99
# Using configuration
100
config = Config("config.yml")
101
connector = DiscoveryConnector._from_config(config)
102
```
103
104
## Advanced Usage
105
106
### Complete Service Discovery Setup
107
108
```python
109
from minos.networks import DiscoveryConnector, HttpPort, enroute
110
from minos.common import Config
111
112
class PaymentService:
113
@enroute.rest.command("/payments", method="POST")
114
async def create_payment(self, request) -> Response:
115
return Response({"payment_id": "123", "status": "created"})
116
117
@enroute.rest.query("/payments/{payment_id}", method="GET")
118
async def get_payment(self, request) -> Response:
119
return Response({"payment_id": "123", "amount": 100.0})
120
121
@enroute.broker.event("payment.processed")
122
async def handle_payment_processed(self, request) -> Response:
123
return Response({"processed": True})
124
125
# Configuration-based setup
126
config = Config({
127
"service": {
128
"name": "payment-service",
129
"host": "0.0.0.0",
130
"port": 8080
131
},
132
"discovery": {
133
"client": "InMemoryDiscoveryClient",
134
"host": "consul",
135
"port": 8500
136
}
137
})
138
139
# Create HTTP service
140
http_port = HttpPort._from_config(config)
141
142
# Create discovery connector
143
discovery = DiscoveryConnector._from_config(config)
144
145
# Start services
146
await http_port.start()
147
await discovery.subscribe()
148
149
print(f"Payment service running on {config.service.host}:{config.service.port}")
150
print("Service registered with discovery")
151
152
# Service runs and handles requests...
153
154
# Shutdown
155
await discovery.unsubscribe()
156
await http_port.stop()
157
```
158
159
### Custom Discovery Client Implementation
160
161
```python
162
import aiohttp
163
from minos.networks import DiscoveryClient
164
165
class ConsulDiscoveryClient(DiscoveryClient):
166
"""Consul-based discovery client implementation"""
167
168
async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict], *args, **kwargs) -> None:
169
service_definition = {
170
"ID": f"{name}-{host}-{port}",
171
"Name": name,
172
"Address": host,
173
"Port": port,
174
"Tags": [endpoint["path"] for endpoint in endpoints],
175
"Check": {
176
"HTTP": f"http://{host}:{port}/health",
177
"Interval": "10s"
178
}
179
}
180
181
async with aiohttp.ClientSession() as session:
182
async with session.put(
183
f"{self.route}/v1/agent/service/register",
184
json=service_definition
185
) as response:
186
if response.status != 200:
187
raise Exception(f"Failed to register service: {response.status}")
188
189
async def unsubscribe(self, name: str, *args, **kwargs) -> None:
190
service_id = f"{name}-{self.host}-{self.port}"
191
192
async with aiohttp.ClientSession() as session:
193
async with session.put(
194
f"{self.route}/v1/agent/service/deregister/{service_id}"
195
) as response:
196
if response.status != 200:
197
raise Exception(f"Failed to unregister service: {response.status}")
198
199
# Usage
200
consul_client = ConsulDiscoveryClient(host="consul", port=8500)
201
connector = DiscoveryConnector(
202
client=consul_client,
203
name="user-service",
204
endpoints=[{"path": "/users", "method": "GET"}],
205
host="localhost",
206
port=8080
207
)
208
```
209
210
### Service Health Checks
211
212
```python
213
from minos.networks import enroute, SystemService
214
215
class HealthCheckService:
216
def __init__(self, dependencies: list[str]):
217
self.dependencies = dependencies
218
219
@enroute.rest.query("/health", method="GET")
220
async def health_check(self, request) -> Response:
221
health_status = {
222
"status": "healthy",
223
"timestamp": datetime.utcnow().isoformat(),
224
"dependencies": {}
225
}
226
227
# Check each dependency
228
for dependency in self.dependencies:
229
try:
230
status = await self.check_dependency(dependency)
231
health_status["dependencies"][dependency] = status
232
except Exception as e:
233
health_status["dependencies"][dependency] = {
234
"status": "unhealthy",
235
"error": str(e)
236
}
237
health_status["status"] = "degraded"
238
239
# Overall health based on dependencies
240
if any(dep["status"] == "unhealthy" for dep in health_status["dependencies"].values()):
241
health_status["status"] = "unhealthy"
242
243
status_code = 200 if health_status["status"] in ["healthy", "degraded"] else 503
244
return Response(health_status, status=status_code)
245
246
async def check_dependency(self, dependency: str) -> dict:
247
# Implement dependency-specific health checks
248
if dependency == "database":
249
# Check database connection
250
return {"status": "healthy", "response_time": "5ms"}
251
elif dependency == "redis":
252
# Check Redis connection
253
return {"status": "healthy", "response_time": "2ms"}
254
else:
255
return {"status": "unknown"}
256
257
# Discovery with health checks
258
discovery_connector = DiscoveryConnector(
259
client=consul_client,
260
name="user-service",
261
endpoints=[
262
{"path": "/users", "method": "GET"},
263
{"path": "/health", "method": "GET"} # Health check endpoint
264
],
265
host="localhost",
266
port=8080
267
)
268
```
269
270
### Dynamic Service Configuration
271
272
```python
273
class DynamicDiscoveryService:
274
def __init__(self, config: Config):
275
self.config = config
276
self.connector = None
277
self.registered_endpoints = []
278
279
async def register_service(self, service_name: str, endpoints: list[dict]):
280
"""Dynamically register a service with discovery"""
281
if self.connector:
282
await self.connector.unsubscribe()
283
284
self.connector = DiscoveryConnector(
285
client=self._create_client(),
286
name=service_name,
287
endpoints=endpoints,
288
host=self.config.service.host,
289
port=self.config.service.port
290
)
291
292
await self.connector.subscribe()
293
self.registered_endpoints = endpoints
294
print(f"Service {service_name} registered with {len(endpoints)} endpoints")
295
296
async def add_endpoint(self, endpoint: dict):
297
"""Add a new endpoint to the registered service"""
298
self.registered_endpoints.append(endpoint)
299
300
# Re-register with updated endpoints
301
if self.connector:
302
service_name = self.connector.name
303
await self.register_service(service_name, self.registered_endpoints)
304
305
async def remove_endpoint(self, path: str, method: str):
306
"""Remove an endpoint from the registered service"""
307
self.registered_endpoints = [
308
ep for ep in self.registered_endpoints
309
if not (ep["path"] == path and ep["method"] == method)
310
]
311
312
# Re-register with updated endpoints
313
if self.connector:
314
service_name = self.connector.name
315
await self.register_service(service_name, self.registered_endpoints)
316
317
async def unregister_service(self):
318
"""Unregister the service"""
319
if self.connector:
320
await self.connector.unsubscribe()
321
self.connector = None
322
self.registered_endpoints = []
323
324
def _create_client(self) -> DiscoveryClient:
325
return DiscoveryClient._from_config(self.config)
326
327
# Usage
328
dynamic_discovery = DynamicDiscoveryService(config)
329
330
# Initial registration
331
await dynamic_discovery.register_service("api-service", [
332
{"path": "/api/v1/users", "method": "GET"}
333
])
334
335
# Add endpoints dynamically
336
await dynamic_discovery.add_endpoint({"path": "/api/v1/users", "method": "POST"})
337
await dynamic_discovery.add_endpoint({"path": "/api/v1/posts", "method": "GET"})
338
339
# Remove endpoints dynamically
340
await dynamic_discovery.remove_endpoint("/api/v1/posts", "GET")
341
342
# Cleanup
343
await dynamic_discovery.unregister_service()
344
```