or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.md, error-handling.md, index.md, jetstream-management.md, jetstream.md, key-value-store.md, message-handling.md, microservices.md, object-store.md

docs/microservices.md

0

# Microservices Framework

1

2

The microservices framework provides service discovery, request routing, monitoring, and health checks, with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.

3

4

## Capabilities

5

6

### Service Creation and Management

7

8

Create and manage microservices with automatic discovery and monitoring.

9

10

```python { .api }

11

async def add_service(

12

nc: NATS,

13

config: ServiceConfig = None,

14

**kwargs

15

) -> Service:

16

"""

17

Create and start a microservice.

18

19

Parameters:

20

- nc: NATS connection

21

- config: Complete service configuration

22

- **kwargs: Individual configuration parameters

23

24

Returns:

25

Running service instance

26

"""

27

28

class Service:

29

async def start(self) -> None:

30

"""Start the service and begin handling requests."""

31

32

async def stop(self) -> None:

33

"""Stop the service gracefully."""

34

35

async def add_endpoint(self, config: EndpointConfig) -> Endpoint:

36

"""

37

Add new endpoint to service.

38

39

Parameters:

40

- config: Endpoint configuration

41

42

Returns:

43

Endpoint instance

44

"""

45

46

async def reset(self) -> None:

47

"""Reset service statistics."""

48

```

49

50

#### Usage Examples

51

52

```python

53

import asyncio

54

import nats

55

from nats.micro import add_service, ServiceConfig, EndpointConfig

56

57

async def main():

58

nc = await nats.connect()

59

60

# Create service configuration

61

config = ServiceConfig(

62

name="user-service",

63

version="1.2.0",

64

description="User management service",

65

metadata={

66

"team": "backend",

67

"repository": "https://github.com/company/user-service"

68

}

69

)

70

71

# Create and start service

72

service = await add_service(nc, config=config)

73

await service.start()

74

75

# Add endpoints dynamically

76

await service.add_endpoint(EndpointConfig(

77

name="create-user",

78

subject="users.create",

79

handler=create_user_handler

80

))

81

82

print(f"Service {config.name} started")

83

84

# Keep service running

85

try:

86

await asyncio.sleep(3600) # Run for 1 hour

87

finally:

88

await service.stop()

89

```

90

91

### Request Handling

92

93

Handle incoming service requests with automatic routing and error handling.

94

95

```python { .api }

96

class Request:

97

"""Service request wrapper."""

98

subject: str

99

headers: Dict[str, str]

100

data: bytes

101

102

async def respond(

103

self,

104

data: bytes = b"",

105

headers: Dict[str, str] = None

106

) -> None:

107

"""

108

Send successful response.

109

110

Parameters:

111

- data: Response data

112

- headers: Response headers

113

"""

114

115

async def respond_error(

116

self,

117

code: str,

118

description: str,

119

data: bytes = b"",

120

headers: Dict[str, str] = None

121

) -> None:

122

"""

123

Send error response.

124

125

Parameters:

126

- code: Error code

127

- description: Error description

128

- data: Error details

129

- headers: Response headers

130

"""

131

132

# Handler type

133

Handler = Callable[[Request], Awaitable[None]]

134

135

class ServiceError(Exception):

136

"""Service error with code and description."""

137

def __init__(self, code: str, description: str):

138

self.code = code

139

self.description = description

140

super().__init__(f"{code}: {description}")

141

```

142

143

#### Usage Examples

144

145

```python

146

import json

147

from nats.micro import Request, ServiceError

148

149

async def create_user_handler(request: Request):

150

try:

151

# Parse request data

152

user_data = json.loads(request.data.decode())

153

154

# Validate input

155

if not user_data.get("email"):

156

await request.respond_error(

157

code="INVALID_INPUT",

158

description="Email is required",

159

data=b'{"field": "email"}'

160

)

161

return

162

163

# Create user (business logic)

164

user = await create_user(user_data)

165

166

# Send success response

167

response_data = json.dumps({

168

"id": user.id,

169

"email": user.email,

170

"created_at": user.created_at.isoformat()

171

}).encode()

172

173

await request.respond(

174

data=response_data,

175

headers={"Content-Type": "application/json"}

176

)

177

178

except ValidationError as e:

179

await request.respond_error(

180

code="VALIDATION_ERROR",

181

description=str(e)

182

)

183

except Exception as e:

184

await request.respond_error(

185

code="INTERNAL_ERROR",

186

description="Internal server error"

187

)

188

189

async def get_user_handler(request: Request):

190

# Extract user ID from the "User-ID" request header

191

user_id = request.headers.get("User-ID")

192

if not user_id:

193

await request.respond_error(

194

code="MISSING_USER_ID",

195

description="User-ID header required"

196

)

197

return

198

199

try:

200

user = await get_user_by_id(user_id)

201

response_data = json.dumps(user.to_dict()).encode()

202

await request.respond(data=response_data)

203

except UserNotFound:

204

await request.respond_error(

205

code="USER_NOT_FOUND",

206

description=f"User {user_id} not found"

207

)

208

```

209

210

### Endpoint Configuration

211

212

Configure service endpoints with subjects, handlers, and metadata.

213

214

```python { .api }

215

@dataclass

216

class EndpointConfig:

217

"""Endpoint configuration."""

218

name: str

219

subject: str

220

handler: Handler

221

metadata: Dict[str, str] = None

222

queue_group: str = "q"

223

224

@dataclass

225

class GroupConfig:

226

"""Endpoint group configuration."""

227

name: str

228

queue_group: str = "q"

229

230

class Group:

231

"""Endpoint group for organizing related endpoints."""

232

async def add_endpoint(self, config: EndpointConfig) -> Endpoint:

233

"""Add endpoint to group."""

234

235

class Endpoint:

236

"""Individual service endpoint."""

237

pass

238

```

239

240

#### Usage Examples

241

242

```python

243

# Create endpoints with different patterns

244

endpoints = [

245

EndpointConfig(

246

name="create-user",

247

subject="users.create",

248

handler=create_user_handler,

249

metadata={"description": "Create new user account"}

250

),

251

EndpointConfig(

252

name="get-user",

253

subject="users.get",

254

handler=get_user_handler,

255

metadata={"description": "Retrieve user by ID"}

256

),

257

EndpointConfig(

258

name="update-user",

259

subject="users.update",

260

handler=update_user_handler,

261

metadata={"description": "Update user information"}

262

),

263

EndpointConfig(

264

name="delete-user",

265

subject="users.delete",

266

handler=delete_user_handler,

267

metadata={"description": "Delete user account"}

268

)

269

]

270

271

# Add all endpoints to service

272

for endpoint_config in endpoints:

273

await service.add_endpoint(endpoint_config)

274

275

# Group related endpoints

276

group_config = GroupConfig(

277

name="user-management",

278

queue_group="user-workers"

279

)

280

281

group = await service.add_group(group_config)

282

await group.add_endpoint(EndpointConfig(

283

name="bulk-import",

284

subject="users.bulk.import",

285

handler=bulk_import_handler

286

))

287

```

288

289

### Service Discovery and Monitoring

290

291

Built-in service discovery with health checks and statistics.

292

293

```python { .api }

294

@dataclass

295

class ServiceInfo:

296

"""Service information for discovery."""

297

name: str

298

id: str

299

version: str

300

description: str

301

endpoints: List[EndpointInfo]

302

metadata: Dict[str, str]

303

304

@dataclass

305

class ServiceStats:

306

"""Service statistics."""

307

name: str

308

id: str

309

started: datetime

310

endpoints: List[EndpointStats]

311

312

@dataclass

313

class ServicePing:

314

"""Service ping response."""

315

name: str

316

id: str

317

version: str

318

metadata: Dict[str, str]

319

320

@dataclass

321

class EndpointInfo:

322

"""Endpoint information."""

323

name: str

324

subject: str

325

metadata: Dict[str, str]

326

327

@dataclass

328

class EndpointStats:

329

"""Endpoint statistics."""

330

name: str

331

subject: str

332

num_requests: int

333

num_errors: int

334

last_error: str

335

processing_time: float

336

average_processing_time: float

337

```

338

339

#### Usage Examples

340

341

```python

342

# Service automatically responds to discovery requests

343

# These are handled internally by the framework

344

345

# Get service information (as a client)

346

service_info = await nc.request("$SRV.INFO", timeout=5.0)

347

info = json.loads(service_info.data.decode())

348

print(f"Available services: {[s['name'] for s in info]}")

349

350

# Ping specific service

351

ping_response = await nc.request("$SRV.PING.user-service", timeout=2.0)

352

ping = json.loads(ping_response.data.decode())

353

print(f"Service {ping['name']} version {ping['version']} is alive")

354

355

# Get service statistics

356

stats_response = await nc.request("$SRV.STATS.user-service", timeout=2.0)

357

stats = json.loads(stats_response.data.decode())

358

print(f"Total requests: {sum(e['num_requests'] for e in stats['endpoints'])}")

359

```

360

361

### Load Balancing and Scaling

362

363

Automatic load balancing across multiple service instances.

364

365

```python { .api }

366

# Constants for service framework

367

DEFAULT_QUEUE_GROUP = "q"

368

DEFAULT_PREFIX = "$SRV"

369

370

# Service control verbs

371

class ServiceVerb:

372

PING = "PING"

373

STATS = "STATS"

374

INFO = "INFO"

375

```

376

377

#### Usage Examples

378

379

```python

380

# Multiple service instances automatically load balance

381

async def start_service_instance(instance_id: str):

382

config = ServiceConfig(

383

name="user-service",

384

version="1.2.0",

385

metadata={"instance_id": instance_id}

386

)

387

388

service = await add_service(nc, config=config)

389

await service.start()

390

391

# Add same endpoints - they'll automatically load balance

392

await service.add_endpoint(EndpointConfig(

393

name="create-user",

394

subject="users.create",

395

handler=create_user_handler

396

))

397

398

return service

399

400

# Start multiple instances

401

instances = await asyncio.gather(*[

402

start_service_instance(f"instance-{i}")

403

for i in range(3)

404

])

405

406

print("Started 3 service instances with automatic load balancing")

407

```

408

409

## Service Configuration

410

411

```python { .api }

412

@dataclass

413

class ServiceConfig:

414

"""Complete service configuration."""

415

name: str

416

version: str = "0.0.1"

417

description: str = ""

418

metadata: Dict[str, str] = None

419

queue_group: str = DEFAULT_QUEUE_GROUP

420

421

# Control subjects configuration

422

stats_handler: Handler = None

423

info_handler: Handler = None

424

ping_handler: Handler = None

425

```

426

427

## Advanced Features

428

429

### Custom Control Handlers

430

431

Override default service control behavior.

432

433

```python

434

async def custom_stats_handler(request: Request):

435

# Custom statistics response

436

stats = {

437

"service": "user-service",

438

"uptime": time.time() - start_time,

439

"custom_metrics": {

440

"cache_hit_rate": 0.95,

441

"db_connections": 10

442

}

443

}

444

await request.respond(json.dumps(stats).encode())

445

446

# Use custom handler

447

config = ServiceConfig(

448

name="user-service",

449

version="1.2.0",

450

stats_handler=custom_stats_handler

451

)

452

453

service = await add_service(nc, config=config)

454

```

455

456

### Error Handling Patterns

457

458

Consistent error handling across endpoints.

459

460

```python

461

# Common error codes

462

class ErrorCodes:

463

INVALID_INPUT = "INVALID_INPUT"

464

NOT_FOUND = "NOT_FOUND"

465

UNAUTHORIZED = "UNAUTHORIZED"

466

INTERNAL_ERROR = "INTERNAL_ERROR"

467

RATE_LIMITED = "RATE_LIMITED"

468

469

# Error handling middleware

470

def with_error_handling(handler: Handler) -> Handler:

471

async def wrapper(request: Request):

472

try:

473

await handler(request)

474

except ValidationError as e:

475

await request.respond_error(

476

ErrorCodes.INVALID_INPUT,

477

str(e)

478

)

479

except NotFoundError as e:

480

await request.respond_error(

481

ErrorCodes.NOT_FOUND,

482

str(e)

483

)

484

except Exception as e:

485

logger.error(f"Unexpected error: {e}")

486

await request.respond_error(

487

ErrorCodes.INTERNAL_ERROR,

488

"Internal server error"

489

)

490

return wrapper

491

492

# Apply to endpoints

493

await service.add_endpoint(EndpointConfig(

494

name="create-user",

495

subject="users.create",

496

handler=with_error_handling(create_user_handler)

497

))

498

```

499

500

## Constants

501

502

```python { .api }

503

# Default configuration

504

DEFAULT_QUEUE_GROUP = "q"

505

DEFAULT_PREFIX = "$SRV"

506

507

# Response types

508

INFO_RESPONSE_TYPE = "io.nats.micro.v1.info_response"

509

PING_RESPONSE_TYPE = "io.nats.micro.v1.ping_response"

510

STATS_RESPONSE_TYPE = "io.nats.micro.v1.stats_response"

511

512

# Headers

513

ERROR_HEADER = "Nats-Service-Error"

514

ERROR_CODE_HEADER = "Nats-Service-Error-Code"

515

```