# Cluster Support

prom-client provides built-in support for Node.js cluster environments by aggregating metrics across worker processes. A Prometheus scrape then returns combined data from all workers rather than the metrics of a single worker.

## Capabilities

### AggregatorRegistry

The `AggregatorRegistry` class extends the base `Registry` class with cluster-specific functionality. It collects metrics from every worker in a Node.js cluster and serves the consolidated result from the master process.

```typescript { .api }
/**
 * Extends the Registry class with cluster metrics aggregation support
 */
class AggregatorRegistry<T extends RegistryContentType> extends Registry<T> {
  /**
   * Create a new AggregatorRegistry instance
   * @param regContentType Content type for metrics output
   */
  constructor(regContentType?: T);

  /**
   * Gets aggregated metrics for all workers.
   * @return Promise that resolves with the aggregated metrics.
   */
  clusterMetrics(): Promise<string>;

  /**
   * Creates a new Registry instance from an array of metrics that were
   * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
   * the method specified by their `aggregator` property, or by summation if
   * `aggregator` is undefined.
   * @param metricsArr Array of metrics, each of which created by
   * `registry.getMetricsAsJSON()`.
   * @return aggregated registry.
   */
  static aggregate<T extends RegistryContentType>(
    metricsArr: Array<object>
  ): Registry<T>;

  /**
   * Sets the registry or registries to be aggregated. Call from workers to
   * use a registry/registries other than the default global registry.
   * @param regs Registry or registries to be aggregated.
   * @return void
   */
  static setRegistries(
    regs:
      | Array<Registry<PrometheusContentType> | Registry<OpenMetricsContentType>>
      | Registry<PrometheusContentType>
      | Registry<OpenMetricsContentType>
  ): void;
}
```

**Usage Examples:**

```typescript
import cluster from "cluster";
import os from "os";
import express from "express";
import { AggregatorRegistry, Counter } from "prom-client";

// On Node.js 16+, cluster.isPrimary is the preferred name for cluster.isMaster
if (cluster.isMaster) {
  // Master process
  const aggregatorRegistry = new AggregatorRegistry();

  // Fork one worker per CPU core
  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  // Expose metrics endpoint with aggregated metrics
  const app = express();

  app.get("/metrics", async (req, res) => {
    res.set("Content-Type", aggregatorRegistry.contentType);
    const metrics = await aggregatorRegistry.clusterMetrics();
    res.end(metrics);
  });

  app.listen(3000);

} else {
  // Worker process
  const httpRequests = new Counter({
    name: "http_requests_total",
    help: "Total HTTP requests",
    labelNames: ["method", "status"],
  });

  // Your application logic here
  // The master aggregates these metrics whenever clusterMetrics() is called
}
```

### Metric Aggregation Strategies

Different metrics require different aggregation strategies when data from multiple workers is combined. Set the `aggregator` property in a metric's configuration to choose the method; metrics without one are summed.

```typescript { .api }
/**
 * Aggregation methods, used for aggregating metrics in a Node.js cluster.
 */
type Aggregator = 'omit' | 'sum' | 'first' | 'min' | 'max' | 'average';

/**
 * Functions that can be used to aggregate metrics from multiple registries.
 * These functions operate on metric value objects, not raw numbers.
 */
const aggregators: {
  /** Sum all metric values across workers */
  sum: (metricValues: MetricValue[]) => number;
  /** Take the first worker's metric value */
  first: (metricValues: MetricValue[]) => number;
  /** Take the minimum value across all workers */
  min: (metricValues: MetricValue[]) => number;
  /** Take the maximum value across all workers */
  max: (metricValues: MetricValue[]) => number;
  /** Calculate the arithmetic mean across all workers */
  average: (metricValues: MetricValue[]) => number;
  /** Omit this metric from aggregated output */
  omit: (metricValues: MetricValue[]) => undefined;
};

/**
 * Factory function to create custom aggregator functions
 */
const AggregatorFactory: (aggregatorFn: (values: MetricValue[]) => number) => Function;
```

**Usage Examples:**

```typescript
import { Counter, Gauge, Histogram } from "prom-client";

// Counter metrics are summed by default (appropriate for totals)
const requestsTotal = new Counter({
  name: "http_requests_total",
  help: "Total HTTP requests",
  aggregator: "sum", // Default for counters
});

// Gauge metrics can use different aggregation strategies
const currentConnections = new Gauge({
  name: "current_connections",
  help: "Current active connections",
  aggregator: "sum", // Sum across all workers
});

const memoryUsage = new Gauge({
  name: "memory_usage_bytes",
  help: "Memory usage per worker",
  aggregator: "average", // Average across workers
});

const maxEventLoopDelay = new Gauge({
  name: "max_event_loop_delay_seconds",
  help: "Maximum event loop delay",
  aggregator: "max", // Take maximum value
});

// Version info should only be reported once
const versionInfo = new Gauge({
  name: "nodejs_version_info",
  help: "Node.js version info",
  aggregator: "first", // Take first worker's value
});

// Some metrics might not make sense in cluster context
const workerId = new Gauge({
  name: "worker_id",
  help: "Worker process ID",
  aggregator: "omit", // Don't include in aggregated metrics
});
```
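
As a rough illustration of what each strategy computes, the following standalone sketch re-implements the arithmetic on plain values. It is not prom-client's own code: the `WorkerValue` type, the `combine` helper, and the sample values are hypothetical, and `omit` is left out because it simply drops the metric from the output. The sketch assumes at least one worker reported a value.

```typescript
// Illustrative stand-ins for the built-in strategies; not prom-client's code.
interface WorkerValue {
  value: number;
}

type Strategy = "sum" | "first" | "min" | "max" | "average";

// Combine the values reported by each worker for one metric.
function combine(strategy: Strategy, values: WorkerValue[]): number {
  const nums = values.map((v) => v.value);
  switch (strategy) {
    case "sum":
      return nums.reduce((a, b) => a + b, 0);
    case "first":
      return nums[0];
    case "min":
      return Math.min(...nums);
    case "max":
      return Math.max(...nums);
    case "average":
      return nums.reduce((a, b) => a + b, 0) / nums.length;
  }
}

// One gauge as reported by three hypothetical workers.
const perWorker: WorkerValue[] = [{ value: 4 }, { value: 10 }, { value: 1 }];

console.log(combine("sum", perWorker)); // 15
console.log(combine("first", perWorker)); // 4
console.log(combine("min", perWorker)); // 1
console.log(combine("max", perWorker)); // 10
console.log(combine("average", perWorker)); // 5
```

The same three readings yield five different cluster-wide numbers, which is why the choice of `aggregator` should follow what the metric means: totals are summed, peaks use `max`, and constants such as version info use `first`.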

### Custom Registry Configuration

By default, the `AggregatorRegistry` aggregates metrics from the global registry in each worker. To aggregate custom registries instead, call `AggregatorRegistry.setRegistries()` in each worker.

```typescript { .api }
/**
 * Sets the registry or registries to be aggregated. Call from workers to
 * use a registry/registries other than the default global registry.
 */
static setRegistries(regs: Array<Registry> | Registry): void;
```

**Usage Examples:**

```typescript
import cluster from "cluster";
import os from "os";
import express from "express";
import { AggregatorRegistry, Registry, Counter } from "prom-client";

if (cluster.isMaster) {
  const aggregatorRegistry = new AggregatorRegistry();

  // Fork workers
  const numWorkers = os.cpus().length;
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  // Metrics endpoint
  const app = express();

  app.get("/metrics", async (req, res) => {
    res.set("Content-Type", aggregatorRegistry.contentType);
    const metrics = await aggregatorRegistry.clusterMetrics();
    res.end(metrics);
  });

  app.listen(3000);

} else {
  // Worker process with custom registries
  const appRegistry = new Registry();
  const systemRegistry = new Registry();

  // Set custom registries for aggregation
  AggregatorRegistry.setRegistries([appRegistry, systemRegistry]);

  // Create metrics in custom registries
  const appMetrics = new Counter({
    name: "app_operations_total",
    help: "Total application operations",
    registers: [appRegistry],
  });

  const systemMetrics = new Counter({
    name: "system_calls_total",
    help: "Total system calls",
    registers: [systemRegistry],
  });

  // Worker application logic...
}
```

### Manual Aggregation

You can also aggregate metrics from multiple sources manually with the static `aggregate` method:

```typescript { .api }
/**
 * Creates a new Registry instance from an array of metrics that were
 * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
 * the method specified by their `aggregator` property, or by summation if
 * `aggregator` is undefined.
 */
static aggregate<T extends RegistryContentType>(
  metricsArr: Array<object>
): Registry<T>;
```

**Usage Examples:**

```typescript
import { AggregatorRegistry, Registry, Counter } from "prom-client";

// Simulate metrics from multiple sources
const registry1 = new Registry();
const registry2 = new Registry();

const counter1 = new Counter({
  name: "requests_total",
  help: "Total requests",
  registers: [registry1],
});

const counter2 = new Counter({
  name: "requests_total",
  help: "Total requests",
  registers: [registry2],
});

counter1.inc(100);
counter2.inc(150);

// Get metrics as JSON from each registry
const metrics1 = await registry1.getMetricsAsJSON();
const metrics2 = await registry2.getMetricsAsJSON();

// Manually aggregate
const aggregatedRegistry = AggregatorRegistry.aggregate([metrics1, metrics2]);

// The aggregated registry now contains combined metrics
const combinedMetrics = await aggregatedRegistry.metrics();
console.log(combinedMetrics); // Will show requests_total = 250
```
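
When a metric carries labels, summation is only meaningful per label combination. The standalone sketch below shows that idea under the assumption that samples sharing a metric name and an identical label set are combined into one total. The `Sample` type and the `labelKey` and `sumByLabels` helpers are hypothetical and do not mirror prom-client's internal data structures.

```typescript
// Hypothetical sample shape for illustration only.
interface Sample {
  labels: Record<string, string>;
  value: number;
}

// Build a stable key so that label order does not matter.
function labelKey(labels: Record<string, string>): string {
  return Object.keys(labels)
    .sort()
    .map((name) => `${name}=${labels[name]}`)
    .join(",");
}

// Sum samples from several sources, keeping one total per label set.
function sumByLabels(sources: Sample[][]): Sample[] {
  const totals = new Map<string, Sample>();
  for (const samples of sources) {
    for (const sample of samples) {
      const key = labelKey(sample.labels);
      const existing = totals.get(key);
      if (existing) {
        existing.value += sample.value;
      } else {
        totals.set(key, { labels: { ...sample.labels }, value: sample.value });
      }
    }
  }
  return Array.from(totals.values());
}

// Two hypothetical sources reporting the same counter.
const source1: Sample[] = [
  { labels: { method: "GET" }, value: 100 },
  { labels: { method: "POST" }, value: 5 },
];
const source2: Sample[] = [{ labels: { method: "GET" }, value: 150 }];

const combined = sumByLabels([source1, source2]);
console.log(combined);
// The GET series totals 250 (100 + 150); the POST series stays at 5.
```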

### Complete Cluster Example

```typescript
import cluster from "cluster";
import os from "os";
import express from "express";
import {
  AggregatorRegistry,
  Counter,
  Gauge,
  collectDefaultMetrics
} from "prom-client";

const numWorkers = os.cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Create aggregator registry for collecting metrics from workers
  const aggregatorRegistry = new AggregatorRegistry();

  // Fork workers
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  // Replace any worker that exits
  cluster.on("exit", (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died. Restarting...`);
    cluster.fork();
  });

  // Express app for metrics endpoint
  const app = express();

  app.get("/metrics", async (req, res) => {
    try {
      res.set("Content-Type", aggregatorRegistry.contentType);
      const metrics = await aggregatorRegistry.clusterMetrics();
      res.end(metrics);
    } catch (error) {
      console.error("Error getting cluster metrics:", error);
      res.status(500).end("Error retrieving metrics");
    }
  });

  app.get("/health", (req, res) => {
    res.json({
      status: "healthy",
      // cluster.workers is only defined in the master process
      workers: Object.keys(cluster.workers ?? {}).length,
      master_pid: process.pid
    });
  });

  const port = process.env.PORT || 3000;
  app.listen(port, () => {
    console.log(`Metrics server listening on port ${port}`);
  });

} else {
  console.log(`Worker ${process.pid} started`);

  // Collect default Node.js metrics in each worker
  collectDefaultMetrics({
    labels: { worker_id: cluster.worker.id.toString() }
  });

  // Create application-specific metrics
  const httpRequestsTotal = new Counter({
    name: "http_requests_total",
    help: "Total HTTP requests",
    labelNames: ["method", "route", "status_code"],
    aggregator: "sum", // Sum requests across all workers
  });

  const activeConnections = new Gauge({
    name: "active_connections",
    help: "Number of active connections",
    aggregator: "sum", // Sum connections across all workers
  });

  const workerMemoryUsage = new Gauge({
    name: "worker_memory_usage_bytes",
    help: "Memory usage per worker",
    aggregator: "average", // Average memory usage
  });

  // Worker application
  const app = express();

  // Middleware to track requests
  app.use((req, res, next) => {
    const originalSend = res.send;

    // Wrap res.send so the counter is incremented once the response is sent
    res.send = function(data) {
      httpRequestsTotal.inc({
        method: req.method,
        route: req.route?.path || req.path,
        status_code: res.statusCode.toString(),
      });
      return originalSend.call(this, data);
    };

    next();
  });

  // Sample routes
  app.get("/", (req, res) => {
    res.json({ worker: cluster.worker.id, pid: process.pid });
  });

  app.get("/heavy", (req, res) => {
    // Simulate heavy work
    const start = Date.now();
    while (Date.now() - start < 100) {
      // CPU intensive task
    }
    res.json({ message: "Heavy work completed", worker: cluster.worker.id });
  });

  const server = app.listen(8080, () => {
    console.log(`Worker ${cluster.worker.id} listening on port 8080`);
  });

  // Update connection count: increment when a socket connects and
  // decrement when that same socket closes
  server.on("connection", (socket) => {
    activeConnections.inc();
    socket.on("close", () => {
      activeConnections.dec();
    });
  });

  // Periodically update memory usage
  setInterval(() => {
    const memUsage = process.memoryUsage();
    workerMemoryUsage.set(memUsage.heapUsed);
  }, 5000);

  // Graceful shutdown
  process.on("SIGTERM", () => {
    console.log(`Worker ${cluster.worker.id} received SIGTERM`);
    server.close(() => {
      process.exit(0);
    });
  });
}
```

### Default Metrics in Cluster Mode

Default metrics work automatically in cluster mode with sensible aggregation strategies:

- **Counters** (like CPU time): Summed across workers
- **Gauges** (like memory usage): Averaged across workers
- **Version info**: First worker's value used
- **Event loop metrics**: Averaged (note: averaging percentiles across workers is not perfectly accurate)

```typescript
import cluster from "cluster";
import { collectDefaultMetrics } from "prom-client";

// In worker processes
collectDefaultMetrics({
  labels: {
    worker_id: cluster.worker.id.toString(),
    instance: process.env.HOSTNAME || "unknown"
  }
});
```
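
The caveat about averaged event loop percentiles can be seen with a small worked example. This standalone sketch uses made-up delay samples and a nearest-rank percentile (the `percentile` helper is hypothetical, not a prom-client function) to compare the average of per-worker medians with the median of the pooled samples.

```typescript
// Nearest-rank percentile of an ascending-sorted, non-empty array.
function percentile(sorted: number[], p: number): number {
  const rank = Math.max(1, Math.ceil((p / 100) * sorted.length));
  return sorted[rank - 1];
}

// Hypothetical event loop delay samples (ms) from two workers.
const workerA = [1, 2, 3, 4, 5];
const workerB = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100];

// What averaging the per-worker percentiles reports.
const averagedP50 = (percentile(workerA, 50) + percentile(workerB, 50)) / 2;

// What the percentile over all samples actually is.
const pooled = [...workerA, ...workerB].sort((a, b) => a - b);
const trueP50 = percentile(pooled, 50);

console.log(averagedP50); // 26.5
console.log(trueP50); // 30
```

The two numbers differ because a percentile depends on the full distribution of samples, which is lost once each worker reduces its data to a single value.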

## Types

```typescript { .api }
type Aggregator = 'omit' | 'sum' | 'first' | 'min' | 'max' | 'average';

type RegistryContentType = PrometheusContentType | OpenMetricsContentType;

interface MetricValue {
  value: number;
  labels: object;
  metricName?: string;
}

const aggregators: {
  sum: (metricValues: MetricValue[]) => number;
  first: (metricValues: MetricValue[]) => number;
  min: (metricValues: MetricValue[]) => number;
  max: (metricValues: MetricValue[]) => number;
  average: (metricValues: MetricValue[]) => number;
  omit: (metricValues: MetricValue[]) => undefined;
};
```