Ganglia integration module for Apache Spark metrics system
npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-11@2.4.0

Apache Spark metrics sink for the Ganglia monitoring system. This module provides a metrics sink that reports Spark application performance metrics to Ganglia clusters, enabling comprehensive monitoring and visualization of Spark applications through existing Ganglia infrastructure.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-ganglia-lgpl_2.11</artifactId>
<version>2.4.8</version>
</dependency>

import org.apache.spark.metrics.sink.GangliaSink

Required for configuration:
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}

import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}
// Set up the sink configuration; keys match the GANGLIA_KEY_* constants below
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com") // Ganglia endpoint host (or multicast group)
properties.setProperty("port", "8649") // NOTE(review): 8649 is the conventional gmond port — confirm for your cluster
properties.setProperty("period", "10") // reporting interval, interpreted in "unit"
properties.setProperty("unit", "SECONDS") // one of the TimeUnit values listed further down
properties.setProperty("mode", "MULTICAST") // UDPAddressingMode: MULTICAST or UNICAST
properties.setProperty("ttl", "1") // packet TTL used by GMetric
// Create the metric registry and security manager required by the GangliaSink constructor
val registry = new MetricRegistry()
val sparkConf = new SparkConf()
val securityMgr = new SecurityManager(sparkConf)
// Create and start the Ganglia sink
val gangliaSink = new GangliaSink(properties, registry, securityMgr)
gangliaSink.start()
// While started, the sink reports registry metrics to Ganglia on the configured period
// Stop when finished to shut down the underlying reporter
gangliaSink.stop()

The GangliaSink requires specific configuration properties to connect to a Ganglia cluster:
Core metrics sink functionality for reporting Spark metrics to Ganglia monitoring system.
class GangliaSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager
) extends Sink

Control the reporting lifecycle of the Ganglia sink.
def start(): Unit
def stop(): Unit
def report(): Unit

Helper method for accessing configuration properties safely.
def propertyToOption(prop: String): Option[String]

Predefined configuration keys and default values for Ganglia sink setup.
val GANGLIA_KEY_PERIOD: String
val GANGLIA_DEFAULT_PERIOD: Int
val GANGLIA_KEY_UNIT: String
val GANGLIA_DEFAULT_UNIT: TimeUnit
val GANGLIA_KEY_MODE: String
val GANGLIA_DEFAULT_MODE: UDPAddressingMode
val GANGLIA_KEY_TTL: String
val GANGLIA_DEFAULT_TTL: Int
val GANGLIA_KEY_HOST: String
val GANGLIA_KEY_PORT: String
val GANGLIA_KEY_DMAX: String
val GANGLIA_DEFAULT_DMAX: Int

Configuration values extracted from properties during initialization.
val host: String
val port: Int
val ttl: Int
val dmax: Int
val mode: UDPAddressingMode
val pollPeriod: Int
val pollUnit: TimeUnit

Core Ganglia integration components managed by the sink.
val ganglia: GMetric
val reporter: GangliaReporter

private[spark] trait Sink {
def start(): Unit
def stop(): Unit
def report(): Unit
}
class SecurityManager(sparkConf: SparkConf)
class SparkConf {
def this()
}

class MetricRegistry
class GangliaReporter {
def start(period: Long, unit: TimeUnit): Unit
def stop(): Unit
def report(): Unit
}

class GMetric(host: String, port: Int, mode: UDPAddressingMode, ttl: Int)
enum UDPAddressingMode {
MULTICAST, UNICAST
}

class Properties {
def getProperty(key: String): String
def setProperty(key: String, value: String): Object
}
enum TimeUnit {
NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
}

The GangliaSink throws exceptions during initialization for invalid configuration:
The sink validates its polling configuration via MetricsSystem.checkMinimalPollingPeriod to ensure the reporting interval meets the minimum acceptable value.