Ganglia metrics sink integration for Apache Spark enabling metrics reporting to Ganglia monitoring systems
npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-10@1.6.00
# Spark Ganglia LGPL

Spark Ganglia LGPL provides a Ganglia metrics sink integration for Apache Spark, enabling Spark applications to report performance and operational metrics to Ganglia monitoring systems. This module uses the Ganglia GMetric protocol to send metrics data via UDP, with support for both multicast and unicast addressing modes.
## Package Information

- **Package Name**: spark-ganglia-lgpl_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-ganglia-lgpl_2.10
- **Installation**: Add the Maven dependency to `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
## Core Imports

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}
```
## Basic Usage

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}

// Configure properties for the Ganglia connection
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")
properties.setProperty("mode", "MULTICAST")
properties.setProperty("ttl", "2")

// Create the metrics registry and security manager
val registry = new MetricRegistry()
val sparkConf = new SparkConf() // assuming a SparkConf is available
val securityMgr = new SecurityManager(sparkConf)

// Initialize the Ganglia sink
val gangliaSink = new GangliaSink(properties, registry, securityMgr)

// Start periodic metrics reporting
gangliaSink.start()

// Trigger an immediate report (optional)
gangliaSink.report()

// Stop metrics reporting when done
gangliaSink.stop()
```
## Capabilities

### GangliaSink Class

Main class that implements the Spark metrics sink interface for reporting metrics to Ganglia.
```scala { .api }
/**
 * Ganglia metrics sink that reports Spark metrics to Ganglia monitoring systems
 * @param property Configuration properties for the Ganglia connection
 * @param registry Codahale MetricRegistry containing the metrics to report
 * @param securityMgr Spark SecurityManager instance
 */
class GangliaSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager
) extends Sink {

  /**
   * Internal helper method to convert properties to Options
   * @param prop Property key to retrieve
   * @return Option containing the property value, or None if not found
   */
  private def propertyToOption(prop: String): Option[String]
}
```
### Start Metrics Reporting

Starts the Ganglia reporter with the configured polling period and time unit.

```scala { .api }
/**
 * Start the Ganglia metrics reporter.
 * Begins periodic reporting based on the configured period and unit.
 */
override def start(): Unit
```
### Stop Metrics Reporting

Stops the Ganglia reporter and ceases all metrics reporting.

```scala { .api }
/**
 * Stop the Ganglia metrics reporter.
 * Immediately stops all metrics reporting to Ganglia.
 */
override def stop(): Unit
```
### Report Metrics Immediately

Triggers an immediate report of all current metrics to Ganglia, independent of the configured polling schedule.

```scala { .api }
/**
 * Report all current metrics immediately to Ganglia.
 * Does not affect the regular polling schedule.
 */
override def report(): Unit
```
## Configuration

### Required Properties

These properties must be provided in the `Properties` object passed to the constructor:
```scala { .api }
// Required configuration keys
val GANGLIA_KEY_HOST = "host"  // Ganglia server hostname
val GANGLIA_KEY_PORT = "port"  // Ganglia server port number
```
**Example:**

```scala
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
```
### Optional Properties

These properties fall back to default values if not specified:
```scala { .api }
// Optional configuration keys with defaults
val GANGLIA_KEY_PERIOD = "period"  // Reporting period (default: 10)
val GANGLIA_KEY_UNIT = "unit"      // Time unit (default: SECONDS)
val GANGLIA_KEY_MODE = "mode"      // UDP addressing mode (default: MULTICAST)
val GANGLIA_KEY_TTL = "ttl"        // Multicast TTL (default: 1)
```
**Default Values:**

```scala { .api }
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
val GANGLIA_DEFAULT_TTL = 1
```
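As a minimal sketch of how these defaults are plausibly resolved, missing keys can fall back via `getOrElse`. The standalone `propertyToOption` here is illustrative (the real helper is private to the sink), and the case-insensitive `toUpperCase` handling of `unit` is an assumption based on the lowercase values accepted in `metrics.properties`:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Illustrative standalone stand-in for the sink's private propertyToOption helper
def propertyToOption(props: Properties, key: String): Option[String] =
  Option(props.getProperty(key))

val props = new Properties()
props.setProperty("period", "30") // "unit" and "ttl" deliberately left unset

val period = propertyToOption(props, "period").map(_.toInt).getOrElse(10)
// Assumption: unit strings are matched case-insensitively against TimeUnit constants
val unit = propertyToOption(props, "unit")
  .map(s => TimeUnit.valueOf(s.toUpperCase))
  .getOrElse(TimeUnit.SECONDS)
val ttl = propertyToOption(props, "ttl").map(_.toInt).getOrElse(1)

println(s"period=$period unit=$unit ttl=$ttl") // period=30 unit=SECONDS ttl=1
```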
**Configuration Examples:**

```scala
// Report every 30 seconds
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")

// Use unicast mode instead of multicast
properties.setProperty("mode", "UNICAST")

// Set the TTL for multicast (useful for multi-hop networks)
properties.setProperty("ttl", "3")
```
### Valid Configuration Values

**Time Units:** `NANOSECONDS`, `MICROSECONDS`, `MILLISECONDS`, `SECONDS`, `MINUTES`, `HOURS`, `DAYS`

**UDP Addressing Modes:** `MULTICAST`, `UNICAST`
## Types

### Dependencies and Imports
```scala { .api }
// Java standard library
import java.util.Properties
import java.util.concurrent.TimeUnit

// Codahale Metrics (Dropwizard Metrics)
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter

// Ganglia GMetric library
import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

// Spark framework
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.sink.Sink
```
### Sink Trait

Base trait that GangliaSink implements, defining the standard metrics sink interface:

```scala { .api }
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```
## Error Handling

The GangliaSink constructor validates the required configuration in this order and throws exceptions for missing or invalid values:

1. **Missing host**: throws `Exception("Ganglia sink requires 'host' property.")`
2. **Missing port**: throws `Exception("Ganglia sink requires 'port' property.")`
3. **Invalid port**: throws `NumberFormatException` if the port cannot be parsed as an integer
4. **Invalid numeric values**: throws `NumberFormatException` for unparsable `period` or `ttl` values
5. **Invalid enum values**: throws `IllegalArgumentException` for unrecognized `mode` or `unit` values
6. **Invalid polling period**: may throw an exception from `MetricsSystem.checkMinimalPollingPeriod()`

Validation occurs during object construction, before the reporter is created.
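The required-property checks described above can be sketched as follows. This is a hedged reconstruction, not the actual sink source: `requireProperty` is a hypothetical helper, and the `TimeUnit.valueOf` call stands in for the enum parsing of the `unit` value:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Hypothetical helper mirroring the documented required-property behavior
def requireProperty(props: Properties, key: String): String =
  Option(props.getProperty(key))
    .getOrElse(throw new Exception(s"Ganglia sink requires '$key' property."))

val props = new Properties()
props.setProperty("host", "ganglia.example.com")
props.setProperty("port", "8649")

// Steps 1-3: required properties resolve, and the port parses as an integer
val host = requireProperty(props, "host")
val port = requireProperty(props, "port").toInt

// Step 1 failure mode: a missing host raises the documented Exception
val missingHost =
  try { requireProperty(new Properties(), "host"); None }
  catch { case e: Exception => Some(e.getMessage) }
println(missingHost) // Some(Ganglia sink requires 'host' property.)

// Step 5 failure mode: an unrecognized unit raises IllegalArgumentException
val badUnit =
  try { TimeUnit.valueOf("FORTNIGHTS"); false }
  catch { case _: IllegalArgumentException => true }
```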
**Example error handling:**

```scala
try {
  val gangliaSink = new GangliaSink(properties, registry, securityMgr)
  gangliaSink.start()
} catch {
  // Match the specific exception types before the general case,
  // otherwise the Exception case would swallow them
  case e: NumberFormatException =>
    println(s"Invalid numeric configuration: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration value: ${e.getMessage}")
  case e: Exception =>
    println(s"Failed to initialize Ganglia sink: ${e.getMessage}")
}
```
## Integration with Spark Metrics

This sink integrates with Spark's metrics system through configuration. Add to your `metrics.properties` file:

```properties
# Enable Ganglia sink
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.period=30
*.sink.ganglia.unit=seconds
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=2
```
The sink automatically receives and reports all metrics registered with Spark's MetricRegistry, including:

- JVM metrics (memory, garbage collection, thread pools)
- Spark application metrics (tasks, stages, executors)
- Custom application metrics
- Driver and executor metrics
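As an illustration of the "custom application metrics" point, anything added to the same `MetricRegistry` instance is picked up by the sink on its next report. The metric names below are hypothetical, and the example only exercises the Dropwizard Metrics registry API, not the Ganglia transport:

```scala
import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

val registry = new MetricRegistry()

// A counter incremented by application code (hypothetical metric name)
val processed: Counter = registry.counter("myapp.records.processed")
processed.inc(3)

// A gauge sampled lazily at report time (hypothetical metric name)
registry.register("myapp.queue.size", new Gauge[Int] {
  override def getValue: Int = 7
})

println(registry.getCounters.get("myapp.records.processed").getCount) // 3
```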
## License and Distribution

This module is distributed separately from the main Spark distribution because it depends on LGPL-licensed Ganglia components (specifically the `gmetric4j` library). While Spark itself is Apache 2.0 licensed, the Ganglia integration requires LGPL dependencies, so it is shipped separately to preserve license compatibility.