JMX metrics reporter for Apache Flink that enables monitoring and management of Flink applications through Java Management Extensions (JMX)
npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-jmx@2.1.0Flink Metrics JMX is a metrics reporter implementation for Apache Flink that exposes Flink's internal metrics (counters, gauges, histograms, and meters) through Java Management Extensions (JMX). It enables real-time monitoring and management of Flink streaming and batch processing applications through standard JMX interfaces, making it essential for production deployments requiring comprehensive observability and operational insights.
Maven Dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>2.1.0</version>
</dependency>Gradle Dependency:
implementation 'org.apache.flink:flink-metrics-jmx:2.1.0'import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
// MBean interfaces are inner interfaces of JMXReporter:
// JMXReporter.JmxCounterMBean, JMXReporter.JmxGaugeMBean, etc.The JMX metrics reporter is typically configured through Flink's configuration system and operates automatically:
// The reporter is usually instantiated by Flink's metrics system
// via the factory pattern using configuration properties
Properties config = new Properties();
config.setProperty("port", "9999"); // Optional: JMX server port
JMXReporterFactory factory = new JMXReporterFactory();
JMXReporter reporter = factory.createMetricReporter(config);
// Reporter lifecycle is managed by Flink
reporter.open(new MetricConfig());
// Check if JMX server port is available
Optional<Integer> port = reporter.getPort();
if (port.isPresent()) {
System.out.println("JMX server available on port: " + port.get());
}Configuration in Flink:
metrics:
reporters:
- class: org.apache.flink.metrics.jmx.JMXReporterFactory
port: 9999 # Optional JMX server portThe JMX metrics reporter is built around several key components:
Factory class for creating JMXReporter instances, implementing the standard Flink MetricReporterFactory interface.
/**
* Factory for creating JMXReporter instances
*/
public class JMXReporterFactory implements MetricReporterFactory {
/** Configuration key for port setting */
public static final String ARG_PORT = "port";
/**
* Creates a new JMXReporter instance with the given configuration
* @param properties Configuration properties for the reporter
* @return Configured JMXReporter instance
*/
public JMXReporter createMetricReporter(Properties properties);
}Core reporter class that exports Flink metrics as JMX MBeans, with automatic registration and deregistration capabilities. The reporter is thread-safe and handles concurrent metric registration.
/**
* MetricReporter that exports Metrics via JMX
*/
public class JMXReporter implements MetricReporter {
/** JMX domain prefix for all Flink metrics */
public static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
/**
* Opens the reporter with the given configuration
* @param config Metric configuration
*/
public void open(MetricConfig config);
/**
* Closes the reporter and cleans up resources
*/
public void close();
/**
* Gets the port of the JMX server if available
* @return Optional containing the port number, empty if not available
*/
public Optional<Integer> getPort();
/**
* Called when a metric is added to register it as a JMX MBean
* @param metric The metric to register
* @param metricName Name of the metric
* @param group Metric group containing variables and scope information
*/
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
/**
* Called when a metric is removed to unregister it from JMX
* @param metric The metric to unregister
* @param metricName Name of the metric
* @param group Metric group containing variables and scope information
*/
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}Static utility methods for generating JMX-compliant domain names and property tables.
/**
* Generates JMX property table from metric group variables
* @param variables Map of variable names to values
* @return Hashtable suitable for JMX ObjectName construction
*/
public static Hashtable<String, String> generateJmxTable(Map<String, String> variables);
/**
* Generates JMX domain name for a metric
* @param metricName Name of the metric
* @param group Metric group providing logical scope
* @return Complete JMX domain name
*/
public static String generateJmxDomain(String metricName, MetricGroup group);
/**
* Replaces characters invalid for JMX names with valid alternatives
* @param str String to process
* @return String with invalid characters replaced
*/
public static String replaceInvalidChars(String str);Standard JMX MBean interfaces for different metric types, enabling type-safe access to metric values. All interfaces are inner interfaces of the JMXReporter class.
/**
* Base interface for all JMX metric beans
*/
public interface MetricMBean {
}
/**
* JMX interface for counter metrics
*/
public interface JmxCounterMBean extends MetricMBean {
/**
* Gets the current count value
* @return Current count
*/
long getCount();
}
/**
* JMX interface for gauge metrics
*/
public interface JmxGaugeMBean extends MetricMBean {
/**
* Gets the current gauge value
* @return Current gauge value
*/
Object getValue();
}
/**
* JMX interface for histogram metrics providing statistical information
*/
public interface JmxHistogramMBean extends MetricMBean {
/** Gets the number of recorded values */
long getCount();
/** Gets the arithmetic mean of recorded values */
double getMean();
/** Gets the standard deviation of recorded values */
double getStdDev();
/** Gets the maximum recorded value */
long getMax();
/** Gets the minimum recorded value */
long getMin();
/** Gets the median (50th percentile) */
double getMedian();
/** Gets the 75th percentile */
double get75thPercentile();
/** Gets the 95th percentile */
double get95thPercentile();
/** Gets the 98th percentile */
double get98thPercentile();
/** Gets the 99th percentile */
double get99thPercentile();
/** Gets the 99.9th percentile */
double get999thPercentile();
}
/**
* JMX interface for meter metrics tracking rates and counts
*/
public interface JmxMeterMBean extends MetricMBean {
/**
* Gets the current rate
* @return Current rate value
*/
double getRate();
/**
* Gets the total count
* @return Total count value
*/
long getCount();
}The JMX reporter supports the following configuration options:
JMXServerOptions.JMX_SERVER_PORT instead.Metrics are registered under the JMX domain following this pattern:
org.apache.flink.<logical_scope>.<metric_name>For example:
org.apache.flink.jobmanager.numRunningJobsorg.apache.flink.taskmanager.heap.usedorg.apache.flink.job.task.buffers.inputQueueLengthInvalid JMX characters are automatically replaced:
", >, <) are removed_),, =, ;, :, ?, ', *) are replaced with hyphens (-)import org.apache.flink.metrics.jmx.JMXReporterFactory;
import java.util.Properties;
// Create reporter factory
JMXReporterFactory factory = new JMXReporterFactory();
// Configure properties
Properties properties = new Properties();
properties.setProperty(JMXReporterFactory.ARG_PORT, "9999");
// Create reporter instance
JMXReporter reporter = factory.createMetricReporter(properties);
// Open reporter (normally done by Flink)
reporter.open(new MetricConfig());
// Check for JMX server availability
reporter.getPort().ifPresent(port ->
System.out.println("JMX metrics available on port: " + port)
);import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
// Connect to JMX server
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection connection = connector.getMBeanServerConnection();
// Query Flink metrics
ObjectName pattern = new ObjectName("org.apache.flink.*:*");
Set<ObjectName> metrics = connection.queryNames(pattern, null);
for (ObjectName metric : metrics) {
System.out.println("Metric: " + metric);
// Access metric values through the connection
}
connector.close();# conf/flink-conf.yaml
metrics.reporters: jmx
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789