A Flink metrics reporter that exports metrics to Graphite monitoring system via TCP or UDP protocols
npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-graphite@2.1.0

Flink Metrics Graphite is a metrics reporter component for Apache Flink that exports metrics to Graphite monitoring systems. It integrates Flink's internal metrics collection framework with Graphite's time-series database and supports both TCP and UDP protocols for metrics transmission.
org.apache.flink:flink-metrics-graphite:2.1.0

import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.graphite.GraphiteReporterFactory;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import com.codahale.metrics.ScheduledReporter;

import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;
import com.codahale.metrics.ScheduledReporter;
// Configure and create a GraphiteReporter
MetricConfig config = new MetricConfig();
config.setString(GraphiteReporter.ARG_HOST, "localhost");
config.setString(GraphiteReporter.ARG_PORT, "2003");
config.setString(GraphiteReporter.ARG_PROTOCOL, "TCP");
// Create GraphiteReporter and get configured ScheduledReporter
GraphiteReporter reporter = new GraphiteReporter();
ScheduledReporter scheduledReporter = reporter.getReporter(config);
// Or use through Flink's metric system lifecycle
reporter.open(config);
// Metrics will be automatically reported
reporter.close(); // When shutting down

The flink-metrics-graphite package is built on top of Apache Flink's metrics framework:
- GraphiteReporter extends ScheduledDropwizardReporter to provide Graphite-specific functionality
- GraphiteReporterFactory implements MetricReporterFactory for service provider interface integration
- Configuration is supplied through Flink's MetricConfig system

Main reporter class that exports Flink metrics to Graphite monitoring systems.
/**
* GraphiteReporter extends ScheduledDropwizardReporter to provide Graphite integration.
* Supports both TCP and UDP protocols for metrics transmission.
*/
@PublicEvolving
public class GraphiteReporter extends ScheduledDropwizardReporter {
/** Configuration parameter name for protocol selection */
public static final String ARG_PROTOCOL = "protocol";
/** Inherited constants from ScheduledDropwizardReporter */
public static final String ARG_HOST = "host";
public static final String ARG_PORT = "port";
public static final String ARG_PREFIX = "prefix";
public static final String ARG_CONVERSION_RATE = "rateConversion";
public static final String ARG_CONVERSION_DURATION = "durationConversion";
/**
* Creates and configures a Dropwizard GraphiteReporter instance.
* @param config MetricConfig containing connection and formatting options
* @return ScheduledReporter configured for Graphite communication
* @throws IllegalArgumentException if host/port configuration is invalid
*/
@Override
public ScheduledReporter getReporter(MetricConfig config);
/** Inherited methods from ScheduledDropwizardReporter and MetricReporter */
/**
* Opens the reporter with the given configuration.
* @param config MetricConfig containing reporter configuration
*/
@Override
public void open(MetricConfig config);
/**
* Closes the reporter and stops metric reporting.
*/
@Override
public void close();
/**
* Called when a metric is added to the system.
* @param metric The metric instance
* @param metricName The metric name
* @param group The metric group
*/
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
/**
* Called when a metric is removed from the system.
* @param metric The metric instance
* @param metricName The metric name
* @param group The metric group
*/
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
/**
* Reports all registered metrics.
*/
@Override
public void report();
/**
* Filters invalid characters from metric names.
* @param metricName The original metric name
* @return Filtered metric name with invalid characters replaced
*/
@Override
public String filterCharacters(String metricName);
}

Usage Example:
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;
import com.codahale.metrics.ScheduledReporter;
// Create reporter with TCP protocol (default)
GraphiteReporter reporter = new GraphiteReporter();
MetricConfig config = new MetricConfig();
config.setString(GraphiteReporter.ARG_HOST, "graphite.example.com");
config.setString(GraphiteReporter.ARG_PORT, "2003");
config.setString(GraphiteReporter.ARG_PROTOCOL, "TCP");
config.setString(GraphiteReporter.ARG_PREFIX, "flink.myapp");
ScheduledReporter scheduledReporter = reporter.getReporter(config);
// Create new config for UDP protocol
MetricConfig udpConfig = new MetricConfig();
udpConfig.setString(GraphiteReporter.ARG_HOST, "graphite.example.com");
udpConfig.setString(GraphiteReporter.ARG_PORT, "2003");
udpConfig.setString(GraphiteReporter.ARG_PROTOCOL, "UDP");
ScheduledReporter udpReporter = reporter.getReporter(udpConfig);

Factory class for creating GraphiteReporter instances through Flink's service provider interface.
/**
* MetricReporterFactory implementation for GraphiteReporter.
* Enables automatic discovery and instantiation by Flink's metrics system.
*/
public class GraphiteReporterFactory implements MetricReporterFactory {
/**
* Creates a new GraphiteReporter instance.
* @param properties Configuration properties (currently unused)
* @return New GraphiteReporter instance
*/
@Override
public MetricReporter createMetricReporter(Properties properties);
}

Usage Example:
import org.apache.flink.metrics.graphite.GraphiteReporterFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import java.util.Properties;
// Direct factory usage
GraphiteReporterFactory factory = new GraphiteReporterFactory();
Properties props = new Properties();
MetricReporter reporter = factory.createMetricReporter(props);
// Automatic discovery via SPI - configured in Flink configuration
// metrics.reporter.graphite.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory

The GraphiteReporter accepts the following configuration parameters through MetricConfig:
/** Configuration parameters inherited from ScheduledDropwizardReporter */
public static final String ARG_HOST = "host"; // Required: Graphite server hostname
public static final String ARG_PORT = "port"; // Required: Graphite server port
public static final String ARG_PREFIX = "prefix"; // Optional: Metric name prefix
public static final String ARG_CONVERSION_RATE = "rateConversion"; // Optional: Rate conversion time unit
public static final String ARG_CONVERSION_DURATION = "durationConversion"; // Optional: Duration conversion time unit
/** GraphiteReporter-specific parameters */
public static final String ARG_PROTOCOL = "protocol"; // Optional: "TCP" or "UDP" (default: "TCP", case-sensitive)

Configuration Example:
MetricConfig config = new MetricConfig();
// Required parameters
config.setString(GraphiteReporter.ARG_HOST, "graphite.monitoring.com");
config.setString(GraphiteReporter.ARG_PORT, "2003");
// Optional parameters
config.setString(GraphiteReporter.ARG_PROTOCOL, "UDP"); // Use UDP instead of TCP
config.setString(GraphiteReporter.ARG_PREFIX, "flink.production.app1"); // Add prefix to all metrics
config.setString(GraphiteReporter.ARG_CONVERSION_RATE, "SECONDS"); // Convert rates to per-second
config.setString(GraphiteReporter.ARG_CONVERSION_DURATION, "MILLISECONDS"); // Convert durations to milliseconds

The GraphiteReporter includes validation and error handling:
- getReporter() throws IllegalArgumentException if the host is null or empty, or if the port is less than 1

Example Error Handling:
try {
MetricConfig config = new MetricConfig();
config.setString(GraphiteReporter.ARG_HOST, ""); // Invalid empty host
config.setString(GraphiteReporter.ARG_PORT, "0"); // Invalid port < 1
GraphiteReporter reporter = new GraphiteReporter();
reporter.getReporter(config); // Throws IllegalArgumentException
} catch (IllegalArgumentException e) {
// Handle configuration error
// Message format: "Invalid host/port configuration. Host: [host] Port: [port]"
System.err.println("Configuration error: " + e.getMessage());
}

/** Protocol enumeration for internal use (private to GraphiteReporter class) */
private enum Protocol {
TCP, // TCP socket communication (default)
UDP // UDP datagram communication
}
/** Required imports for complete API usage */
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import java.util.Properties;

The package requires the following dependencies:
Provided Dependencies (must be available at runtime):
- org.apache.flink:flink-annotations
- org.apache.flink:flink-metrics-core

Bundled Dependencies (included in shaded JAR):
- org.apache.flink:flink-metrics-dropwizard
- io.dropwizard.metrics:metrics-core
- io.dropwizard.metrics:metrics-graphite