tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-10

Ganglia metrics sink integration for Apache Spark enabling metrics reporting to Ganglia monitoring systems

Describes: mavenpkg:maven/org.apache.spark/spark-ganglia-lgpl_2.10@1.6.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl_2-10@1.6.0

# Spark Ganglia LGPL

Spark Ganglia LGPL provides a Ganglia metrics sink for Apache Spark, enabling Spark applications to report performance and operational metrics to Ganglia monitoring systems. The module uses the Ganglia GMetric protocol to send metrics data over UDP, with support for both multicast and unicast addressing modes.

## Package Information

- **Package Name**: spark-ganglia-lgpl_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-ganglia-lgpl_2.10
- **Installation**: Add a Maven dependency to `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
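For sbt projects, the equivalent dependency line (a hypothetical build setup, assuming the same coordinates and version as the POM above) would be:

```scala
// Equivalent sbt dependency, using the artifact coordinates shown in the POM above
libraryDependencies += "org.apache.spark" % "spark-ganglia-lgpl_2.10" % "1.6.3"
```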

## Core Imports

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}
```

## Basic Usage

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}

// Configure properties for the Ganglia connection
val properties = new Properties()
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")
properties.setProperty("mode", "MULTICAST")
properties.setProperty("ttl", "2")

// Create the metrics registry and security manager
val registry = new MetricRegistry()
val sparkConf = new SparkConf() // assuming a SparkConf is available
val securityMgr = new SecurityManager(sparkConf)

// Initialize the Ganglia sink
val gangliaSink = new GangliaSink(properties, registry, securityMgr)

// Start periodic metrics reporting
gangliaSink.start()

// Report metrics immediately (optional)
gangliaSink.report()

// Stop metrics reporting when done
gangliaSink.stop()
```

## Capabilities

### GangliaSink Class

Main class implementing the Spark metrics sink interface for reporting metrics to Ganglia.

```scala { .api }
/**
 * Ganglia metrics sink that reports Spark metrics to Ganglia monitoring systems
 * @param property Configuration properties for the Ganglia connection
 * @param registry Codahale MetricRegistry containing the metrics to report
 * @param securityMgr Spark SecurityManager instance
 */
class GangliaSink(
  val property: Properties,
  val registry: MetricRegistry,
  securityMgr: SecurityManager
) extends Sink {

  /**
   * Internal helper that converts a property lookup to an Option
   * @param prop Property key to retrieve
   * @return Option containing the property value, or None if not found
   */
  private def propertyToOption(prop: String): Option[String]
}
```

### Start Metrics Reporting

Starts the Ganglia reporter with the configured polling period and time unit.

```scala { .api }
/**
 * Start the Ganglia metrics reporter
 * Begins periodic reporting based on the configured period and unit
 */
override def start(): Unit
```

### Stop Metrics Reporting

Stops the Ganglia reporter and ceases all metrics reporting.

```scala { .api }
/**
 * Stop the Ganglia metrics reporter
 * Immediately stops all metrics reporting to Ganglia
 */
override def stop(): Unit
```

### Report Metrics Immediately

Triggers an immediate report of all current metrics to Ganglia, independent of the configured polling schedule.

```scala { .api }
/**
 * Report all current metrics immediately to Ganglia
 * Does not affect the regular polling schedule
 */
override def report(): Unit
```

## Configuration

### Required Properties

These properties must be provided in the Properties object passed to the constructor:

```scala { .api }
// Required configuration keys
val GANGLIA_KEY_HOST = "host" // Ganglia server hostname
val GANGLIA_KEY_PORT = "port" // Ganglia server port number
```

**Example:**

```scala
properties.setProperty("host", "ganglia.example.com")
properties.setProperty("port", "8649")
```

### Optional Properties

These properties have default values if not specified:

```scala { .api }
// Optional configuration keys with defaults
val GANGLIA_KEY_PERIOD = "period" // Reporting period (default: 10)
val GANGLIA_KEY_UNIT = "unit" // Time unit (default: SECONDS)
val GANGLIA_KEY_MODE = "mode" // UDP addressing mode (default: MULTICAST)
val GANGLIA_KEY_TTL = "ttl" // Multicast TTL (default: 1)
```

**Default Values:**

```scala { .api }
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
val GANGLIA_DEFAULT_TTL = 1
```

**Configuration Examples:**

```scala
// Report every 30 seconds
properties.setProperty("period", "30")
properties.setProperty("unit", "SECONDS")

// Use unicast mode instead of multicast
properties.setProperty("mode", "UNICAST")

// Set the TTL for multicast (useful for multi-hop networks)
properties.setProperty("ttl", "3")
```
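The optional keys above follow a lookup-with-default pattern: each property is read as an `Option` and falls back to the documented default when unset. A minimal, stand-alone sketch of that pattern (not the sink's actual code; `propertyToOption` here is a local helper using only the standard library):

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Local stand-in for the sink's private propertyToOption helper
def propertyToOption(props: Properties, key: String): Option[String] =
  Option(props.getProperty(key))

val props = new Properties()
props.setProperty("period", "30") // explicitly set
// "unit" and "ttl" are left unset, so the documented defaults apply

val period = propertyToOption(props, "period").map(_.toInt).getOrElse(10)
val unit   = propertyToOption(props, "unit")
  .map(u => TimeUnit.valueOf(u.toUpperCase)).getOrElse(TimeUnit.SECONDS)
val ttl    = propertyToOption(props, "ttl").map(_.toInt).getOrElse(1)

println(s"period=$period unit=$unit ttl=$ttl") // prints "period=30 unit=SECONDS ttl=1"
```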

### Valid Configuration Values

**Time Units:** `NANOSECONDS`, `MICROSECONDS`, `MILLISECONDS`, `SECONDS`, `MINUTES`, `HOURS`, `DAYS`

**UDP Addressing Modes:** `MULTICAST`, `UNICAST`
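These names correspond to Java and GMetric enum constants resolved via `valueOf`, which accepts only the exact names listed; an unknown name raises `IllegalArgumentException`, matching the error behavior described below. A stdlib-only check of that behavior for the time unit:

```scala
import java.util.concurrent.TimeUnit
import scala.util.Try

// valueOf succeeds only for the exact enum constant names
val ok  = Try(TimeUnit.valueOf("SECONDS")).isSuccess
val bad = Try(TimeUnit.valueOf("FORTNIGHTS")).isSuccess // unknown name fails

println(s"ok=$ok bad=$bad") // prints "ok=true bad=false"
```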

## Types

### Dependencies and Imports

```scala { .api }
// Java standard library
import java.util.Properties
import java.util.concurrent.TimeUnit

// Codahale Metrics (Dropwizard Metrics)
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter

// Ganglia GMetric library
import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

// Spark framework
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.sink.Sink
```

### Sink Trait

Base trait that GangliaSink implements, defining the standard metrics sink interface:

```scala { .api }
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```
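Any sink follows this three-method lifecycle. Because the real trait is `private[spark]`, the following is a stand-alone analogue (the trait is redeclared locally and `ConsoleSink` is a hypothetical illustration, not part of Spark):

```scala
// Stand-alone analogue of Spark's Sink contract (the real trait is private[spark])
trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

// Hypothetical console sink illustrating the start/report/stop lifecycle
class ConsoleSink extends Sink {
  private var running = false
  def isRunning: Boolean = running
  def start(): Unit = { running = true }
  def stop(): Unit = { running = false }
  def report(): Unit = if (running) println("reporting current metrics")
}

val sink = new ConsoleSink
sink.start()
sink.report() // prints "reporting current metrics"
sink.stop()
```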

## Error Handling

The GangliaSink constructor validates required configuration in this order and throws exceptions for missing or invalid values:

1. **Missing host**: throws `Exception("Ganglia sink requires 'host' property.")`
2. **Missing port**: throws `Exception("Ganglia sink requires 'port' property.")`
3. **Invalid port**: throws `NumberFormatException` if the port cannot be parsed as an integer
4. **Invalid numeric values**: throws `NumberFormatException` for invalid `period` or `ttl` values
5. **Invalid enum values**: throws `IllegalArgumentException` for invalid `mode` or `unit` values
6. **Invalid polling period**: may throw an exception from `MetricsSystem.checkMinimalPollingPeriod()`

Validation occurs during object construction, before the reporter is created.

**Example error handling** (the specific exception types must be matched before the generic `Exception`, otherwise those cases are unreachable):

```scala
try {
  val gangliaSink = new GangliaSink(properties, registry, securityMgr)
  gangliaSink.start()
} catch {
  case e: NumberFormatException =>
    println(s"Invalid numeric configuration: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration value: ${e.getMessage}")
  case e: Exception =>
    println(s"Failed to initialize Ganglia sink: ${e.getMessage}")
}
```

## Integration with Spark Metrics

This sink integrates with Spark's metrics system through configuration. Add to your `metrics.properties` file:

```properties
# Enable the Ganglia sink
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.period=30
*.sink.ganglia.unit=seconds
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=2
```

The sink automatically receives and reports all metrics registered with Spark's MetricRegistry, including:

- JVM metrics (memory, garbage collection, thread pools)
- Spark application metrics (tasks, stages, executors)
- Custom application metrics
- Driver and executor metrics
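The `*.` prefix applies the sink to every metrics instance. Spark's `metrics.properties` syntax also supports per-instance prefixes (`master`, `worker`, `executor`, `driver`, `applications`), so the sink can be scoped more narrowly; for example, to report only driver metrics:

```properties
# Hypothetical: send only driver metrics to Ganglia
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia.example.com
driver.sink.ganglia.port=8649
```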

## License and Distribution


This module is distributed separately from the main Spark distribution due to its dependency on LGPL-licensed Ganglia components (specifically the `gmetric4j` library). While Spark itself is Apache 2.0 licensed, the Ganglia integration must include LGPL dependencies, requiring separate distribution to maintain license compatibility.