tessl/maven-org-apache-spark--spark-ganglia-lgpl-2-12

Ganglia metrics reporting integration for Apache Spark monitoring systems

Workspace: tessl (Public)

Describes: mavenpkg:maven/org.apache.spark/spark-ganglia-lgpl_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl-2-12@3.5.0


Spark Ganglia LGPL Integration

Spark Ganglia LGPL provides Ganglia monitoring integration for Apache Spark, enabling the collection and reporting of Spark metrics to Ganglia monitoring systems. The package implements GangliaSink, a sink for Spark's metrics system that publishes performance metrics, job statistics, and runtime information to Ganglia infrastructure for centralized monitoring and alerting. It is distributed as a separate artifact because its Ganglia dependency is LGPL-licensed.

Package Information

  • Package Name: spark-ganglia-lgpl_2.12
  • Package Type: Maven
  • Language: Scala with Java components
  • Group ID: org.apache.spark
  • Installation: Add to your Maven/SBT dependencies with appropriate Ganglia dependencies

Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
<dependency>
  <groupId>info.ganglia.gmetric4j</groupId>
  <artifactId>gmetric4j</artifactId>
  <version>1.0.10</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % "3.5.6"
libraryDependencies += "info.ganglia.gmetric4j" % "gmetric4j" % "1.0.10"

Core Imports

import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry

For direct GangliaReporter usage:

import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.util.Properties

// Configure Spark to use Ganglia sink via configuration
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf.driver.sink.ganglia.class", "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.driver.sink.ganglia.host", "ganglia-server.example.com")
  .set("spark.metrics.conf.driver.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.driver.sink.ganglia.period", "10")
  .set("spark.metrics.conf.driver.sink.ganglia.unit", "seconds")

val sc = new SparkContext(conf)
// Metrics will now be automatically reported to Ganglia

Configuration file approach (metrics.properties):

# Ganglia sink configuration
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia-server.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=10
driver.sink.ganglia.unit=seconds
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1
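Each key in metrics.properties follows the pattern `[instance].sink.[name].[option]` (Spark's actual parser lives in org.apache.spark.metrics.MetricsConfig). A minimal sketch of how such a key decomposes, with a hypothetical `MetricsKeyParser` class:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MetricsKeyParser {
    // instance (or "*"), the literal "sink", the sink name, then the option
    private static final Pattern SINK_KEY =
        Pattern.compile("^(\\*|[a-zA-Z]+)\\.sink\\.([a-zA-Z]+)\\.(.+)$");

    // Returns {instance, sinkName, option}, or null if the key does not match
    public static String[] parse(String key) {
        Matcher m = SINK_KEY.matcher(key);
        if (!m.matches()) return null;
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        String[] parts = parse("driver.sink.ganglia.host");
        System.out.println(parts[0] + " " + parts[1] + " " + parts[2]);
    }
}
```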

Architecture

The Spark Ganglia LGPL integration consists of the following components:

  • GangliaSink: Spark-specific metrics sink that integrates with Spark's metrics system
  • GangliaReporter: Dropwizard Metrics reporter that handles the actual communication with Ganglia
  • Configuration System: Property-based configuration supporting various network modes and timing options
  • Type Mapping: Automatic conversion between Java/Scala types and Ganglia metric types

The sink automatically discovers and reports all Spark metrics including JVM metrics, application metrics, and custom user metrics registered with the metrics system.

Capabilities

Ganglia Sink Integration

Core Spark metrics sink implementation that connects Spark's metrics system to Ganglia monitoring infrastructure.

class GangliaSink(
  val property: Properties, 
  val registry: MetricRegistry
) extends Sink {
  def start(): Unit
  def stop(): Unit  
  def report(): Unit
  def propertyToOption(prop: String): Option[String]
}

trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
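The propertyToOption helper simply wraps a possibly missing Properties entry in Scala's Option. A Java sketch of the same behavior using Optional; the SinkProperties class name here is hypothetical:

```java
import java.util.Optional;
import java.util.Properties;

public class SinkProperties {
    private final Properties props;

    public SinkProperties(Properties props) { this.props = props; }

    // Mirrors GangliaSink.propertyToOption: an absent key becomes an empty Optional
    public Optional<String> propertyToOption(String key) {
        return Optional.ofNullable(props.getProperty(key));
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("host", "ganglia-server.example.com");
        SinkProperties sp = new SinkProperties(p);
        System.out.println(sp.propertyToOption("host").orElse("<missing>"));
        System.out.println(sp.propertyToOption("port").orElse("<missing>"));
    }
}
```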

Configuration Constants:

// Period configuration
val GANGLIA_KEY_PERIOD: String = "period"
val GANGLIA_DEFAULT_PERIOD: Int = 10

// Time unit configuration  
val GANGLIA_KEY_UNIT: String = "unit"
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS

// Network mode configuration
val GANGLIA_KEY_MODE: String = "mode"
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST

// TTL configuration for multicast
val GANGLIA_KEY_TTL: String = "ttl"
val GANGLIA_DEFAULT_TTL: Int = 1

// Required host and port
val GANGLIA_KEY_HOST: String = "host"
val GANGLIA_KEY_PORT: String = "port"

// Data max configuration
val GANGLIA_KEY_DMAX: String = "dmax"
val GANGLIA_DEFAULT_DMAX: Int = 0
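The sink resolves each option against the defaults above, while host and port are required and cause a failure when absent. An illustrative re-implementation of that resolution logic (the GangliaSinkConfig class and the exact exception types are assumptions, not the real sink's code):

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class GangliaSinkConfig {
    static final int DEFAULT_PERIOD = 10;
    static final TimeUnit DEFAULT_UNIT = TimeUnit.SECONDS;

    final String host;   // required: no default
    final int port;      // required: no default
    final int period;
    final TimeUnit unit;

    GangliaSinkConfig(Properties props) {
        host = props.getProperty("host");
        if (host == null)
            throw new IllegalArgumentException("Ganglia sink requires 'host' property.");
        String portStr = props.getProperty("port");
        if (portStr == null)
            throw new IllegalArgumentException("Ganglia sink requires 'port' property.");
        port = Integer.parseInt(portStr);
        // Optional settings fall back to the documented defaults
        period = Integer.parseInt(props.getProperty("period", String.valueOf(DEFAULT_PERIOD)));
        unit = TimeUnit.valueOf(props.getProperty("unit", "seconds").toUpperCase());
    }
}
```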

Usage Example:

import org.apache.spark.metrics.sink.GangliaSink
import com.codahale.metrics.MetricRegistry
import java.util.Properties

// Manual instantiation (typically done by Spark's metrics system)
val properties = new Properties()
properties.setProperty("host", "ganglia-server.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "10")
properties.setProperty("unit", "seconds")
properties.setProperty("mode", "multicast")
properties.setProperty("ttl", "1")

val registry = new MetricRegistry()
val sink = new GangliaSink(properties, registry)

// Start reporting
sink.start()

// Manual report (usually automatic via polling)
sink.report()

// Stop reporting
sink.stop()

Ganglia Reporter

Low-level Dropwizard Metrics reporter for direct Ganglia integration, supporting comprehensive metric types and flexible configuration.

public class GangliaReporter extends ScheduledReporter {
  public static Builder forRegistry(MetricRegistry registry);
  
  // Inherited from ScheduledReporter
  public void start(long period, TimeUnit unit);
  public void stop();
  public void report();
  
  // Core reporting method
  public void report(
    SortedMap<String, Gauge> gauges,
    SortedMap<String, Counter> counters, 
    SortedMap<String, Histogram> histograms,
    SortedMap<String, Meter> meters,
    SortedMap<String, Timer> timers
  );
}

// Nested builder: GangliaReporter.Builder
public static class Builder {
  public Builder prefixedWith(String prefix);
  public Builder withTMax(int tMax);
  public Builder withDMax(int dMax);
  public Builder convertRatesTo(TimeUnit rateUnit);
  public Builder convertDurationsTo(TimeUnit durationUnit);
  public Builder filter(MetricFilter filter);
  public Builder scheduleOn(ScheduledExecutorService executor);
  public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop);
  public Builder disabledMetricAttributes(Set<MetricAttribute> disabledMetricAttributes);
  public GangliaReporter build(GMetric gmetric);
  public GangliaReporter build(GMetric... gmetrics);
}

Usage Example:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode;
import java.util.concurrent.TimeUnit;

// Create Ganglia client
GMetric ganglia = new GMetric(
  "ganglia-server.example.com",  // host
  8649,                          // port  
  UDPAddressingMode.MULTICAST,   // mode
  1                              // ttl
);

// Build reporter with configuration
MetricRegistry registry = new MetricRegistry();
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
  .prefixedWith("spark.app")
  .convertDurationsTo(TimeUnit.MILLISECONDS)
  .convertRatesTo(TimeUnit.SECONDS)
  .withDMax(0)
  .build(ganglia);

// Start scheduled reporting every 10 seconds
reporter.start(10, TimeUnit.SECONDS);

// Manual report
reporter.report();

// Stop reporting
reporter.stop();

Types

Network Configuration Types

// From info.ganglia.gmetric4j.gmetric package
enum UDPAddressingMode {
  MULTICAST,
  UNICAST
}

class GMetric {
  public GMetric(String host, int port, UDPAddressingMode mode, int ttl);
  public void announce(String name, String value, GMetricType type, 
                      String units, GMetricSlope slope, int tMax, int dMax, String group);
}

enum GMetricType {
  STRING, INT8, UINT8, INT16, UINT16, INT32, UINT32, FLOAT, DOUBLE
}

enum GMetricSlope {
  ZERO, POSITIVE, NEGATIVE, BOTH, UNSPECIFIED
}
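Metric values are mapped onto these Ganglia types automatically, with unknown objects falling back to STRING (see Error Handling below). A minimal sketch of such a mapping; the enum is a local stand-in for GMetricType, and the choice of DOUBLE for 64-bit longs is an assumption (gmetric has no 64-bit integer type):

```java
public class TypeMapper {
    // Local stand-in for info.ganglia.gmetric4j.gmetric.GMetricType
    enum GMetricType { STRING, INT32, UINT32, FLOAT, DOUBLE }

    static GMetricType detect(Object value) {
        if (value instanceof Integer) return GMetricType.INT32;
        if (value instanceof Long)    return GMetricType.DOUBLE; // assumed: no 64-bit int in gmetric
        if (value instanceof Float)   return GMetricType.FLOAT;
        if (value instanceof Double)  return GMetricType.DOUBLE;
        return GMetricType.STRING;    // unknown objects fall back to STRING
    }
}
```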

Metrics Types

// From com.codahale.metrics package
class MetricRegistry {
  // Standard Dropwizard Metrics registry
}

interface Gauge<T> {
  T getValue();
}

class Counter {
  long getCount();
  void inc();
  void inc(long n);
  void dec();
  void dec(long n);
}

class Histogram {
  void update(int value);
  void update(long value);
  long getCount();
  Snapshot getSnapshot();
}

class Meter {
  void mark();
  void mark(long n);
  long getCount();
  double getFifteenMinuteRate();
  double getFiveMinuteRate();
  double getMeanRate();
  double getOneMinuteRate();
}

class Timer {
  void update(long duration, TimeUnit unit);
  Timer.Context time();
  long getCount();
  double getFifteenMinuteRate();
  double getFiveMinuteRate();
  double getMeanRate();
  double getOneMinuteRate();
  Snapshot getSnapshot();
}

Configuration Types

// Standard Java types used in configuration
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledExecutorService
import java.util.Set
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricAttribute

Error Handling

The package handles errors at configuration time and at runtime:

Configuration Validation:

  • Throws Exception if required host property is not provided
  • Throws Exception if required port property is not provided
  • Validates polling period using MetricsSystem.checkMinimalPollingPeriod

Runtime Error Handling:

  • GangliaException caught and logged as warnings during metric reporting
  • Graceful degradation when Ganglia server is unavailable
  • Automatic type detection with fallback to STRING type for unknown objects
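The polling-period check rejects periods shorter than one second. An illustrative re-implementation of what MetricsSystem.checkMinimalPollingPeriod enforces (the class and constant names below are assumptions):

```java
import java.util.concurrent.TimeUnit;

public class PollingCheck {
    static final int MINIMAL_POLL_SECONDS = 1;

    // Throws when the configured period, converted to seconds, is below the minimum
    static void checkMinimalPollingPeriod(TimeUnit unit, int period) {
        if (unit.toSeconds(period) < MINIMAL_POLL_SECONDS) {
            throw new IllegalArgumentException(
                "Polling period " + period + " " + unit + " is below the 1 second minimum");
        }
    }
}
```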

Common Configuration Errors:

// Missing required properties
val props = new Properties()
// Missing host/port will throw Exception during GangliaSink construction

// Invalid property values  
props.setProperty("ttl", "invalid")  // Will cause NumberFormatException
props.setProperty("mode", "INVALID") // Will cause IllegalArgumentException

Advanced Configuration

Multiple Ganglia Servers

// Configure multiple Ganglia servers for high availability
GMetric ganglia1 = new GMetric("ganglia1.example.com", 8649, UDPAddressingMode.MULTICAST, 1);
GMetric ganglia2 = new GMetric("ganglia2.example.com", 8649, UDPAddressingMode.MULTICAST, 1);

GangliaReporter reporter = GangliaReporter.forRegistry(registry)
  .build(ganglia1, ganglia2);

Custom Metric Filtering

// Filter out specific metrics
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
  .filter(MetricFilter.contains("jvm"))  // Only JVM metrics
  .disabledMetricAttributes(Set.of(MetricAttribute.P999, MetricAttribute.P99))
  .build(ganglia);

Network Mode Configuration

# Multicast mode (default)
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1

# Unicast mode for complex network topologies
driver.sink.ganglia.mode=unicast
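The mode string is parsed case-insensitively into an addressing mode, defaulting to multicast when absent. A sketch of that parsing; the enum is a local stand-in for GMetric.UDPAddressingMode:

```java
public class ModeParser {
    // Local stand-in for GMetric.UDPAddressingMode
    enum UDPAddressingMode { MULTICAST, UNICAST }

    static UDPAddressingMode parse(String value) {
        if (value == null) return UDPAddressingMode.MULTICAST;  // documented default
        // Unrecognized values (e.g. "INVALID") raise IllegalArgumentException
        return UDPAddressingMode.valueOf(value.toUpperCase());
    }
}
```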

Integration with Spark Metrics System

The GangliaSink automatically integrates with Spark's comprehensive metrics system, reporting:

  • Application Metrics: Job execution times, task counts, stage durations
  • JVM Metrics: Heap usage, GC times, thread counts
  • System Metrics: CPU usage, disk I/O, network I/O
  • Custom Metrics: User-registered metrics via SparkContext.metricRegistry

All metrics are automatically formatted with appropriate Ganglia metadata including metric names, types, units, and groupings.
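One way a fully qualified metric name might be assembled before reaching Ganglia is by dot-joining a prefix (such as an application id), the instance, and the registry name. The joining scheme below mirrors Dropwizard's MetricRegistry.name helper; the exact prefixing Spark applies is not shown here, so treat this as an illustrative sketch:

```java
public class MetricNames {
    // Dot-join non-empty parts, skipping nulls, in the style of MetricRegistry.name
    static String name(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            if (part == null || part.isEmpty()) continue;
            if (sb.length() > 0) sb.append('.');
            sb.append(part);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(name("app-20240101", "driver", "jvm.heap.used"));
    }
}
```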