or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.md decorators.md discovery.md http.md index.md requests.md routers.md scheduling.md specs.md system-utils.md

docs/discovery.md

0

# Service Discovery

1

2

Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.

3

4

## Capabilities

5

6

### Discovery Client Interface

7

8

Abstract client interface for service discovery operations.

9

10

```python { .api }

11

class DiscoveryClient:

12

def __init__(self, host: str, port: int): ...

13

route: str # http://host:port

14

@classmethod

15

def _from_config(cls, config: Config, **kwargs) -> DiscoveryClient: ...

16

async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict[str, str]], *args, **kwargs) -> None: ... # Abstract

17

async def unsubscribe(self, name: str, *args, **kwargs) -> None: ... # Abstract

18

19

class InMemoryDiscoveryClient(DiscoveryClient):

20

is_subscribed: bool

21

async def subscribe(self, *args, **kwargs) -> None: ...

22

async def unsubscribe(self, *args, **kwargs) -> None: ...

23

```

24

25

**Usage Examples:**

26

27

```python

28

from minos.networks import InMemoryDiscoveryClient

29

30

# Create discovery client

31

client = InMemoryDiscoveryClient(host="localhost", port=8500)

32

33

# Register service

34

await client.subscribe(

35

host="localhost",

36

port=8080,

37

name="user-service",

38

endpoints=[

39

{"path": "/users", "method": "GET"},

40

{"path": "/users", "method": "POST"}

41

]

42

)

43

44

print(f"Service registered: {client.is_subscribed}")

45

46

# Unregister service

47

await client.unsubscribe("user-service")

48

```

49

50

### Discovery Connector

51

52

Manages a service's registration with the discovery service and handles its subscribe/unsubscribe lifecycle.

53

54

```python { .api }

55

class DiscoveryConnector:

56

def __init__(self, client: DiscoveryClient, name: str, endpoints: list[dict[str, Any]], host: str, port: Optional[int] = None): ...

57

@classmethod

58

def _from_config(cls, config: Config, **kwargs) -> DiscoveryConnector: ...

59

@classmethod

60

def _client_from_config(cls, config: Config) -> DiscoveryClient: ...

61

@classmethod

62

def _client_cls_from_config(cls, discovery_config: dict[str, Any]) -> type[DiscoveryClient]: ...

63

@classmethod

64

def _port_from_config(cls, config: Config) -> Optional[int]: ...

65

@classmethod

66

def _endpoints_from_config(cls, config: Config) -> list[dict[str, Any]]: ...

67

async def subscribe(self) -> None: ...

68

async def unsubscribe(self) -> None: ...

69

```

70

71

**Usage Examples:**

72

73

```python

74

from minos.networks import DiscoveryConnector, InMemoryDiscoveryClient

75

from minos.common import Config

76

77

# Create client

78

client = InMemoryDiscoveryClient(host="consul", port=8500)

79

80

# Create connector

81

connector = DiscoveryConnector(

82

client=client,

83

name="payment-service",

84

endpoints=[

85

{"path": "/payments", "method": "POST"},

86

{"path": "/payments/{id}", "method": "GET"}

87

],

88

host="localhost",

89

port=8080

90

)

91

92

# Register service

93

await connector.subscribe()

94

95

# Service is now discoverable

96

# Unregister when shutting down

97

await connector.unsubscribe()

98

99

# Using configuration

100

config = Config("config.yml")

101

connector = DiscoveryConnector._from_config(config)

102

```

103

104

## Advanced Usage

105

106

### Complete Service Discovery Setup

107

108

```python

109

from minos.networks import DiscoveryConnector, HttpPort, enroute

110

from minos.common import Config

111

112

class PaymentService:

113

@enroute.rest.command("/payments", method="POST")

114

async def create_payment(self, request) -> Response:

115

return Response({"payment_id": "123", "status": "created"})

116

117

@enroute.rest.query("/payments/{payment_id}", method="GET")

118

async def get_payment(self, request) -> Response:

119

return Response({"payment_id": "123", "amount": 100.0})

120

121

@enroute.broker.event("payment.processed")

122

async def handle_payment_processed(self, request) -> Response:

123

return Response({"processed": True})

124

125

# Configuration-based setup

126

config = Config({

127

"service": {

128

"name": "payment-service",

129

"host": "0.0.0.0",

130

"port": 8080

131

},

132

"discovery": {

133

"client": "InMemoryDiscoveryClient",

134

"host": "consul",

135

"port": 8500

136

}

137

})

138

139

# Create HTTP service

140

http_port = HttpPort._from_config(config)

141

142

# Create discovery connector

143

discovery = DiscoveryConnector._from_config(config)

144

145

# Start services

146

await http_port.start()

147

await discovery.subscribe()

148

149

print(f"Payment service running on {config.service.host}:{config.service.port}")

150

print("Service registered with discovery")

151

152

# Service runs and handles requests...

153

154

# Shutdown

155

await discovery.unsubscribe()

156

await http_port.stop()

157

```

158

159

### Custom Discovery Client Implementation

160

161

```python

162

import aiohttp

163

from minos.networks import DiscoveryClient

164

165

class ConsulDiscoveryClient(DiscoveryClient):

166

"""Consul-based discovery client implementation"""

167

168

async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict], *args, **kwargs) -> None:

169

service_definition = {

170

"ID": f"{name}-{host}-{port}",

171

"Name": name,

172

"Address": host,

173

"Port": port,

174

"Tags": [endpoint["path"] for endpoint in endpoints],

175

"Check": {

176

"HTTP": f"http://{host}:{port}/health",

177

"Interval": "10s"

178

}

179

}

180

181

async with aiohttp.ClientSession() as session:

182

async with session.put(

183

f"{self.route}/v1/agent/service/register",

184

json=service_definition

185

) as response:

186

if response.status != 200:

187

raise Exception(f"Failed to register service: {response.status}")

188

189

async def unsubscribe(self, name: str, *args, **kwargs) -> None:

190

service_id = f"{name}-{self.host}-{self.port}"

191

192

async with aiohttp.ClientSession() as session:

193

async with session.put(

194

f"{self.route}/v1/agent/service/deregister/{service_id}"

195

) as response:

196

if response.status != 200:

197

raise Exception(f"Failed to unregister service: {response.status}")

198

199

# Usage

200

consul_client = ConsulDiscoveryClient(host="consul", port=8500)

201

connector = DiscoveryConnector(

202

client=consul_client,

203

name="user-service",

204

endpoints=[{"path": "/users", "method": "GET"}],

205

host="localhost",

206

port=8080

207

)

208

```

209

210

### Service Health Checks

211

212

```python

213

from minos.networks import enroute, SystemService

214

215

class HealthCheckService:

216

def __init__(self, dependencies: list[str]):

217

self.dependencies = dependencies

218

219

@enroute.rest.query("/health", method="GET")

220

async def health_check(self, request) -> Response:

221

health_status = {

222

"status": "healthy",

223

"timestamp": datetime.utcnow().isoformat(),

224

"dependencies": {}

225

}

226

227

# Check each dependency

228

for dependency in self.dependencies:

229

try:

230

status = await self.check_dependency(dependency)

231

health_status["dependencies"][dependency] = status

232

except Exception as e:

233

health_status["dependencies"][dependency] = {

234

"status": "unhealthy",

235

"error": str(e)

236

}

237

health_status["status"] = "degraded"

238

239

# Overall health based on dependencies

240

if any(dep["status"] == "unhealthy" for dep in health_status["dependencies"].values()):

241

health_status["status"] = "unhealthy"

242

243

status_code = 200 if health_status["status"] in ["healthy", "degraded"] else 503

244

return Response(health_status, status=status_code)

245

246

async def check_dependency(self, dependency: str) -> dict:

247

# Implement dependency-specific health checks

248

if dependency == "database":

249

# Check database connection

250

return {"status": "healthy", "response_time": "5ms"}

251

elif dependency == "redis":

252

# Check Redis connection

253

return {"status": "healthy", "response_time": "2ms"}

254

else:

255

return {"status": "unknown"}

256

257

# Discovery with health checks

258

discovery_connector = DiscoveryConnector(

259

client=consul_client,

260

name="user-service",

261

endpoints=[

262

{"path": "/users", "method": "GET"},

263

{"path": "/health", "method": "GET"} # Health check endpoint

264

],

265

host="localhost",

266

port=8080

267

)

268

```

269

270

### Dynamic Service Configuration

271

272

```python

273

class DynamicDiscoveryService:

274

def __init__(self, config: Config):

275

self.config = config

276

self.connector = None

277

self.registered_endpoints = []

278

279

async def register_service(self, service_name: str, endpoints: list[dict]):

280

"""Dynamically register a service with discovery"""

281

if self.connector:

282

await self.connector.unsubscribe()

283

284

self.connector = DiscoveryConnector(

285

client=self._create_client(),

286

name=service_name,

287

endpoints=endpoints,

288

host=self.config.service.host,

289

port=self.config.service.port

290

)

291

292

await self.connector.subscribe()

293

self.registered_endpoints = endpoints

294

print(f"Service {service_name} registered with {len(endpoints)} endpoints")

295

296

async def add_endpoint(self, endpoint: dict):

297

"""Add a new endpoint to the registered service"""

298

self.registered_endpoints.append(endpoint)

299

300

# Re-register with updated endpoints

301

if self.connector:

302

service_name = self.connector.name

303

await self.register_service(service_name, self.registered_endpoints)

304

305

async def remove_endpoint(self, path: str, method: str):

306

"""Remove an endpoint from the registered service"""

307

self.registered_endpoints = [

308

ep for ep in self.registered_endpoints

309

if not (ep["path"] == path and ep["method"] == method)

310

]

311

312

# Re-register with updated endpoints

313

if self.connector:

314

service_name = self.connector.name

315

await self.register_service(service_name, self.registered_endpoints)

316

317

async def unregister_service(self):

318

"""Unregister the service"""

319

if self.connector:

320

await self.connector.unsubscribe()

321

self.connector = None

322

self.registered_endpoints = []

323

324

def _create_client(self) -> DiscoveryClient:

325

return DiscoveryClient._from_config(self.config)

326

327

# Usage

328

dynamic_discovery = DynamicDiscoveryService(config)

329

330

# Initial registration

331

await dynamic_discovery.register_service("api-service", [

332

{"path": "/api/v1/users", "method": "GET"}

333

])

334

335

# Add endpoints dynamically

336

await dynamic_discovery.add_endpoint({"path": "/api/v1/users", "method": "POST"})

337

await dynamic_discovery.add_endpoint({"path": "/api/v1/posts", "method": "GET"})

338

339

# Remove endpoints dynamically

340

await dynamic_discovery.remove_endpoint("/api/v1/posts", "GET")

341

342

# Cleanup

343

await dynamic_discovery.unregister_service()

344

```