0
# Services API
1
2
The NATS Services API provides a framework for building discoverable microservices with built-in health monitoring, statistics collection, and service registry capabilities.
3
4
## Capabilities
5
6
### Service Registration
7
8
Register services with NATS for discovery and monitoring.
9
10
```typescript { .api }
11
/**
12
* Access to Services API from NATS connection
13
*/
14
services: ServicesAPI;
15
16
interface ServicesAPI {
17
/**
18
* Register a new service with NATS
19
* @param config - Service configuration including name, version, and endpoints
20
* @returns Promise resolving to Service instance
21
*/
22
add(config: ServiceConfig): Promise<Service>;
23
24
/**
25
* Create a service client for discovery and monitoring
26
* @param opts - Request options for service operations
27
* @param prefix - Custom service prefix (default: "$SRV")
28
* @returns ServiceClient instance
29
*/
30
client(opts?: RequestManyOptions, prefix?: string): ServiceClient;
31
}
32
33
interface ServiceConfig {
34
/** Service name (required) */
35
name: string;
36
/** Service version (required) */
37
version: string;
38
/** Service description */
39
description?: string;
40
/** Service metadata */
41
metadata?: Record<string, string>;
42
/** Queue group name used by the service's endpoint subscriptions */
43
queue?: string;
44
/** Schema for service definition */
45
schema?: ServiceSchema;
46
}
47
48
interface ServiceSchema {
49
/** Request schema */
50
request?: Record<string, unknown>;
51
/** Response schema */
52
response?: Record<string, unknown>;
53
}
54
55
/**
 * A running service instance returned by `ServicesAPI.add()`.
 *
 * A service is itself the root `ServiceGroup`, so endpoints and nested
 * groups are added directly on it (e.g. `service.addGroup("math")`).
 */
interface Service extends ServiceGroup {
  /** Stop the service */
  stop(): Promise<void>;
  /** Check if service is stopped */
  stopped: boolean;
  /** Service information */
  info: ServiceInfo;
  /** Service statistics */
  stats(): ServiceStats;
}
65
```
66
67
**Usage Examples:**
68
69
```typescript
70
import { connect } from "nats";
71
72
const nc = await connect();
73
74
// Create a simple service
75
const service = await nc.services.add({
76
name: "calculator",
77
version: "1.0.0",
78
description: "Simple math calculator service"
79
});
80
81
console.log(`Service started: ${service.info.name} v${service.info.version}`);
82
83
// Create service with metadata
84
const userService = await nc.services.add({
85
name: "user-manager",
86
version: "2.1.0",
87
description: "User management service",
88
metadata: {
89
region: "us-west-2",
90
environment: "production",
91
team: "backend"
92
}
93
});
94
95
// Stop service gracefully
96
await userService.stop();
97
```
98
99
### Service Endpoints
100
101
Define service endpoints that handle specific request types.
102
103
```typescript { .api }
104
interface ServiceGroup {
105
/** Add endpoint to service group */
106
addEndpoint(
107
name: string,
108
opts: ServiceEndpointOpts,
109
handler: ServiceHandler
110
): QueuedIterator<ServiceMsg>;
111
112
/** Create a nested group; the group name becomes a subject prefix for endpoints added to it */
113
addGroup(name: string): ServiceGroup;
114
}
115
116
interface ServiceEndpointOpts {
117
/** Endpoint subject pattern */
118
subject?: string;
119
/** Endpoint queue group */
120
queue_group?: string;
121
/** Endpoint metadata */
122
metadata?: Record<string, string>;
123
/** Endpoint schema */
124
schema?: {
125
request?: Record<string, unknown>;
126
response?: Record<string, unknown>;
127
};
128
}
129
130
type ServiceHandler = (
131
err: NatsError | null,
132
msg: ServiceMsg
133
) => Promise<void> | void;
134
135
interface ServiceMsg extends Msg {
136
/** Respond to service message */
137
respond(data?: Payload, opts?: ServiceMsgResponse): boolean;
138
/** Respond with error */
139
respondError(code: number, description: string, data?: Payload): boolean;
140
}
141
142
interface ServiceMsgResponse {
143
/** Response headers */
144
headers?: MsgHdrs;
145
}
146
```
147
148
**Usage Examples:**
149
150
```typescript
151
import { connect, StringCodec, JSONCodec, headers } from "nats";
152
153
const nc = await connect();
154
const sc = StringCodec();
155
const jc = JSONCodec();
156
157
// Create calculator service with endpoints
158
const calc = await nc.services.add({
159
name: "calculator",
160
version: "1.0.0",
161
description: "Math calculator service"
162
});
163
164
// Add endpoint for addition
165
const addGroup = calc.addGroup("math");
166
const addEndpoint = addGroup.addEndpoint(
167
"add",
168
{
169
subject: "calc.add",
170
metadata: { operation: "addition" }
171
},
172
(err, msg) => {
173
if (err) {
174
console.error("Handler error:", err);
175
return;
176
}
177
178
try {
179
const { a, b } = jc.decode(msg.data);
180
const result = { sum: a + b };
181
msg.respond(jc.encode(result));
182
} catch (error) {
183
msg.respondError(400, "Invalid input format", sc.encode("Expected {a: number, b: number}"));
184
}
185
}
186
);
187
188
// Add endpoint for division with error handling
189
const divEndpoint = addGroup.addEndpoint(
190
"divide",
191
{ subject: "calc.divide" },
192
(err, msg) => {
193
if (err) return;
194
195
try {
196
const { dividend, divisor } = jc.decode(msg.data);
197
198
if (divisor === 0) {
199
msg.respondError(422, "Division by zero", sc.encode("Divisor cannot be zero"));
200
return;
201
}
202
203
const result = { quotient: dividend / divisor };
204
msg.respond(jc.encode(result));
205
} catch (error) {
206
msg.respondError(400, "Invalid input", sc.encode(error.message));
207
}
208
}
209
);
210
211
// User management service with authentication
212
const userSvc = await nc.services.add({
213
name: "user-service",
214
version: "1.2.0"
215
});
216
217
const usersGroup = userSvc.addGroup("users");
218
const getUserEndpoint = usersGroup.addEndpoint(
219
"get",
220
{ subject: "users.get" },
221
async (err, msg) => {
222
if (err) return;
223
224
// Check authentication header
225
const token = msg.headers?.get("authorization");
226
if (!token) {
227
msg.respondError(401, "Unauthorized", sc.encode("Missing authorization header"));
228
return;
229
}
230
231
try {
232
const { userId } = jc.decode(msg.data);
233
const user = await getUserFromDatabase(userId); // Your DB call
234
235
if (!user) {
236
msg.respondError(404, "User not found", sc.encode(`User ${userId} not found`));
237
return;
238
}
239
240
// Add response headers
241
const responseHeaders = headers();
242
responseHeaders.set("content-type", "application/json");
243
responseHeaders.set("cache-control", "max-age=300");
244
245
msg.respond(jc.encode(user), { headers: responseHeaders });
246
} catch (error) {
247
msg.respondError(500, "Internal server error", sc.encode(error.message));
248
}
249
}
250
);
251
```
252
253
### Service Discovery
254
255
Discover and interact with registered services.
256
257
```typescript { .api }
258
interface ServiceClient {
259
/**
260
* Ping services to check availability
261
* @param name - Optional service name filter
262
* @param id - Optional service ID filter
263
* @returns Promise resolving to async iterator of service identities
264
*/
265
ping(name?: string, id?: string): Promise<QueuedIterator<ServiceIdentity>>;
266
267
/**
268
* Get statistics from services
269
* @param name - Optional service name filter
270
* @param id - Optional service ID filter
271
* @returns Promise resolving to async iterator of service statistics
272
*/
273
stats(name?: string, id?: string): Promise<QueuedIterator<ServiceStats>>;
274
275
/**
276
* Get information from services
277
* @param name - Optional service name filter
278
* @param id - Optional service ID filter
279
* @returns Promise resolving to async iterator of service information
280
*/
281
info(name?: string, id?: string): Promise<QueuedIterator<ServiceInfo>>;
282
}
283
284
interface ServiceIdentity {
285
/** Service name */
286
name: string;
287
/** Service ID */
288
id: string;
289
/** Service version */
290
version: string;
291
/** Service metadata */
292
metadata?: Record<string, string>;
293
}
294
295
interface ServiceInfo {
296
/** Service name */
297
name: string;
298
/** Service ID */
299
id: string;
300
/** Service version */
301
version: string;
302
/** Service description */
303
description?: string;
304
/** Service metadata */
305
metadata?: Record<string, string>;
306
/** Service endpoints */
307
endpoints: EndpointInfo[];
308
/** Service type */
309
type: string;
310
}
311
312
interface ServiceStats {
313
/** Service name */
314
name: string;
315
/** Service ID */
316
id: string;
317
/** Service version */
318
version: string;
319
/** Service start time */
320
started: Date;
321
/** Service endpoints statistics */
322
endpoints: EndpointStats[];
323
}
324
325
interface EndpointInfo {
326
/** Endpoint name */
327
name: string;
328
/** Endpoint subject */
329
subject: string;
330
/** Endpoint queue group */
331
queue_group?: string;
332
/** Endpoint metadata */
333
metadata?: Record<string, string>;
334
}
335
336
interface EndpointStats {
337
/** Endpoint name */
338
name: string;
339
/** Endpoint subject */
340
subject: string;
341
/** Number of requests received */
342
num_requests: number;
343
/** Number of errors */
344
num_errors: number;
345
/** Last error message */
346
last_error?: string;
347
/** Average processing time */
348
processing_time: number;
349
/** Average queue time */
350
queue_time: number;
351
/** Additional endpoint data */
352
data?: Record<string, unknown>;
353
}
354
```
355
356
**Usage Examples:**
357
358
```typescript
359
import { connect } from "nats";
360
361
const nc = await connect();
362
const client = nc.services.client();
363
364
// Discover all services
365
console.log("Discovering services...");
366
const services = await client.ping();
367
for await (const service of services) {
368
console.log(`Found: ${service.name} v${service.version} (${service.id})`);
369
}
370
371
// Get information about specific service
372
const calcInfo = await client.info("calculator");
373
for await (const info of calcInfo) {
374
console.log(`Service: ${info.name} - ${info.description}`);
375
console.log("Endpoints:");
376
for (const endpoint of info.endpoints) {
377
console.log(` - ${endpoint.name}: ${endpoint.subject}`);
378
}
379
}
380
381
// Monitor service statistics
382
const statsMonitor = await client.stats("user-service");
383
for await (const stats of statsMonitor) {
384
console.log(`Service: ${stats.name}, Started: ${stats.started}`);
385
for (const endpoint of stats.endpoints) {
386
console.log(` ${endpoint.name}: ${endpoint.num_requests} requests, ${endpoint.num_errors} errors`);
387
if (endpoint.processing_time > 0) {
388
console.log(` Avg processing: ${endpoint.processing_time}ms`);
389
}
390
}
391
}
392
393
// Health check for specific services
394
async function checkServiceHealth() {
395
const pingResults = await client.ping();
396
const healthyServices: string[] = [];
397
398
for await (const service of pingResults) {
399
healthyServices.push(`${service.name}@${service.version}`);
400
}
401
402
console.log(`Healthy services: ${healthyServices.join(", ")}`);
403
return healthyServices;
404
}
405
406
// Run health check every 30 seconds
407
setInterval(checkServiceHealth, 30000);
408
```
409
410
### Service Communication
411
412
Communicate with discovered services using standard NATS messaging.
413
414
```typescript
415
// Service client wrapper for easy communication
416
/**
 * Thin request/reply wrapper around a NATS connection for calling service
 * endpoints. Requests are JSON-encoded and sent to `<serviceName>.<endpoint>`;
 * replies are JSON-decoded unless they carry a service error.
 */
class ServiceProxy {
  constructor(private nc: NatsConnection, private serviceName: string) {}

  /**
   * Call an endpoint and return its decoded JSON reply.
   * @param endpoint - Endpoint name, appended to the service name to form the subject
   * @param data - Request payload (JSON-encoded before sending)
   * @param timeout - Request timeout in milliseconds
   * @throws Error when the reply is flagged as a service error
   */
  async callEndpoint<T, R>(
    endpoint: string,
    data: T,
    timeout = 5000
  ): Promise<R> {
    return this.send<T, R>(endpoint, data, timeout);
  }

  /**
   * Same as `callEndpoint`, but adds an `authorization: Bearer <token>` header.
   * @param endpoint - Endpoint name, appended to the service name to form the subject
   * @param data - Request payload (JSON-encoded before sending)
   * @param token - Bearer token placed in the `authorization` header
   * @param timeout - Request timeout in milliseconds
   * @throws Error when the reply is flagged as a service error
   */
  async callWithAuth<T, R>(
    endpoint: string,
    data: T,
    token: string,
    timeout = 5000
  ): Promise<R> {
    return this.send<T, R>(endpoint, data, timeout, token);
  }

  /** Build the subject, issue the request and hand the reply to `unwrap`. */
  private async send<T, R>(
    endpoint: string,
    data: T,
    timeout: number,
    token?: string
  ): Promise<R> {
    const subject = `${this.serviceName}.${endpoint}`;
    const payload = JSONCodec<T>().encode(data);

    if (token === undefined) {
      return this.unwrap<R>(await this.nc.request(subject, payload, { timeout }));
    }

    const hdrs = headers();
    hdrs.set("authorization", `Bearer ${token}`);
    return this.unwrap<R>(
      await this.nc.request(subject, payload, { timeout, headers: hdrs })
    );
  }

  /**
   * Decode a reply, throwing if it reports an error.
   *
   * Service handlers report failures through the "Nats-Service-Error-Code" and
   * "Nats-Service-Error" headers, not through the status line that
   * `headers.hasError` reflects, so the service headers are checked first.
   */
  private unwrap<R>(response: {
    data: Uint8Array;
    headers?: {
      get(key: string): string;
      hasError?: boolean;
      code?: number;
      description?: string;
    };
  }): R {
    const hdrs = response.headers;

    const serviceCode = hdrs?.get("Nats-Service-Error-Code");
    if (hdrs && serviceCode) {
      throw new Error(`Service error ${serviceCode}: ${hdrs.get("Nats-Service-Error")}`);
    }

    // Fall back to the status-line error flag for non-service errors.
    if (hdrs?.hasError) {
      throw new Error(`Service error ${hdrs.code}: ${hdrs.description}`);
    }

    return JSONCodec<R>().decode(response.data);
  }
}
462
463
// Usage examples
464
const nc = await connect();
465
466
// Calculator service client
467
const calc = new ServiceProxy(nc, "calc");
468
469
try {
470
const sum = await calc.callEndpoint<{a: number, b: number}, {sum: number}>(
471
"add",
472
{ a: 10, b: 20 }
473
);
474
console.log(`10 + 20 = ${sum.sum}`);
475
476
const quotient = await calc.callEndpoint<{dividend: number, divisor: number}, {quotient: number}>(
477
"divide",
478
{ dividend: 100, divisor: 5 }
479
);
480
console.log(`100 / 5 = ${quotient.quotient}`);
481
} catch (err) {
482
console.error("Calculator service error:", err.message);
483
}
484
485
// User service client with authentication
486
const users = new ServiceProxy(nc, "users");
487
488
try {
489
const user = await users.callWithAuth<{userId: string}, {id: string, name: string, email: string}>(
490
"get",
491
{ userId: "user123" },
492
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." // JWT token
493
);
494
console.log(`User: ${user.name} (${user.email})`);
495
} catch (err) {
496
console.error("User service error:", err.message);
497
}
498
```
499
500
### Service Monitoring
501
502
Monitor service health and performance metrics.
503
504
```typescript
505
// Service monitoring dashboard
506
/**
 * Example monitoring dashboard built on the service discovery client.
 * Collects service info and statistics and prints periodic health reports.
 */
class ServiceMonitor {
  private client: ServiceClient;
  /** Handle of the active reporting timer; undefined while monitoring is off. */
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(nc: NatsConnection) {
    this.client = nc.services.client();
  }

  /** Collect the info record of every service instance that responds. */
  async getServiceInventory(): Promise<ServiceInfo[]> {
    const services: ServiceInfo[] = [];
    const serviceInfo = await this.client.info();

    for await (const info of serviceInfo) {
      services.push(info);
    }

    return services;
  }

  /**
   * Collect statistics from every service instance that responds.
   * @returns Map keyed by service instance id. Keying by `name@version`
   *          would let several instances of one service overwrite each other.
   */
  async getServiceMetrics(): Promise<Map<string, ServiceStats>> {
    const metrics = new Map<string, ServiceStats>();
    const stats = await this.client.stats();

    for await (const stat of stats) {
      metrics.set(stat.id, stat);
    }

    return metrics;
  }

  /**
   * Start printing a health report every `intervalMs` milliseconds.
   * Any previously started timer is replaced; call `stopMonitoring()` to end it.
   * The returned promise resolves once the timer is scheduled, not when
   * monitoring ends.
   */
  async monitorServiceHealth(intervalMs = 10000): Promise<void> {
    console.log("Starting service health monitoring...");

    // Never stack timers when called more than once.
    this.stopMonitoring();

    let reportInProgress = false;
    this.timer = setInterval(async () => {
      // Skip this tick if the previous report is still collecting data.
      if (reportInProgress) return;
      reportInProgress = true;
      try {
        await this.printHealthReport();
      } catch (err) {
        console.error("Health monitoring error:", err);
      } finally {
        reportInProgress = false;
      }
    }, intervalMs);
  }

  /** Stop the periodic health report started by `monitorServiceHealth`. */
  stopMonitoring(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /** Gather inventory and metrics once and print a per-instance report. */
  private async printHealthReport(): Promise<void> {
    const services = await this.getServiceInventory();
    const metrics = await this.getServiceMetrics();

    console.log(`\n=== Service Health Report (${new Date().toISOString()}) ===`);

    for (const service of services) {
      // Match statistics to this exact instance, not just name and version.
      const stats = metrics.get(service.id);

      console.log(`\n${service.name} v${service.version} (${service.id})`);
      console.log(`  Description: ${service.description || 'N/A'}`);
      console.log(`  Endpoints: ${service.endpoints.length}`);

      if (!stats) {
        console.log(`  Status: No statistics available`);
        continue;
      }

      console.log(`  Uptime: ${this.formatUptime(stats.started)}`);

      let totalRequests = 0;
      let totalErrors = 0;

      for (const endpoint of stats.endpoints) {
        totalRequests += endpoint.num_requests;
        totalErrors += endpoint.num_errors;

        const errorRate = endpoint.num_requests > 0
          ? (endpoint.num_errors / endpoint.num_requests * 100).toFixed(2)
          : "0.00";

        console.log(`    ${endpoint.name}: ${endpoint.num_requests} reqs, ${errorRate}% errors, ${endpoint.processing_time}ms avg`);
      }

      const overallErrorRate = totalRequests > 0
        ? (totalErrors / totalRequests * 100).toFixed(2)
        : "0.00";

      console.log(`  Overall: ${totalRequests} requests, ${overallErrorRate}% error rate`);
    }
  }

  /**
   * Format the time elapsed since `started` as `Ns`, `Nm Ns` or `Nh Nm`.
   * Accepts a Date or a date string, because statistics decoded from a
   * message payload may carry the start time as text.
   * @returns "unknown" when `started` cannot be parsed as a date
   */
  private formatUptime(started: Date | string): string {
    const startedMs = new Date(started).getTime();
    if (Number.isNaN(startedMs)) return "unknown";

    // Clamp at zero so a start time slightly in the future is not negative.
    const seconds = Math.max(0, Math.floor((Date.now() - startedMs) / 1000));

    if (seconds < 60) return `${seconds}s`;
    if (seconds < 3600) return `${Math.floor(seconds / 60)}m ${seconds % 60}s`;

    const hours = Math.floor(seconds / 3600);
    const mins = Math.floor((seconds % 3600) / 60);
    return `${hours}h ${mins}m`;
  }

  /**
   * Log an alert for every endpoint whose error rate exceeds `threshold`.
   * @param threshold - Error rate in percent above which an alert is logged
   */
  async alertOnHighErrorRate(threshold = 5.0): Promise<void> {
    const metrics = await this.getServiceMetrics();

    for (const stats of metrics.values()) {
      const serviceKey = `${stats.name}@${stats.version}`;

      for (const endpoint of stats.endpoints) {
        // Endpoints without traffic have no meaningful error rate.
        if (endpoint.num_requests <= 0) continue;

        const errorRate = (endpoint.num_errors / endpoint.num_requests) * 100;
        if (errorRate > threshold) {
          console.error(`🚨 HIGH ERROR RATE ALERT: ${serviceKey}/${endpoint.name}`);
          console.error(`   Error rate: ${errorRate.toFixed(2)}% (${endpoint.num_errors}/${endpoint.num_requests})`);
          console.error(`   Last error: ${endpoint.last_error || 'N/A'}`);
        }
      }
    }
  }
}
615
616
// Usage
617
const nc = await connect();
618
const monitor = new ServiceMonitor(nc);
619
620
// Get current service inventory
621
const services = await monitor.getServiceInventory();
622
console.log(`Discovered ${services.length} services`);
623
624
// Start continuous monitoring
625
await monitor.monitorServiceHealth(15000); // Every 15 seconds
626
627
// Check for high error rates
628
setInterval(() => monitor.alertOnHighErrorRate(10.0), 60000); // Every minute, 10% threshold
629
```
630
631
## Types
632
633
```typescript { .api }
634
enum ServiceVerb {
635
PING = "PING",
636
STATS = "STATS",
637
INFO = "INFO",
638
SCHEMA = "SCHEMA"
639
}
640
641
enum ServiceResponseType {
642
Singleton = "io.nats.micro.v1.ping_response",
643
Stream = "io.nats.micro.v1.stats_response"
644
}
645
646
interface RequestManyOptions {
647
strategy: RequestStrategy;
648
maxWait: number;
649
headers?: MsgHdrs;
650
maxMessages?: number;
651
noMux?: boolean;
652
jitter?: number;
653
}
654
655
interface ServiceMetadata {
656
[key: string]: string;
657
}
658
659
interface ServiceGroup {
660
addEndpoint(name: string, opts: EndpointOptions, handler: ServiceHandler): QueuedIterator<ServiceMsg>;
661
addGroup(name: string): ServiceGroup;
662
}
663
664
interface EndpointOptions {
665
subject?: string;
666
queue_group?: string;
667
metadata?: ServiceMetadata;
668
schema?: {
669
request?: Record<string, unknown>;
670
response?: Record<string, unknown>;
671
};
672
}
673
674
interface NamedEndpointStats {
675
name: string;
676
subject: string;
677
num_requests: number;
678
num_errors: number;
679
last_error?: string;
680
processing_time: number;
681
queue_time: number;
682
data?: Record<string, unknown>;
683
}
684
685
interface ServiceError extends NatsError {
686
/** Service error code */
687
service_error_code?: number;
688
/** Service error message */
689
service_error_message?: string;
690
}
691
692
const ServiceErrorHeader = "Nats-Service-Error";
693
const ServiceErrorCodeHeader = "Nats-Service-Error-Code";
694
695
interface QueuedIterator<T> {
696
next(): Promise<IteratorResult<T>>;
697
stop(): void;
698
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
699
}
700
```