Ganglia metrics sink integration for Apache Spark, enabling metrics reporting to Ganglia monitoring systems.
```shell
npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-10@1.6.0
```

Spark Ganglia LGPL provides a Ganglia metrics sink for Apache Spark, enabling Spark applications to report performance and operational metrics to Ganglia monitoring systems. The module uses the Ganglia GMetric protocol to send metrics data over UDP, with support for both multicast and unicast addressing modes.
Add the module as a Maven dependency in `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.metrics.sink.GangliaSink

// Configure properties for the Ganglia connection
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")
properties.setProperty("mode", "MULTICAST")
properties.setProperty("ttl", "2")

// Create the metrics registry and security manager
val registry = new MetricRegistry()
val sparkConf = new SparkConf() // assuming a SparkConf is available
val securityMgr = new SecurityManager(sparkConf)

// Initialize the Ganglia sink
val gangliaSink = new GangliaSink(properties, registry, securityMgr)

// Start periodic metrics reporting
gangliaSink.start()

// Report metrics immediately (optional)
gangliaSink.report()

// Stop metrics reporting when done
gangliaSink.stop()
```

Main class that implements the Spark metrics sink interface for reporting metrics to Ganglia.
```scala
/**
 * Ganglia metrics sink that reports Spark metrics to Ganglia monitoring systems
 * @param property Configuration properties for the Ganglia connection
 * @param registry Codahale MetricRegistry containing the metrics to report
 * @param securityMgr Spark SecurityManager instance
 */
class GangliaSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager
  ) extends Sink {

  /**
   * Internal helper method to convert properties to Options
   * @param prop Property key to retrieve
   * @return Option containing the property value, or None if not found
   */
  private def propertyToOption(prop: String): Option[String]
}
```

Starts the Ganglia reporter with the configured polling period and time unit.
```scala
/**
 * Start the Ganglia metrics reporter.
 * Begins periodic reporting based on the configured period and unit.
 */
override def start(): Unit
```

Stops the Ganglia reporter and ceases all metrics reporting.
```scala
/**
 * Stop the Ganglia metrics reporter.
 * Immediately stops all metrics reporting to Ganglia.
 */
override def stop(): Unit
```

Triggers an immediate report of all current metrics to Ganglia, independent of the configured polling schedule.
```scala
/**
 * Report all current metrics immediately to Ganglia.
 * Does not affect the regular polling schedule.
 */
override def report(): Unit
```

These properties must be provided in the `Properties` object passed to the constructor:
```scala
// Required configuration keys
val GANGLIA_KEY_HOST = "host"  // Ganglia server hostname
val GANGLIA_KEY_PORT = "port"  // Ganglia server port number
```

Example:

```scala
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
```

These properties have default values if not specified:
```scala
// Optional configuration keys with defaults
val GANGLIA_KEY_PERIOD = "period"  // Reporting period (default: 10)
val GANGLIA_KEY_UNIT = "unit"      // Time unit (default: SECONDS)
val GANGLIA_KEY_MODE = "mode"      // UDP mode (default: MULTICAST)
val GANGLIA_KEY_TTL = "ttl"        // Multicast TTL (default: 1)
```

Default values:
```scala
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
val GANGLIA_DEFAULT_TTL = 1
```

Configuration examples:
```scala
// Report every 30 seconds
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")

// Use unicast mode instead of multicast
properties.setProperty("mode", "UNICAST")

// Set the TTL for multicast (useful for multi-hop networks)
properties.setProperty("ttl", "3")
```

Time Units: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
UDP Addressing Modes: MULTICAST, UNICAST
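How an optional key resolves against these defaults can be sketched with plain `java.util` APIs. This is an illustration of the pattern (mirroring the `propertyToOption` helper described above), not the sink's actual code:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

val props = new Properties()
props.setProperty("period", "30") // unit, mode, and ttl left unset

// getProperty returns null for a missing key; Option(...) maps that
// null to None, and getOrElse supplies the documented default
val period = Option(props.getProperty("period")).map(_.toInt).getOrElse(10)
val unit = Option(props.getProperty("unit"))
  .map(s => TimeUnit.valueOf(s.toUpperCase))
  .getOrElse(TimeUnit.SECONDS)
// period == 30, unit == TimeUnit.SECONDS
```

The same lookup shape handles both required keys (fail when `None`) and optional keys (fall back to a default).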
```scala
// Java standard library
import java.util.Properties
import java.util.concurrent.TimeUnit

// Codahale Metrics (Dropwizard Metrics)
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter

// Ganglia GMetric library
import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

// Spark framework
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.sink.Sink
```

Base trait that GangliaSink implements, defining the standard metrics sink interface:
```scala
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```

The GangliaSink constructor validates the required configuration in the order below and throws exceptions for missing or invalid values:
- `Exception("Ganglia sink requires 'host' property.")` if `host` is missing
- `Exception("Ganglia sink requires 'port' property.")` if `port` is missing
- `NumberFormatException` if `port` cannot be parsed as an integer
- `NumberFormatException` for invalid `period` or `ttl` values
- `IllegalArgumentException` for invalid `mode` or `unit` values
- `MetricsSystem.checkMinimalPollingPeriod()` rejects a polling period below Spark's minimum

Validation occurs during object construction, before the reporter is created.
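The host/port checks can be sketched as follows. The `validateGangliaProps` function is a hypothetical illustration of the documented validation order, not the actual constructor code:

```scala
import java.util.Properties

// Hypothetical sketch of the documented validation order; the real
// constructor differs in detail but fails in the same ways
def validateGangliaProps(props: Properties): (String, Int) = {
  val host = Option(props.getProperty("host")).getOrElse(
    throw new Exception("Ganglia sink requires 'host' property."))
  val portStr = Option(props.getProperty("port")).getOrElse(
    throw new Exception("Ganglia sink requires 'port' property."))
  (host, portStr.toInt) // NumberFormatException if port is not an integer
}

// An empty Properties object fails at the first check
val errorMessage = try {
  validateGangliaProps(new Properties())
  "no error"
} catch {
  case e: Exception => e.getMessage
}
// errorMessage == "Ganglia sink requires 'host' property."
```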
Example error handling (the specific exception types must be matched before the general `Exception` case, or they become unreachable):

```scala
try {
  val gangliaSink = new GangliaSink(properties, registry, securityMgr)
  gangliaSink.start()
} catch {
  case e: NumberFormatException =>
    println(s"Invalid numeric configuration: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration value: ${e.getMessage}")
  case e: Exception =>
    println(s"Failed to initialize Ganglia sink: ${e.getMessage}")
}
```

This sink integrates with Spark's metrics system through configuration. Add the following to your `metrics.properties` file:
```properties
# Enable the Ganglia sink
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.period=30
*.sink.ganglia.unit=seconds
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=2
```

The sink automatically receives and reports all metrics registered with Spark's MetricRegistry.
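To have an application pick up a metrics file at a non-default location, Spark's standard `spark.metrics.conf` property can point at it. A sketch of the wiring (the app name and file path are examples):

```scala
import org.apache.spark.SparkConf

// Point Spark's metrics system at the configuration file above;
// requires spark-core plus the spark-ganglia-lgpl artifact on the classpath
val conf = new SparkConf()
  .setAppName("ganglia-metrics-example")
  .set("spark.metrics.conf", "/etc/spark/metrics.properties")
```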
This module is distributed separately from the main Spark distribution due to its dependency on LGPL-licensed Ganglia components (specifically the gmetric4j library). While Spark itself is Apache 2.0 licensed, the Ganglia integration must include LGPL dependencies, requiring separate distribution to maintain license compatibility.