Ganglia integration module for Apache Spark metrics system
This module provides an Apache Spark metrics sink that reports Spark application performance metrics to a Ganglia cluster, enabling monitoring and visualization of Spark applications through existing Ganglia infrastructure. It is published as a separate artifact because the Ganglia integration depends on LGPL-licensed code.
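In practice, the sink is most often enabled declaratively through Spark's metrics configuration (`conf/metrics.properties`, or equivalently the `spark.metrics.conf.*` property prefix on a `SparkConf`) rather than constructed by hand. A minimal sketch, assuming the standard `spark.metrics.conf.` prefix; host and port values are placeholders:

```scala
import org.apache.spark.SparkConf

// Enable the Ganglia sink for all metric instances ("*") through
// metrics configuration properties.
val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.ganglia.class",
    "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.*.sink.ganglia.host", "ganglia.example.com")
  .set("spark.metrics.conf.*.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.*.sink.ganglia.period", "10")
  .set("spark.metrics.conf.*.sink.ganglia.unit", "seconds")
```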
To use the sink, add the module to your project. Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```
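For sbt builds, the equivalent dependency is:

```scala
// %% appends the Scala binary version (_2.11 here) automatically
libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % "2.4.8"
```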
The following imports are required:

```scala
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.metrics.sink.GangliaSink
```
```scala
// Set up configuration properties
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "10")
properties.setProperty("unit", "SECONDS")
properties.setProperty("mode", "MULTICAST")
properties.setProperty("ttl", "1")

// Create metrics registry and security manager
val registry = new MetricRegistry()
val sparkConf = new SparkConf()
val securityMgr = new SecurityManager(sparkConf)

// Create and start the Ganglia sink
val gangliaSink = new GangliaSink(properties, registry, securityMgr)
gangliaSink.start()

// The sink now automatically reports metrics to Ganglia

// Stop when done
gangliaSink.stop()
```

The GangliaSink requires specific configuration properties to connect to a Ganglia cluster:
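| Property | Default | Description |
|----------|---------|-------------|
| `host` | (none; required) | Hostname or multicast group of the Ganglia server |
| `port` | (none; required) | Port of the Ganglia server(s) |
| `period` | `10` | Polling period between reports |
| `unit` | `SECONDS` | Time unit of the polling period |
| `mode` | `MULTICAST` | UDP addressing mode (`MULTICAST` or `UNICAST`) |
| `ttl` | `1` | TTL of multicast messages |
| `dmax` | `0` | Lifetime in seconds of metrics in Ganglia (`0` = never expire) |

(Defaults correspond to the `GANGLIA_DEFAULT_*` constants in the API reference below; `host` and `port` have no defaults and must be supplied.)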
Core metrics sink functionality for reporting Spark metrics to the Ganglia monitoring system:

```scala
class GangliaSink(
  val property: Properties,
  val registry: MetricRegistry,
  securityMgr: SecurityManager
) extends Sink
```

Methods controlling the reporting lifecycle of the Ganglia sink:
```scala
def start(): Unit
def stop(): Unit
def report(): Unit
```

Helper method for accessing configuration properties safely:
```scala
def propertyToOption(prop: String): Option[String]
```
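Configuration extraction follows a lookup-with-default pattern built on `propertyToOption`. A minimal sketch of how the poll period is resolved inside the sink, using the constants listed below (illustrative, not a verbatim excerpt):

```scala
// Read the "period" property if present, otherwise fall back to the
// default polling period.
val pollPeriod: Int = propertyToOption(GANGLIA_KEY_PERIOD)
  .map(_.toInt)
  .getOrElse(GANGLIA_DEFAULT_PERIOD)
```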
Predefined configuration keys and default values for Ganglia sink setup:

```scala
val GANGLIA_KEY_PERIOD: String
val GANGLIA_DEFAULT_PERIOD: Int
val GANGLIA_KEY_UNIT: String
val GANGLIA_DEFAULT_UNIT: TimeUnit
val GANGLIA_KEY_MODE: String
val GANGLIA_DEFAULT_MODE: UDPAddressingMode
val GANGLIA_KEY_TTL: String
val GANGLIA_DEFAULT_TTL: Int
val GANGLIA_KEY_HOST: String
val GANGLIA_KEY_PORT: String
val GANGLIA_KEY_DMAX: String
val GANGLIA_DEFAULT_DMAX: Int
```

Configuration values extracted from the properties during initialization:
```scala
val host: String
val port: Int
val ttl: Int
val dmax: Int
val mode: UDPAddressingMode
val pollPeriod: Int
val pollUnit: TimeUnit
```

Core Ganglia integration components managed by the sink:
```scala
val ganglia: GMetric
val reporter: GangliaReporter
```
The sink builds on the following dependency types:

```scala
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

class SecurityManager(sparkConf: SparkConf)

class SparkConf {
  def this()
}

class MetricRegistry

class GangliaReporter {
  def start(period: Long, unit: TimeUnit): Unit
  def stop(): Unit
  def report(): Unit
}

class GMetric(host: String, port: Int, mode: UDPAddressingMode, ttl: Int)

enum UDPAddressingMode {
  MULTICAST, UNICAST
}

class Properties {
  def getProperty(key: String): String
  def setProperty(key: String, value: String): Object
}

enum TimeUnit {
  NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
}
```

The GangliaSink throws exceptions during initialization for invalid configuration:
- A missing `host` or `port` property causes an exception at construction time.
- The polling period is validated via `MetricsSystem.checkMinimalPollingPeriod` to ensure a minimum acceptable reporting interval.
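A sketch of these checks, modeled on the constants above (illustrative, not a verbatim excerpt):

```scala
// Fail fast if the mandatory connection properties are absent.
if (!property.containsKey(GANGLIA_KEY_HOST)) {
  throw new Exception("Ganglia sink requires 'host' property.")
}
if (!property.containsKey(GANGLIA_KEY_PORT)) {
  throw new Exception("Ganglia sink requires 'port' property.")
}

// Reject polling periods below the system-wide minimum.
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
```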