
tessl/npm-prom-client

Client for prometheus that provides comprehensive Prometheus metrics collection and exposition for Node.js applications


Cluster Support

prom-client provides built-in support for Node.js cluster environments, allowing you to aggregate metrics across multiple worker processes. With aggregation in place, each Prometheus scrape returns data from every worker rather than from a single worker.

Capabilities

AggregatorRegistry

The AggregatorRegistry extends the base Registry class with cluster-specific functionality. It can aggregate metrics from all workers in a Node.js cluster and provide consolidated metrics to the master process.

/**
 * Extends the Registry class with cluster metrics aggregation support
 */
class AggregatorRegistry<T extends RegistryContentType> extends Registry<T> {
  /**
   * Create a new AggregatorRegistry instance
   * @param regContentType Content type for metrics output
   */
  constructor(regContentType?: T);

  /**
   * Gets aggregated metrics for all workers.
   * @return Promise that resolves with the aggregated metrics.
   */
  clusterMetrics(): Promise<string>;

  /**
   * Creates a new Registry instance from an array of metrics that were
   * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
   * the method specified by their `aggregator` property, or by summation if
   * `aggregator` is undefined.
   * @param metricsArr Array of metrics, each of which created by
   *   `registry.getMetricsAsJSON()`.
   * @return aggregated registry.
   */
  static aggregate<T extends RegistryContentType>(
    metricsArr: Array<object>
  ): Registry<T>;

  /**
   * Sets the registry or registries to be aggregated. Call from workers to
   * use a registry/registries other than the default global registry.
   * @param regs Registry or registries to be aggregated.
   * @return void
   */
  static setRegistries(
    regs:
      | Array<Registry<PrometheusContentType> | Registry<OpenMetricsContentType>>
      | Registry<PrometheusContentType>
      | Registry<OpenMetricsContentType>
  ): void;
}

Usage Examples:

import cluster from "cluster";
import os from "os";
import express from "express";
import { AggregatorRegistry, Counter } from "prom-client";

// Note: cluster.isMaster is deprecated since Node.js 16 in favor of
// cluster.isPrimary
if (cluster.isMaster) {
  // Master process: aggregates metrics from all workers
  const aggregatorRegistry = new AggregatorRegistry();

  // Fork one worker per CPU core
  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  // Expose a metrics endpoint that serves the aggregated metrics
  const app = express();

  app.get("/metrics", async (req, res) => {
    try {
      const metrics = await aggregatorRegistry.clusterMetrics();
      res.set("Content-Type", aggregatorRegistry.contentType);
      res.end(metrics);
    } catch (error) {
      // clusterMetrics() returns a Promise, so handle rejection explicitly
      res.status(500).end("Error retrieving metrics");
    }
  });

  app.listen(3000);

} else {
  // Worker process: metrics are registered in the default global registry
  const httpRequests = new Counter({
    name: "http_requests_total",
    help: "Total HTTP requests",
    labelNames: ["method", "status"],
  });

  // Your application logic here
  // Metrics will be automatically aggregated by the master
}

Metric Aggregation Strategies

Different metrics require different aggregation strategies when combining data from multiple workers. You can specify the aggregation method using the aggregator property in metric configurations.

/**
 * Aggregation methods, used for aggregating metrics in a Node.js cluster.
 */
type Aggregator = 'omit' | 'sum' | 'first' | 'min' | 'max' | 'average';

/**
 * Functions that can be used to aggregate metrics from multiple registries.
 * These functions operate on metric value objects, not raw numbers.
 */
const aggregators: {
  /** Sum all metric values across workers */
  sum: (metricValues: MetricValue[]) => number;
  /** Take the first worker's metric value */
  first: (metricValues: MetricValue[]) => number;
  /** Take the minimum value across all workers */
  min: (metricValues: MetricValue[]) => number;
  /** Take the maximum value across all workers */
  max: (metricValues: MetricValue[]) => number;
  /** Calculate the arithmetic mean across all workers */
  average: (metricValues: MetricValue[]) => number;
  /** Omit this metric from aggregated output */
  omit: (metricValues: MetricValue[]) => undefined;
};

/**
 * Factory function to create custom aggregator functions
 */
const AggregatorFactory: (aggregatorFn: (values: MetricValue[]) => number) => Function;

Usage Examples:

import { Counter, Gauge, Histogram } from "prom-client";

// Counter metrics are summed by default (appropriate for totals)
const requestsTotal = new Counter({
  name: "http_requests_total",
  help: "Total HTTP requests",
  aggregator: "sum", // Default for counters
});

// Gauge metrics can use different aggregation strategies
const currentConnections = new Gauge({
  name: "current_connections",
  help: "Current active connections",
  aggregator: "sum", // Sum across all workers
});

const memoryUsage = new Gauge({
  name: "memory_usage_bytes",
  help: "Memory usage per worker",
  aggregator: "average", // Average across workers
});

const maxEventLoopDelay = new Gauge({
  name: "max_event_loop_delay_seconds",
  help: "Maximum event loop delay",
  aggregator: "max", // Take maximum value
});

// Version info should only be reported once
const versionInfo = new Gauge({
  name: "nodejs_version_info",
  help: "Node.js version info",
  aggregator: "first", // Take first worker's value
});

// Some metrics might not make sense in cluster context
const workerId = new Gauge({
  name: "worker_id",
  help: "Worker process ID",
  aggregator: "omit", // Don't include in aggregated metrics
});
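
To make the strategies concrete, the sketch below applies each one to the same set of per-worker readings. It is only an illustration of what the named strategies compute: the `strategies` object and the sample readings are hypothetical and are not prom-client's internal implementation. The `omit` strategy is left out because it produces no value.

```javascript
// Illustrative reducers over MetricValue-like objects ({ value, labels }).
// A sketch of the named strategies, not prom-client's own code.
const strategies = {
  sum: (values) => values.reduce((total, v) => total + v.value, 0),
  first: (values) => values[0].value,
  min: (values) => Math.min(...values.map((v) => v.value)),
  max: (values) => Math.max(...values.map((v) => v.value)),
  average: (values) =>
    values.reduce((total, v) => total + v.value, 0) / values.length,
};

// Hypothetical readings of one gauge series reported by three workers.
const perWorker = [
  { value: 4, labels: {} },
  { value: 10, labels: {} },
  { value: 1, labels: {} },
];

// Apply every strategy to the same readings and collect the results.
const results = Object.fromEntries(
  Object.entries(strategies).map(([name, reduce]) => [name, reduce(perWorker)])
);

console.log(results);
// { sum: 15, first: 4, min: 1, max: 10, average: 5 }
```

The same three readings give five different aggregated values, which is why the choice of `aggregator` should follow the meaning of the metric: totals are summed, high-water marks use `max`, and constant values such as version info use `first`.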

Custom Registry Configuration

By default, the AggregatorRegistry aggregates metrics from the global registry in each worker. You can configure it to use custom registries.

/**
 * Sets the registry or registries to be aggregated. Call from workers to
 * use a registry/registries other than the default global registry.
 */
static setRegistries(regs: Array<Registry> | Registry): void;

Usage Examples:

import cluster from "cluster";
import os from "os";
import express from "express";
import { AggregatorRegistry, Registry, Counter } from "prom-client";

if (cluster.isMaster) {
  const aggregatorRegistry = new AggregatorRegistry();

  // Fork workers
  const numWorkers = os.cpus().length;
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  // Metrics endpoint
  const app = express();
  
  app.get("/metrics", async (req, res) => {
    res.set("Content-Type", aggregatorRegistry.contentType);
    const metrics = await aggregatorRegistry.clusterMetrics();
    res.end(metrics);
  });
  
  app.listen(3000);
  
} else {
  // Worker process with custom registries
  const appRegistry = new Registry();
  const systemRegistry = new Registry();
  
  // Set custom registries for aggregation
  AggregatorRegistry.setRegistries([appRegistry, systemRegistry]);
  
  // Create metrics in custom registries
  const appMetrics = new Counter({
    name: "app_operations_total",
    help: "Total application operations",
    registers: [appRegistry],
  });
  
  const systemMetrics = new Counter({
    name: "system_calls_total", 
    help: "Total system calls",
    registers: [systemRegistry],
  });
  
  // Worker application logic...
}

Manual Aggregation

You can also manually aggregate metrics from multiple sources using the static aggregate method:

/**
 * Creates a new Registry instance from an array of metrics that were
 * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
 * the method specified by their `aggregator` property, or by summation if
 * `aggregator` is undefined.
 */
static aggregate<T extends RegistryContentType>(
  metricsArr: Array<object>
): Registry<T>;

Usage Examples:

import { AggregatorRegistry, Registry, Counter } from "prom-client";

// Simulate metrics from multiple sources
const registry1 = new Registry();
const registry2 = new Registry();

const counter1 = new Counter({
  name: "requests_total",
  help: "Total requests",
  registers: [registry1],
});

const counter2 = new Counter({
  name: "requests_total",
  help: "Total requests", 
  registers: [registry2],
});

counter1.inc(100);
counter2.inc(150);

// Get metrics as JSON from each registry
const metrics1 = await registry1.getMetricsAsJSON();
const metrics2 = await registry2.getMetricsAsJSON();

// Manually aggregate
const aggregatedRegistry = AggregatorRegistry.aggregate([metrics1, metrics2]);

// The aggregated registry now contains combined metrics
const combinedMetrics = await aggregatedRegistry.metrics();
console.log(combinedMetrics); // Will show requests_total = 250
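
The merge rule described above (use each metric's `aggregator`, or sum when it is undefined) can be modelled in a few lines of plain JavaScript. This is a simplified sketch under stated assumptions: `mergeSnapshots`, `reducers`, and the sample snapshots are hypothetical names, the snapshot shape is a reduced version of the JSON metrics, only three strategies are covered, and values are assumed to be combined only when both the metric name and the label set match.

```javascript
// Simplified model of merging per-worker metric snapshots.
// All names here are hypothetical; this is not prom-client's code.
const reducers = {
  sum: (vals) => vals.reduce((a, b) => a + b, 0),
  max: (vals) => Math.max(...vals),
  first: (vals) => vals[0],
};

function mergeSnapshots(snapshots) {
  const groups = new Map(); // key: metric name + sorted label pairs
  for (const snapshot of snapshots) {
    for (const metric of snapshot) {
      for (const { value, labels } of metric.values) {
        // Sort label pairs so that key order does not affect grouping.
        const sortedLabels = Object.entries(labels).sort(([a], [b]) =>
          a.localeCompare(b)
        );
        const key = `${metric.name}|${JSON.stringify(sortedLabels)}`;
        if (!groups.has(key)) {
          groups.set(key, {
            name: metric.name,
            labels,
            aggregator: metric.aggregator,
            values: [],
          });
        }
        groups.get(key).values.push(value);
      }
    }
  }
  return [...groups.values()].map(({ name, labels, aggregator, values }) => ({
    name,
    labels,
    // Fall back to summation when no aggregator is set.
    value: reducers[aggregator ?? "sum"](values),
  }));
}

// Hypothetical snapshots from two workers.
const worker1 = [
  { name: "requests_total", values: [{ value: 100, labels: { method: "GET" } }] },
  { name: "queue_depth", aggregator: "max", values: [{ value: 3, labels: {} }] },
];
const worker2 = [
  {
    name: "requests_total",
    values: [
      { value: 150, labels: { method: "GET" } },
      { value: 7, labels: { method: "POST" } },
    ],
  },
  { name: "queue_depth", aggregator: "max", values: [{ value: 9, labels: {} }] },
];

const merged = mergeSnapshots([worker1, worker2]);
for (const { name, labels, value } of merged) {
  console.log(name, JSON.stringify(labels), value);
}
// requests_total {"method":"GET"} 250
// queue_depth {} 9
// requests_total {"method":"POST"} 7
```

In this model a label whose value differs per worker keeps each worker's series separate, because the grouping key never matches across workers.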

Complete Cluster Example

import cluster from "cluster";
import os from "os";
import express from "express";
import {
  AggregatorRegistry,
  Counter,
  Gauge,
  collectDefaultMetrics
} from "prom-client";

const numWorkers = os.cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
  
  // Create aggregator registry for collecting metrics from workers
  const aggregatorRegistry = new AggregatorRegistry();
  
  // Fork workers
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }
  
  // Handle worker crashes
  cluster.on("exit", (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died. Restarting...`);
    cluster.fork();
  });
  
  // Express app for metrics endpoint
  const app = express();
  
  app.get("/metrics", async (req, res) => {
    try {
      res.set("Content-Type", aggregatorRegistry.contentType);
      const metrics = await aggregatorRegistry.clusterMetrics();
      res.end(metrics);
    } catch (error) {
      console.error("Error getting cluster metrics:", error);
      res.status(500).end("Error retrieving metrics");
    }
  });
  
  app.get("/health", (req, res) => {
    res.json({ 
      status: "healthy",
      workers: Object.keys(cluster.workers).length,
      master_pid: process.pid
    });
  });
  
  const port = process.env.PORT || 3000;
  app.listen(port, () => {
    console.log(`Metrics server listening on port ${port}`);
  });
  
} else {
  console.log(`Worker ${process.pid} started`);
  
  // Collect default Node.js metrics in each worker
  collectDefaultMetrics({
    labels: { worker_id: cluster.worker.id.toString() }
  });
  
  // Create application-specific metrics
  const httpRequestsTotal = new Counter({
    name: "http_requests_total",
    help: "Total HTTP requests",
    labelNames: ["method", "route", "status_code"],
    aggregator: "sum", // Sum requests across all workers
  });
  
  const activeConnections = new Gauge({
    name: "active_connections",
    help: "Number of active connections",
    aggregator: "sum", // Sum connections across all workers
  });
  
  const workerMemoryUsage = new Gauge({
    name: "worker_memory_usage_bytes",
    help: "Memory usage per worker",
    aggregator: "average", // Average memory usage
  });
  
  // Worker application
  const app = express();
  
  // Middleware to track requests
  app.use((req, res, next) => {
    const originalSend = res.send;
    
    res.send = function(data) {
      httpRequestsTotal.inc({
        method: req.method,
        route: req.route?.path || req.path,
        status_code: res.statusCode.toString(),
      });
      return originalSend.call(this, data);
    };
    
    next();
  });
  
  // Sample routes
  app.get("/", (req, res) => {
    res.json({ worker: cluster.worker.id, pid: process.pid });
  });
  
  app.get("/heavy", (req, res) => {
    // Simulate heavy work
    const start = Date.now();
    while (Date.now() - start < 100) {
      // CPU intensive task
    }
    res.json({ message: "Heavy work completed", worker: cluster.worker.id });
  });
  
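  // Start the worker's HTTP server
  const server = app.listen(8080, () => {
    console.log(`Worker ${cluster.worker.id} listening on port 8080`);
  });

  // Track open connections: increment when a socket connects and
  // decrement when that same socket closes. (The server's own "close"
  // event fires only when the server shuts down.)
  server.on("connection", (socket) => {
    activeConnections.inc();
    socket.on("close", () => {
      activeConnections.dec();
    });
  });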
  
  // Periodically update memory usage
  setInterval(() => {
    const memUsage = process.memoryUsage();
    workerMemoryUsage.set(memUsage.heapUsed);
  }, 5000);
  
  // Graceful shutdown
  process.on("SIGTERM", () => {
    console.log(`Worker ${cluster.worker.id} received SIGTERM`);
    server.close(() => {
      process.exit(0);
    });
  });
}

Default Metrics in Cluster Mode

Default metrics work automatically in cluster mode with sensible aggregation strategies:

  • Counters (like CPU time): Summed across workers
  • Gauges (like memory usage): Averaged across workers
  • Version info: First worker's value used
  • Event loop metrics: Averaged (note: not perfectly accurate for percentiles)

import cluster from "cluster";
import { collectDefaultMetrics } from "prom-client";

// In worker processes
collectDefaultMetrics({
  labels: { 
    worker_id: cluster.worker.id.toString(),
    instance: process.env.HOSTNAME || "unknown"
  }
});
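
The caveat about percentiles can be checked with a small worked example. The sketch below uses hypothetical lag samples and a hypothetical nearest-rank `percentile` helper (neither comes from prom-client) to compare the average of two per-worker 90th percentiles with the 90th percentile of the pooled samples.

```javascript
// Nearest-rank percentile of a list of samples (hypothetical helper).
function percentile(samples, p) {
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.max(rank, 1) - 1];
}

// Hypothetical event loop lag samples (ms) from two workers.
const quietWorker = [1, 1, 1, 1, 1, 1, 1, 1, 1, 2];
const busyWorker = [5, 5, 5, 5, 5, 5, 5, 5, 50, 60];

// What averaging the per-worker percentiles reports.
const averagedP90 =
  (percentile(quietWorker, 90) + percentile(busyWorker, 90)) / 2;

// The percentile of all samples taken together.
const pooledP90 = percentile([...quietWorker, ...busyWorker], 90);

console.log(averagedP90); // 25.5
console.log(pooledP90); // 5
```

Here the averaged figure is about five times the pooled one, so averaged percentiles are best read as a rough indicator rather than a true cluster-wide percentile.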

Types

type Aggregator = 'omit' | 'sum' | 'first' | 'min' | 'max' | 'average';

type RegistryContentType = PrometheusContentType | OpenMetricsContentType;

interface MetricValue {
  value: number;
  labels: object;
  metricName?: string;
}

const aggregators: {
  sum: (metricValues: MetricValue[]) => number;
  first: (metricValues: MetricValue[]) => number;
  min: (metricValues: MetricValue[]) => number;
  max: (metricValues: MetricValue[]) => number;
  average: (metricValues: MetricValue[]) => number;
  omit: (metricValues: MetricValue[]) => undefined;
};

Install with Tessl CLI

npx tessl i tessl/npm-prom-client
