```shell
tessl install tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-11@2.2.0
```
Ganglia metrics sink integration for Apache Spark that enables publishing Spark application metrics to Ganglia monitoring systems. This sink allows Spark applications to send their internal metrics data to Ganglia nodes or multicast groups for centralized monitoring and visualization.
Add the dependency to your pom.xml or build.sbt, keeping the LGPL licensing consideration in mind:

```
org.apache.spark:spark-ganglia-lgpl_2.11:2.2.3
```

Dependencies:

- org.apache.spark:spark-core_2.11:2.2.3 (automatically included)
- io.dropwizard.metrics:metrics-ganglia (for GangliaReporter)
- info.ganglia.gmetric4j:gmetric4j (transitive, for GMetric and UDPAddressingMode)

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}
```

Configure the Ganglia sink through Spark's metrics system using a properties file:
```properties
# In metrics.properties configuration file
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.period=30
*.sink.ganglia.unit=seconds
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=1
```

Programmatic instantiation (typically done by Spark's metrics system):
```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}

// Configuration properties
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "30")
properties.setProperty("unit", "seconds")

// Create sink instance (normally done by Spark)
val registry = new MetricRegistry()
val securityMgr = new SecurityManager(new SparkConf())
val sink = new GangliaSink(properties, registry, securityMgr)

// Lifecycle management
sink.start()  // Begin periodic reporting of metrics
sink.report() // Force an immediate report
sink.stop()   // Stop reporting and release resources
```

The GangliaSink integrates with Apache Spark's metrics system by implementing the Sink trait. It uses the Dropwizard Metrics library's GangliaReporter internally to handle the actual communication with Ganglia systems, and it supports both UDP unicast and multicast modes for flexibility in different network environments.
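In a cluster deployment, a metrics.properties file like the one above can be shipped with the job and pointed to via Spark's `spark.metrics.conf` setting. A sketch of the submit command follows; the application class, jar name, and master URL are placeholders, not part of this package:

```shell
# Sketch: ship metrics.properties with the job and tell Spark's
# metrics system where to find it (paths and names are placeholders)
spark-submit \
  --master yarn \
  --files metrics.properties \
  --conf spark.metrics.conf=metrics.properties \
  --class com.example.MyApp \
  my-app.jar
```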
Main metrics sink implementation that sends Spark metrics to Ganglia monitoring systems via UDP.
```scala
class GangliaSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager
  ) extends Sink {

  /** Start the periodic reporting of metrics to Ganglia */
  def start(): Unit

  /** Stop the metrics reporting and clean up resources */
  def stop(): Unit

  /** Immediately report current metrics to Ganglia */
  def report(): Unit

  /** Utility method to safely extract configuration properties */
  def propertyToOption(prop: String): Option[String]
}
```

Constructor Parameters:
- property - Configuration properties containing Ganglia connection details
- registry - Dropwizard MetricRegistry containing the metrics to report
- securityMgr - Spark SecurityManager instance (not used in the current implementation)

Configuration Properties:
The GangliaSink accepts the following configuration keys via the Properties object:
"host" - Required - Hostname or multicast group of the Ganglia server"port" - Required - Port number of the Ganglia server(s)"period" - Optional - Reporting interval (default: 10)"unit" - Optional - Time unit for the period (default: "SECONDS")"ttl" - Optional - TTL for multicast messages (default: 1)"dmax" - Optional - Lifetime in seconds of metrics, 0 = never expired (default: 0)"mode" - Optional - Network mode: "unicast" or "multicast" (default: "multicast")Configuration Constants:
The GangliaSink class exposes the following public constants for configuration key names and default values:
```scala
// Configuration property keys
val GANGLIA_KEY_PERIOD: String = "period"
val GANGLIA_KEY_UNIT: String = "unit"
val GANGLIA_KEY_MODE: String = "mode"
val GANGLIA_KEY_TTL: String = "ttl"
val GANGLIA_KEY_HOST: String = "host"
val GANGLIA_KEY_PORT: String = "port"
val GANGLIA_KEY_DMAX: String = "dmax"

// Default configuration values
val GANGLIA_DEFAULT_PERIOD: Int = 10
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
val GANGLIA_DEFAULT_TTL: Int = 1
val GANGLIA_DEFAULT_DMAX: Int = 0
```

The full set of imports used by the sink:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter
import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.sink.Sink
```

```scala
// Base Spark sink interface that GangliaSink implements
trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```

```java
// External Java enum from the gmetric4j library, used by GangliaSink for
// network mode configuration; the actual class is
// info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
public enum UDPAddressingMode {
    MULTICAST,
    UNICAST
}
```

The GangliaSink constructor validates required configuration and throws exceptions for missing required properties:
- Exception("Ganglia sink requires 'host' property.") if "host" is missing
- Exception("Ganglia sink requires 'port' property.") if "port" is missing
- MetricsSystem.checkMinimalPollingPeriod(), which may throw for polling periods that are too short

Example configurations:

```properties
# Enable Ganglia sink for all Spark instances (multicast)
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=239.2.11.71
*.sink.ganglia.port=8649
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=1
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
```

```properties
# Direct unicast to a specific Ganglia server
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia-server.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.mode=unicast
*.sink.ganglia.period=30
*.sink.ganglia.unit=seconds
*.sink.ganglia.dmax=3600
```

```properties
# Enable only for the Spark driver
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=15
driver.sink.ganglia.unit=seconds
```
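The defaulting and validation behaviour described above can be sketched with plain JDK types. `propertyToOption` mirrors the sink's helper; `requiredProperty` and the object name are hypothetical illustrations, not part of the actual API:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Sketch of GangliaSink's configuration handling; only propertyToOption
// corresponds to a real method on the sink.
object GangliaConfigSketch {
  // Mirrors GangliaSink.propertyToOption: wrap a possibly-missing key
  def propertyToOption(props: Properties, key: String): Option[String] =
    Option(props.getProperty(key))

  // Required keys fail fast, as the real sink does for "host" and "port"
  def requiredProperty(props: Properties, key: String): String =
    propertyToOption(props, key).getOrElse(
      throw new Exception(s"Ganglia sink requires '$key' property."))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("host", "ganglia.example.com")
    props.setProperty("period", "30")

    // Optional keys fall back to the documented defaults
    val period = propertyToOption(props, "period").map(_.toInt).getOrElse(10)
    val unit = propertyToOption(props, "unit")
      .map(u => TimeUnit.valueOf(u.toUpperCase)).getOrElse(TimeUnit.SECONDS)
    assert(period == 30)
    assert(unit == TimeUnit.SECONDS)

    // A missing required key throws, matching the messages above
    try {
      requiredProperty(props, "port")
      assert(false, "expected an exception for the missing 'port' key")
    } catch {
      case e: Exception => assert(e.getMessage.contains("port"))
    }
    println("ok")
  }
}
```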