tessl/maven-org-apache-spark--spark-ganglia-lgpl-2-12

Ganglia metrics reporting integration for Apache Spark monitoring systems

Describes: mavenpkg:maven/org.apache.spark/spark-ganglia-lgpl_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-ganglia-lgpl-2-12@3.5.0

# Spark Ganglia LGPL Integration

Spark Ganglia LGPL provides Ganglia monitoring integration for Apache Spark, enabling the collection and reporting of Spark metrics to Ganglia monitoring systems. This package implements a GangliaSink that extends Spark's metrics system to publish performance metrics, job statistics, and runtime information to Ganglia infrastructure for centralized monitoring and alerting.

## Package Information

- **Package Name**: spark-ganglia-lgpl_2.12
- **Package Type**: Maven
- **Language**: Scala with Java components
- **Group ID**: org.apache.spark
- **Installation**: Add to your Maven/SBT dependencies along with the Ganglia client dependencies

Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-ganglia-lgpl_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
<dependency>
  <groupId>info.ganglia.gmetric4j</groupId>
  <artifactId>gmetric4j</artifactId>
  <version>1.0.10</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-ganglia-lgpl" % "3.5.6"
libraryDependencies += "info.ganglia.gmetric4j" % "gmetric4j" % "1.0.10"
```

## Core Imports

```scala
import org.apache.spark.metrics.sink.GangliaSink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
```

For direct GangliaReporter usage:

```java
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Configure Spark to use the Ganglia sink via configuration
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.metrics.conf.driver.sink.ganglia.class", "org.apache.spark.metrics.sink.GangliaSink")
  .set("spark.metrics.conf.driver.sink.ganglia.host", "ganglia-server.example.com")
  .set("spark.metrics.conf.driver.sink.ganglia.port", "8649")
  .set("spark.metrics.conf.driver.sink.ganglia.period", "10")
  .set("spark.metrics.conf.driver.sink.ganglia.unit", "seconds")

val sc = new SparkContext(conf)
// Metrics will now be reported to Ganglia automatically
```

Configuration file approach (`metrics.properties`):

```properties
# Ganglia sink configuration
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia-server.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=10
driver.sink.ganglia.unit=seconds
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1
```
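Spark's metrics configuration also accepts a `*` wildcard instance, so one sink definition can cover the driver, executors, and standalone master/worker daemons rather than the driver alone. A sketch mirroring the block above (host name is a placeholder):

```properties
# Report metrics from all instances (driver, executors, master, worker) to Ganglia
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia-server.example.com
*.sink.ganglia.port=8649
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
```

When running on a cluster, ship the file to all components, for example with `spark-submit --files metrics.properties --conf spark.metrics.conf=metrics.properties`.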

## Architecture

The Spark Ganglia LGPL integration consists of the following components:

- **GangliaSink**: Spark-specific metrics sink that integrates with Spark's metrics system
- **GangliaReporter**: Dropwizard Metrics reporter that handles the actual communication with Ganglia
- **Configuration System**: Property-based configuration supporting various network modes and timing options
- **Type Mapping**: Automatic conversion between Java/Scala types and Ganglia metric types

The sink automatically discovers and reports all Spark metrics, including JVM metrics, application metrics, and custom user metrics registered with the metrics system.
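The type-mapping component can be illustrated with a simplified, self-contained sketch (this is not the actual GangliaReporter code): numeric gauge values map to numeric Ganglia types, and anything unrecognized falls back to STRING.

```java
public class GangliaTypeSketch {
    // Simplified stand-in for info.ganglia.gmetric4j.gmetric.GMetricType
    enum MetricType { INT32, UINT32, FLOAT, DOUBLE, STRING }

    // Map a gauge value to a Ganglia metric type, mirroring the
    // "fallback to STRING" behavior described above (illustrative only).
    static MetricType detectType(Object value) {
        if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
            return MetricType.INT32;
        } else if (value instanceof Long) {
            return MetricType.DOUBLE; // Ganglia has no 64-bit integer type; widen
        } else if (value instanceof Float) {
            return MetricType.FLOAT;
        } else if (value instanceof Double) {
            return MetricType.DOUBLE;
        }
        return MetricType.STRING; // unknown objects are reported as strings
    }

    public static void main(String[] args) {
        System.out.println(detectType(42));     // INT32
        System.out.println(detectType(3.14));   // DOUBLE
        System.out.println(detectType("busy")); // STRING
    }
}
```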

## Capabilities

### Ganglia Sink Integration

Core Spark metrics sink implementation that connects Spark's metrics system to Ganglia monitoring infrastructure.

```scala { .api }
class GangliaSink(
  val property: Properties,
  val registry: MetricRegistry
) extends Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
  def propertyToOption(prop: String): Option[String]
}

trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```

**Configuration Constants:**

```scala { .api }
// Period configuration
val GANGLIA_KEY_PERIOD: String = "period"
val GANGLIA_DEFAULT_PERIOD: Int = 10

// Time unit configuration
val GANGLIA_KEY_UNIT: String = "unit"
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS

// Network mode configuration
val GANGLIA_KEY_MODE: String = "mode"
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST

// TTL configuration for multicast
val GANGLIA_KEY_TTL: String = "ttl"
val GANGLIA_DEFAULT_TTL: Int = 1

// Required host and port
val GANGLIA_KEY_HOST: String = "host"
val GANGLIA_KEY_PORT: String = "port"

// Data max configuration
val GANGLIA_KEY_DMAX: String = "dmax"
val GANGLIA_DEFAULT_DMAX: Int = 0
```
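Conceptually, these constants pair with `propertyToOption` to resolve each setting with a default, failing fast only for the required `host` and `port`. The following is a stand-alone Java sketch of that resolution logic, not the GangliaSink source (the exception type is simplified):

```java
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class GangliaConfigSketch {
    // Analogue of GangliaSink.propertyToOption: absent keys become empty Optionals
    static Optional<String> propertyToOption(Properties props, String key) {
        return Optional.ofNullable(props.getProperty(key));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("host", "ganglia-server.example.com");
        props.setProperty("period", "15");

        // Required key: fail fast when missing
        String host = propertyToOption(props, "host")
            .orElseThrow(() -> new IllegalArgumentException("Ganglia sink requires 'host' property."));

        // Optional keys fall back to the documented defaults
        int period = propertyToOption(props, "period").map(Integer::parseInt).orElse(10);
        TimeUnit unit = propertyToOption(props, "unit")
            .map(s -> TimeUnit.valueOf(s.toUpperCase())).orElse(TimeUnit.SECONDS);
        int ttl = propertyToOption(props, "ttl").map(Integer::parseInt).orElse(1);

        // ganglia-server.example.com period=15 unit=SECONDS ttl=1
        System.out.println(host + " period=" + period + " unit=" + unit + " ttl=" + ttl);
    }
}
```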

**Usage Example:**

```scala
import org.apache.spark.metrics.sink.GangliaSink
import com.codahale.metrics.MetricRegistry
import java.util.Properties

// Manual instantiation (typically done by Spark's metrics system)
val properties = new Properties()
properties.setProperty("host", "ganglia-server.example.com")
properties.setProperty("port", "8649")
properties.setProperty("period", "10")
properties.setProperty("unit", "seconds")
properties.setProperty("mode", "multicast")
properties.setProperty("ttl", "1")

val registry = new MetricRegistry()
val sink = new GangliaSink(properties, registry)

// Start reporting
sink.start()

// Manual report (usually automatic via polling)
sink.report()

// Stop reporting
sink.stop()
```

### Ganglia Reporter

Low-level Dropwizard Metrics reporter for direct Ganglia integration, supporting comprehensive metric types and flexible configuration.

```java { .api }
public class GangliaReporter extends ScheduledReporter {
    public static Builder forRegistry(MetricRegistry registry);

    // Inherited from ScheduledReporter
    public void start(long period, TimeUnit unit);
    public void stop();
    public void report();

    // Core reporting method
    public void report(
        SortedMap<String, Gauge> gauges,
        SortedMap<String, Counter> counters,
        SortedMap<String, Histogram> histograms,
        SortedMap<String, Meter> meters,
        SortedMap<String, Timer> timers
    );
}

public static class GangliaReporter.Builder {
    public Builder prefixedWith(String prefix);
    public Builder withTMax(int tMax);
    public Builder withDMax(int dMax);
    public Builder convertRatesTo(TimeUnit rateUnit);
    public Builder convertDurationsTo(TimeUnit durationUnit);
    public Builder filter(MetricFilter filter);
    public Builder scheduleOn(ScheduledExecutorService executor);
    public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop);
    public Builder disabledMetricAttributes(Set<MetricAttribute> disabledMetricAttributes);
    public GangliaReporter build(GMetric gmetric);
    public GangliaReporter build(GMetric... gmetrics);
}
```

**Usage Example:**

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import info.ganglia.gmetric4j.gmetric.GMetric;
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode;
import java.util.concurrent.TimeUnit;

// Create Ganglia client (the GMetric constructor may throw IOException)
GMetric ganglia = new GMetric(
    "ganglia-server.example.com", // host
    8649,                         // port
    UDPAddressingMode.MULTICAST,  // mode
    1                             // ttl
);

// Build reporter with configuration
MetricRegistry registry = new MetricRegistry();
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .prefixedWith("spark.app")
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .withDMax(0)
    .build(ganglia);

// Start scheduled reporting every 10 seconds
reporter.start(10, TimeUnit.SECONDS);

// Manual report
reporter.report();

// Stop reporting
reporter.stop();
```

## Types

### Network Configuration Types

```java { .api }
// From the info.ganglia.gmetric4j.gmetric package
enum UDPAddressingMode {
    MULTICAST,
    UNICAST
}

class GMetric {
    public GMetric(String host, int port, UDPAddressingMode mode, int ttl);
    public void announce(String name, String value, GMetricType type,
        String units, GMetricSlope slope, int tMax, int dMax, String group);
}

enum GMetricType {
    STRING, INT8, UINT8, INT16, UINT16, INT32, UINT32, FLOAT, DOUBLE
}

enum GMetricSlope {
    ZERO, POSITIVE, NEGATIVE, BOTH, UNSPECIFIED
}
```

### Metrics Types

```java { .api }
// From the com.codahale.metrics package
class MetricRegistry {
    // Standard Dropwizard Metrics registry
}

interface Gauge<T> {
    T getValue();
}

class Counter {
    long getCount();
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
}

class Histogram {
    void update(int value);
    void update(long value);
    long getCount();
    Snapshot getSnapshot();
}

class Meter {
    void mark();
    void mark(long n);
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
}

class Timer {
    void update(long duration, TimeUnit unit);
    Timer.Context time();
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getMeanRate();
    double getOneMinuteRate();
    Snapshot getSnapshot();
}
```

### Configuration Types

```scala { .api }
// Standard Java types used in configuration
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledExecutorService
import java.util.Set
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricAttribute
```

## Error Handling

The package includes comprehensive error handling:

**Configuration Validation:**
- Throws `Exception` if the required `host` property is not provided
- Throws `Exception` if the required `port` property is not provided
- Validates the polling period using `MetricsSystem.checkMinimalPollingPeriod`

**Runtime Error Handling:**
- `GangliaException` is caught and logged as a warning during metric reporting
- Graceful degradation when the Ganglia server is unavailable
- Automatic type detection with fallback to the STRING type for unknown objects

**Common Configuration Errors:**

```scala
// Missing required properties
val props = new Properties()
// Missing host/port will throw an Exception during GangliaSink construction

// Invalid property values
props.setProperty("ttl", "invalid")  // will cause a NumberFormatException
props.setProperty("mode", "INVALID") // will cause an IllegalArgumentException
```
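The two invalid-value failures above come from plain JDK parsing of the property strings (an integer parse for `ttl`, an enum `valueOf` for `mode`), so they can be reproduced without any Spark or Ganglia dependency. A minimal sketch, using a stand-in enum for `GMetric.UDPAddressingMode`:

```java
public class GangliaConfigErrors {
    // Stand-in for info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
    enum UDPAddressingMode { MULTICAST, UNICAST }

    public static void main(String[] args) {
        try {
            Integer.parseInt("invalid"); // ttl=invalid
        } catch (NumberFormatException e) {
            System.out.println("bad ttl: NumberFormatException");
        }
        try {
            UDPAddressingMode.valueOf("INVALID"); // mode=INVALID (after upper-casing)
        } catch (IllegalArgumentException e) {
            System.out.println("bad mode: IllegalArgumentException");
        }
    }
}
```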

## Advanced Configuration

### Multiple Ganglia Servers

```java
// Configure multiple Ganglia servers for high availability
GMetric ganglia1 = new GMetric("ganglia1.example.com", 8649, UDPAddressingMode.MULTICAST, 1);
GMetric ganglia2 = new GMetric("ganglia2.example.com", 8649, UDPAddressingMode.MULTICAST, 1);

GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .build(ganglia1, ganglia2);
```

### Custom Metric Filtering

```java
// Report only a subset of metrics
GangliaReporter reporter = GangliaReporter.forRegistry(registry)
    .filter(MetricFilter.contains("jvm")) // only metrics whose names contain "jvm"
    .disabledMetricAttributes(Set.of(MetricAttribute.P999, MetricAttribute.P99))
    .build(ganglia);
```

### Network Mode Configuration

```properties
# Multicast mode (default)
driver.sink.ganglia.mode=multicast
driver.sink.ganglia.ttl=1

# Unicast mode for complex network topologies
driver.sink.ganglia.mode=unicast
```
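Multicast mode sends metrics to a multicast group (Ganglia's conventional default group is 239.2.11.71), while unicast targets a specific gmond host. Whether a given `host` value is a multicast address can be sanity-checked with the JDK alone; the addresses below are illustrative:

```java
import java.net.InetAddress;

public class GangliaModeCheck {
    public static void main(String[] args) throws Exception {
        // 239.2.11.71 is the conventional Ganglia multicast group
        InetAddress mcast = InetAddress.getByName("239.2.11.71");
        // 192.0.2.10 is a documentation-range unicast address
        InetAddress ucast = InetAddress.getByName("192.0.2.10");

        System.out.println("239.2.11.71 multicast? " + mcast.isMulticastAddress()); // true
        System.out.println("192.0.2.10 multicast? " + ucast.isMulticastAddress());  // false
    }
}
```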

## Integration with Spark Metrics System

The GangliaSink automatically integrates with Spark's comprehensive metrics system, reporting:

- **Application Metrics**: Job execution times, task counts, stage durations
- **JVM Metrics**: Heap usage, GC times, thread counts
- **System Metrics**: CPU usage, disk I/O, network I/O
- **Custom Metrics**: User-registered metrics, for example via the plugin API's `PluginContext.metricRegistry`

All metrics are automatically formatted with appropriate Ganglia metadata including metric names, types, units, and groupings.
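As an illustration of the naming, Spark metrics typically arrive in Ganglia as dot-joined names such as `<appId>.driver.jvm.heap.used` (the exact prefix depends on `spark.metrics.namespace`). The joining behavior resembles Dropwizard's `MetricRegistry.name`, sketched here with a hypothetical app id:

```java
public class MetricNameSketch {
    // Mirrors the spirit of Dropwizard's MetricRegistry.name:
    // dot-join the non-null, non-empty parts
    static String name(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String p : parts) {
            if (p == null || p.isEmpty()) continue;
            if (sb.length() > 0) sb.append('.');
            sb.append(p);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical app id; the real prefix comes from spark.metrics.namespace
        System.out.println(name("app-20240101120000-0001", "driver", "jvm", "heap.used"));
        // app-20240101120000-0001.driver.jvm.heap.used
    }
}
```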