Ganglia metrics reporting integration for Apache Spark monitoring systems
npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl-2-12@3.5.0

Spark Ganglia LGPL provides Ganglia monitoring integration for Apache Spark, enabling the collection and reporting of Spark metrics to Ganglia monitoring systems. This package implements a GangliaSink that extends Spark's metrics system to publish performance metrics, job statistics, and runtime information to Ganglia infrastructure for centralized monitoring and alerting.
Maven:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-ganglia-lgpl_2.12</artifactId>
    <version>3.5.6</version>
</dependency>
<dependency>
    <groupId>info.ganglia.gmetric4j</groupId>
    <artifactId>gmetric4j</artifactId>
    <version>1.0.10</version>
</dependency>

SBT:
libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % "3.5.6"
libraryDependencies += "info.ganglia.gmetric4j" % "gmetric4j" % "1.0.10"import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry

For direct GangliaReporter usage:
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;

Usage with SparkConf:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.util.Properties
// Configure Spark to use the Ganglia sink for the driver via metrics configuration properties
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf.driver.sink.ganglia.class", "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.driver.sink.ganglia.host", "ganglia-server.example.com")
  .set("spark.metrics.conf.driver.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.driver.sink.ganglia.period", "10")
  .set("spark.metrics.conf.driver.sink.ganglia.unit", "seconds")
val sc = new SparkContext(conf)
// Metrics will now be automatically reported to Ganglia
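The example above configures the sink only for the driver instance. A hedged variant of the same approach, sketched below, uses the wildcard instance prefix so executors and other Spark components report to Ganglia as well; the host, port, and application name are the same placeholders used above.

import org.apache.spark.SparkConf

// A variant of the configuration above: the "*" instance prefix applies the Ganglia
// sink to every Spark component (driver, executors, etc.), not just the driver.
val allInstancesConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf.*.sink.ganglia.class", "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.*.sink.ganglia.host", "ganglia-server.example.com")
  .set("spark.metrics.conf.*.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.*.sink.ganglia.period", "10")
  .set("spark.metrics.conf.*.sink.ganglia.unit", "seconds")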
Configuration file approach (metrics.properties):
# Ganglia sink configuration
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia-server.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=10
driver.sink.ganglia.unit=seconds
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1
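As an alternative to embedding sink settings in SparkConf, the application can point Spark at the metrics.properties file itself via the standard spark.metrics.conf property. The sketch below assumes the file path /etc/spark/metrics.properties is accessible on each node.

import org.apache.spark.SparkConf

// A minimal sketch, assuming /etc/spark/metrics.properties exists on every node:
// spark.metrics.conf tells Spark where to load the metrics configuration from.
val fileBasedConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf", "/etc/spark/metrics.properties")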
The Spark Ganglia LGPL integration consists of two main components: the GangliaSink, which plugs into Spark's metrics system, and the underlying Dropwizard GangliaReporter, which performs the actual reporting to Ganglia. The sink automatically discovers and reports all Spark metrics, including JVM metrics, application metrics, and custom user metrics registered with the metrics system.

GangliaSink: the core Spark metrics sink implementation that connects Spark's metrics system to Ganglia monitoring infrastructure.
class GangliaSink(
    val property: Properties,
    val registry: MetricRegistry
) extends Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
  def propertyToOption(prop: String): Option[String]
}
trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

Configuration Constants:
// Period configuration
val GANGLIA_KEY_PERIOD: String = "period"
val GANGLIA_DEFAULT_PERIOD: Int = 10
// Time unit configuration
val GANGLIA_KEY_UNIT: String = "unit"
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
// Network mode configuration
val GANGLIA_KEY_MODE: String = "mode"
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
// TTL configuration for multicast
val GANGLIA_KEY_TTL: String = "ttl"
val GANGLIA_DEFAULT_TTL: Int = 1
// Required host and port
val GANGLIA_KEY_HOST: String = "host"
val GANGLIA_KEY_PORT: String = "port"
// Data max configuration
val GANGLIA_KEY_DMAX: String = "dmax"
val GANGLIA_DEFAULT_DMAX: Int = 0
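To illustrate how these constants are applied, the sketch below builds a Properties object with only the required host and port keys; period, unit, mode, ttl, and dmax then fall back to the defaults listed above.

import java.util.Properties

// Only "host" and "port" are required; omitted keys use the defaults:
// period=10, unit=SECONDS, mode=MULTICAST, ttl=1, dmax=0.
val minimalProps = new Properties()
minimalProps.setProperty("host", "ganglia-server.example.com")
minimalProps.setProperty("port", "8649")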
Usage Example:
import org.apache.spark.metrics.sink.GangliaSink
import com.codahale.metrics.MetricRegistry
import java.util.Properties
// Manual instantiation (typically done by Spark's metrics system)
val properties = new Properties()
properties.setProperty("host", "ganglia-server.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "10")
properties.setProperty("unit", "seconds")
properties.setProperty("mode", "multicast")
properties.setProperty("ttl", "1")
val registry = new MetricRegistry()
val sink = new GangliaSink(properties, registry)
// Start reporting
sink.start()
// Manual report (usually automatic via polling)
sink.report()
// Stop reporting
sink.stop()

GangliaReporter: the low-level Dropwizard Metrics reporter for direct Ganglia integration, supporting comprehensive metric types and flexible configuration.
public class GangliaReporter extends ScheduledReporter {
    public static Builder forRegistry(MetricRegistry registry);

    // Inherited from ScheduledReporter
    public void start(long period, TimeUnit unit);
    public void stop();
    public void report();

    // Core reporting method
    public void report(
        SortedMap<String, Gauge> gauges,
        SortedMap<String, Counter> counters,
        SortedMap<String, Histogram> histograms,
        SortedMap<String, Meter> meters,
        SortedMap<String, Timer> timers
    );
}

// Nested builder class: GangliaReporter.Builder
public static class Builder {
    public Builder prefixedWith(String prefix);
    public Builder withTMax(int tMax);
    public Builder withDMax(int dMax);
    public Builder convertRatesTo(TimeUnit rateUnit);
    public Builder convertDurationsTo(TimeUnit durationUnit);
    public Builder filter(MetricFilter filter);
    public Builder scheduleOn(ScheduledExecutorService executor);
    public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop);
    public Builder disabledMetricAttributes(Set<MetricAttribute> disabledMetricAttributes);
    public GangliaReporter build(GMetric gmetric);
    public GangliaReporter build(GMetric... gmetrics);
}

Usage Example:
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode;
import java.util.concurrent.TimeUnit;
// Create Ganglia client
GMetric ganglia = new GMetric(
    "ganglia-server.example.com", // host
    8649,                         // port
    UDPAddressingMode.MULTICAST,  // mode
    1                             // ttl
);
// Build reporter with configuration
MetricRegistry registry = new MetricRegistry();
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .prefixedWith("spark.app")
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .withDMax(0)
    .build(ganglia);
// Start scheduled reporting every 10 seconds
reporter.start(10, TimeUnit.SECONDS);
// Manual report
reporter.report();
// Stop reporting
reporter.stop();

// From info.ganglia.gmetric4j.gmetric package
enum UDPAddressingMode {
    MULTICAST,
    UNICAST
}

class GMetric {
    public GMetric(String host, int port, UDPAddressingMode mode, int ttl);
    public void announce(String name, String value, GMetricType type,
        String units, GMetricSlope slope, int tMax, int dMax, String group);
}

enum GMetricType {
    STRING, INT8, UINT8, INT16, UINT16, INT32, UINT32, FLOAT, DOUBLE
}

enum GMetricSlope {
    ZERO, POSITIVE, NEGATIVE, BOTH, UNSPECIFIED
}
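For completeness, here is a hedged Scala sketch of publishing a single value through GMetric.announce directly, using the constructor and signature listed above; the metric name, value, units, and group are illustrative placeholders.

import info.ganglia.gmetric4j.gmetric.{GMetric, GMetricSlope, GMetricType}
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

// Publish one ad-hoc value straight to Ganglia (name, value, units, and group are placeholders).
val gmetric = new GMetric("ganglia-server.example.com", 8649, UDPAddressingMode.MULTICAST, 1)
gmetric.announce("custom.queue.depth", "42", GMetricType.INT32, "items",
  GMetricSlope.BOTH, 60, 0, "spark")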
// From com.codahale.metrics package
class MetricRegistry {
    // Standard Dropwizard Metrics registry
}

interface Gauge<T> {
    T getValue();
}

class Counter {
    long getCount();
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
}

class Histogram {
    void update(int value);
    void update(long value);
    long getCount();
    Snapshot getSnapshot();
}

class Meter {
    void mark();
    void mark(long n);
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
}

class Timer {
    void update(long duration, TimeUnit unit);
    Context time(); // returns a Timer.Context
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
    Snapshot getSnapshot();
}// Standard Java types used in configuration
// Standard Java types used in configuration
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledExecutorService
import java.util.Set
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricAttribute

The package includes comprehensive error handling:
Configuration Validation:
- An exception is thrown if the required host property is not provided
- An exception is thrown if the required port property is not provided
- MetricsSystem.checkMinimalPollingPeriod validates that the configured polling period is not below the allowed minimum

Runtime Error Handling:
- GangliaException is caught and logged as a warning during metric reporting

Common Configuration Errors:
// Missing required properties
val props = new Properties()
// Missing host/port will throw Exception during GangliaSink construction
// Invalid property values
props.setProperty("ttl", "invalid") // Will cause NumberFormatException
props.setProperty("mode", "INVALID") // Will cause IllegalArgumentException// Configure multiple Ganglia servers for high availability
// Configure multiple Ganglia servers for high availability
GMetric ganglia1 = new GMetric("ganglia1.example.com", 8649, UDPAddressingMode.MULTICAST, 1);
GMetric ganglia2 = new GMetric("ganglia2.example.com", 8649, UDPAddressingMode.MULTICAST, 1);
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .build(ganglia1, ganglia2);

// Filter out specific metrics
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .filter(MetricFilter.contains("jvm")) // Only metrics whose names contain "jvm"
    .disabledMetricAttributes(Set.of(MetricAttribute.P999, MetricAttribute.P99))
    .build(ganglia);

Network mode configuration:
# Multicast mode (default)
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1
# Unicast mode for complex network topologies
driver.sink.ganglia.mode=unicast
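The same choice applies when driving GMetric directly. Below is a hedged Scala sketch of the unicast equivalent, assuming a single reachable gmond endpoint (placeholder hostname) rather than a multicast group.

import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

// Unicast: send metrics to one gmond endpoint instead of a multicast group.
val unicastGanglia = new GMetric("gmond-host.example.com", 8649, UDPAddressingMode.UNICAST, 1)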
The GangliaSink automatically integrates with Spark's comprehensive metrics system, reporting:
- Driver and executor JVM metrics
- Application and job metrics exposed through Spark's metrics sources
- Custom metrics registered with the application's metric registry (SparkContext.metricRegistry)

All metrics are automatically formatted with appropriate Ganglia metadata including metric names, types, units, and groupings.