# Spark Ganglia LGPL Integration

Spark Ganglia LGPL provides Ganglia monitoring integration for Apache Spark, enabling the collection and reporting of Spark metrics to Ganglia monitoring systems. This package implements a GangliaSink that extends Spark's metrics system to publish performance metrics, job statistics, and runtime information to Ganglia infrastructure for centralized monitoring and alerting.

## Package Information

- **Package Name**: spark-ganglia-lgpl_2.12
- **Package Type**: Maven
- **Language**: Scala with Java components
- **Group ID**: org.apache.spark
- **Installation**: Add to your Maven/SBT dependencies with appropriate Ganglia dependencies

Maven:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
<dependency>
  <groupId>info.ganglia.gmetric4j</groupId>
  <artifactId>gmetric4j</artifactId>
  <version>1.0.10</version>
</dependency>
```

SBT:
```scala
libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % "3.5.6"
libraryDependencies += "info.ganglia.gmetric4j" % "gmetric4j" % "1.0.10"
```

## Core Imports

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
```

For direct GangliaReporter usage:
```java
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Configure Spark to use the Ganglia sink via SparkConf
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf.driver.sink.ganglia.class", "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.driver.sink.ganglia.host", "ganglia-server.example.com")
  .set("spark.metrics.conf.driver.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.driver.sink.ganglia.period", "10")
  .set("spark.metrics.conf.driver.sink.ganglia.unit", "seconds")

val sc = new SparkContext(conf)
// Metrics are now reported to Ganglia automatically
```

Configuration file approach (`metrics.properties`):
```properties
# Ganglia sink configuration
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia-server.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=10
driver.sink.ganglia.unit=seconds
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1
```

## Architecture

The Spark Ganglia LGPL integration consists of the following main components:

- **GangliaSink**: Spark-specific metrics sink that integrates with Spark's metrics system
- **GangliaReporter**: Dropwizard Metrics reporter that handles the actual communication with Ganglia
- **Configuration System**: property-based configuration supporting various network modes and timing options
- **Type Mapping**: automatic conversion between Java/Scala types and Ganglia metric types

The sink automatically discovers and reports all Spark metrics, including JVM metrics, application metrics, and custom user metrics registered with the metrics system.
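
The type-mapping behavior can be sketched in plain Scala. This is an illustrative simplification, not the actual implementation: `gangliaTypeOf` is a hypothetical helper, and the mapping of 64-bit values to DOUBLE reflects the fact that the Ganglia wire format has no 64-bit integer type.

```scala
// Sketch of Ganglia type detection (illustrative only): numeric values
// map to numeric Ganglia types, everything else falls back to STRING.
// `gangliaTypeOf` is a hypothetical helper, not part of GangliaSink's API.
def gangliaTypeOf(value: Any): String = value match {
  case _: Byte             => "INT8"
  case _: Short            => "INT16"
  case _: Int              => "INT32"
  case _: Float            => "FLOAT"
  case _: Long | _: Double => "DOUBLE" // no 64-bit int type in Ganglia
  case _                   => "STRING" // unknown types fall back to STRING
}
```

For example, `gangliaTypeOf(42)` yields `"INT32"`, while a non-numeric gauge value such as a status string falls back to `"STRING"`.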

## Capabilities

### Ganglia Sink Integration

Core Spark metrics sink implementation that connects Spark's metrics system to Ganglia monitoring infrastructure.

```scala { .api }
class GangliaSink(
  val property: Properties,
  val registry: MetricRegistry
) extends Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
  def propertyToOption(prop: String): Option[String]
}

trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```

**Configuration Constants:**

```scala { .api }
// Polling period
val GANGLIA_KEY_PERIOD: String = "period"
val GANGLIA_DEFAULT_PERIOD: Int = 10

// Time unit for the polling period
val GANGLIA_KEY_UNIT: String = "unit"
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS

// Network mode
val GANGLIA_KEY_MODE: String = "mode"
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST

// TTL for multicast packets
val GANGLIA_KEY_TTL: String = "ttl"
val GANGLIA_DEFAULT_TTL: Int = 1

// Required host and port
val GANGLIA_KEY_HOST: String = "host"
val GANGLIA_KEY_PORT: String = "port"

// dmax: lifetime of a reported value in seconds (0 = never expire)
val GANGLIA_KEY_DMAX: String = "dmax"
val GANGLIA_DEFAULT_DMAX: Int = 0
```
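
How the sink resolves these keys against its `Properties` can be sketched as follows. This is a simplified, self-contained illustration of `propertyToOption`-style lookup with defaults; the real method takes only a key and reads the sink's own properties, and the helper names here are assumptions.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Sketch of configuration resolution with defaults (illustrative only;
// mirrors the constants above, not the actual GangliaSink source).
def propertyToOption(props: Properties, key: String): Option[String] =
  Option(props.getProperty(key))

def resolvePeriod(props: Properties): Int =
  propertyToOption(props, "period").map(_.toInt).getOrElse(10)

def resolveUnit(props: Properties): TimeUnit =
  propertyToOption(props, "unit")
    .map(u => TimeUnit.valueOf(u.toUpperCase))
    .getOrElse(TimeUnit.SECONDS)
```

With an empty `Properties`, `resolvePeriod` falls back to 10 and `resolveUnit` to `TimeUnit.SECONDS`.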

**Usage Example:**

```scala
import org.apache.spark.metrics.sink.GangliaSink
import com.codahale.metrics.MetricRegistry
import java.util.Properties

// Manual instantiation (typically done by Spark's metrics system)
val properties = new Properties()
properties.setProperty("host", "ganglia-server.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "10")
properties.setProperty("unit", "seconds")
properties.setProperty("mode", "multicast")
properties.setProperty("ttl", "1")

val registry = new MetricRegistry()
val sink = new GangliaSink(properties, registry)

// Start scheduled reporting
sink.start()

// Manual report (usually automatic via polling)
sink.report()

// Stop reporting
sink.stop()
```

### Ganglia Reporter

Low-level Dropwizard Metrics reporter for direct Ganglia integration, supporting the full set of Dropwizard metric types and flexible configuration.

```java { .api }
public class GangliaReporter extends ScheduledReporter {
    public static Builder forRegistry(MetricRegistry registry);

    // Inherited from ScheduledReporter
    public void start(long period, TimeUnit unit);
    public void stop();
    public void report();

    // Core reporting method
    public void report(
        SortedMap<String, Gauge> gauges,
        SortedMap<String, Counter> counters,
        SortedMap<String, Histogram> histograms,
        SortedMap<String, Meter> meters,
        SortedMap<String, Timer> timers
    );

    // Nested builder (referenced as GangliaReporter.Builder)
    public static class Builder {
        public Builder prefixedWith(String prefix);
        public Builder withTMax(int tMax);
        public Builder withDMax(int dMax);
        public Builder convertRatesTo(TimeUnit rateUnit);
        public Builder convertDurationsTo(TimeUnit durationUnit);
        public Builder filter(MetricFilter filter);
        public Builder scheduleOn(ScheduledExecutorService executor);
        public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop);
        public Builder disabledMetricAttributes(Set<MetricAttribute> disabledMetricAttributes);
        public GangliaReporter build(GMetric gmetric);
        public GangliaReporter build(GMetric... gmetrics);
    }
}
```

**Usage Example:**

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode;
import java.util.concurrent.TimeUnit;

// Create the Ganglia client
GMetric ganglia = new GMetric(
    "ganglia-server.example.com",  // host
    8649,                          // port
    UDPAddressingMode.MULTICAST,   // mode
    1                              // ttl
);

// Build the reporter with configuration
MetricRegistry registry = new MetricRegistry();
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .prefixedWith("spark.app")
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .withDMax(0)
    .build(ganglia);

// Start scheduled reporting every 10 seconds
reporter.start(10, TimeUnit.SECONDS);

// Manual report
reporter.report();

// Stop reporting
reporter.stop();
```

## Types

### Network Configuration Types

```java { .api }
// From the info.ganglia.gmetric4j.gmetric package
class GMetric {
    public GMetric(String host, int port, UDPAddressingMode mode, int ttl);
    public void announce(String name, String value, GMetricType type,
        String units, GMetricSlope slope, int tMax, int dMax, String group);

    // Nested addressing-mode enum (referenced as GMetric.UDPAddressingMode)
    enum UDPAddressingMode {
        MULTICAST,
        UNICAST
    }
}

enum GMetricType {
    STRING, INT8, UINT8, INT16, UINT16, INT32, UINT32, FLOAT, DOUBLE
}

enum GMetricSlope {
    ZERO, POSITIVE, NEGATIVE, BOTH, UNSPECIFIED
}
```
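
Parsing the `mode` property into an addressing mode can be sketched like this. A stand-in `AddressingMode` enumeration is used so the example does not depend on gmetric4j; the actual sink would do the equivalent against `GMetric.UDPAddressingMode`.

```scala
// Self-contained sketch: parse the "mode" property into an addressing
// mode, defaulting to multicast. AddressingMode is a stand-in for
// gmetric4j's GMetric.UDPAddressingMode.
object AddressingMode extends Enumeration {
  val MULTICAST, UNICAST = Value
}

def parseMode(raw: Option[String]): AddressingMode.Value =
  raw.map(m => AddressingMode.withName(m.toUpperCase))
    .getOrElse(AddressingMode.MULTICAST)
```

An unrecognized value (e.g. `"INVALID"`) fails in `withName` with a runtime exception, matching the `IllegalArgumentException` behavior noted under Error Handling below.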

### Metrics Types

```java { .api }
// From the com.codahale.metrics package
class MetricRegistry {
    // Standard Dropwizard Metrics registry
}

interface Gauge<T> {
    T getValue();
}

class Counter {
    long getCount();
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
}

class Histogram {
    void update(int value);
    void update(long value);
    long getCount();
    Snapshot getSnapshot();
}

class Meter {
    void mark();
    void mark(long n);
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
}

class Timer {
    void update(long duration, TimeUnit unit);
    Context time();  // Timer.Context; close it to record the elapsed time
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
    Snapshot getSnapshot();
}
```
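
The `convertRatesTo`/`convertDurationsTo` builder options rescale Dropwizard's internal values, which are kept as events per second for rates and nanoseconds for durations. The arithmetic can be sketched as follows (assumed behavior, consistent with `ScheduledReporter`'s documented conversion factors):

```scala
import java.util.concurrent.TimeUnit

// Rates are kept internally in events/second; converting "to" a unit
// multiplies by that unit's length in seconds. Durations are kept in
// nanoseconds; converting divides by the target unit's length in nanos.
def convertRate(ratePerSecond: Double, rateUnit: TimeUnit): Double =
  ratePerSecond * rateUnit.toSeconds(1)

def convertDuration(durationNanos: Double, durationUnit: TimeUnit): Double =
  durationNanos / durationUnit.toNanos(1)
```

So a meter running at 2 events/second reports 120 when rates are converted to minutes, and a 1,500,000 ns timing reports 1.5 when durations are converted to milliseconds.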

### Configuration Types

```scala { .api }
// Standard Java and Dropwizard types used in configuration
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledExecutorService
import java.util.Set
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricAttribute
```

## Error Handling

The package includes comprehensive error handling:

**Configuration Validation:**

- Throws `Exception` if the required `host` property is not provided
- Throws `Exception` if the required `port` property is not provided
- Validates the polling period using `MetricsSystem.checkMinimalPollingPeriod`

**Runtime Error Handling:**

- `GangliaException` is caught and logged as a warning during metric reporting
- Graceful degradation when the Ganglia server is unavailable
- Automatic type detection with fallback to the STRING type for unknown objects

**Common Configuration Errors:**

```scala
// Missing required properties
val props = new Properties()
// Missing host/port will throw an Exception during GangliaSink construction

// Invalid property values
props.setProperty("ttl", "invalid")  // causes a NumberFormatException
props.setProperty("mode", "INVALID") // causes an IllegalArgumentException
```
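
The host/port validation described above can be sketched as a small helper. This is illustrative only, mirroring the documented behavior rather than the actual Spark source; `validateGangliaProps` is a hypothetical name.

```scala
import java.util.Properties

// Sketch of the documented validation: host and port are mandatory
// before a sink can be constructed (illustrative, not Spark's code).
def validateGangliaProps(props: Properties): (String, Int) = {
  val host = Option(props.getProperty("host"))
    .getOrElse(throw new Exception("Ganglia sink requires 'host' property."))
  val port = Option(props.getProperty("port"))
    .getOrElse(throw new Exception("Ganglia sink requires 'port' property."))
  // A non-numeric port fails here with a NumberFormatException
  (host, port.toInt)
}
```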

## Advanced Configuration

### Multiple Ganglia Servers

```java
// Configure multiple Ganglia servers for high availability
GMetric ganglia1 = new GMetric("ganglia1.example.com", 8649, UDPAddressingMode.MULTICAST, 1);
GMetric ganglia2 = new GMetric("ganglia2.example.com", 8649, UDPAddressingMode.MULTICAST, 1);

GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .build(ganglia1, ganglia2);
```

### Custom Metric Filtering

```java
// Report only metrics whose names contain "jvm", and suppress
// the p99/p999 percentile attributes
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .filter(MetricFilter.contains("jvm"))
    .disabledMetricAttributes(Set.of(MetricAttribute.P999, MetricAttribute.P99))
    .build(ganglia);
```

### Network Mode Configuration

```properties
# Multicast mode (default)
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1

# Unicast mode for networks where multicast is unavailable
driver.sink.ganglia.mode=unicast
```

## Integration with Spark Metrics System

The GangliaSink integrates with Spark's comprehensive metrics system, reporting:

- **Application Metrics**: job execution times, task counts, stage durations
- **JVM Metrics**: heap usage, GC times, thread counts
- **System Metrics**: CPU usage, disk I/O, network I/O
- **Custom Metrics**: user metrics registered as custom sources with Spark's metrics system

All metrics are automatically formatted with the appropriate Ganglia metadata, including metric names, types, units, and groupings.