# Accumulators

Shared variables for collecting information from distributed tasks, supporting aggregation patterns like counters, sums, and custom aggregations across cluster nodes.

## Capabilities

### AccumulatorV2 Base Class

Abstract base class for all accumulators providing type-safe aggregation of values from distributed tasks.

```scala { .api }
/**
 * Base class for accumulators that aggregate values across tasks
 * @tparam IN input type (what gets added)
 * @tparam OUT output type (what gets returned)
 */
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  /** Whether this accumulator is zero value */
  def isZero: Boolean

  /** Copy this accumulator */
  def copy(): AccumulatorV2[IN, OUT]

  /** Reset this accumulator to zero value */
  def reset(): Unit

  /** Add value to this accumulator */
  def add(v: IN): Unit

  /** Merge another accumulator into this one */
  def merge(other: AccumulatorV2[IN, OUT]): Unit

  /** Get current accumulated value */
  def value: OUT

  /** Optional name for this accumulator */
  def name: Option[String]

  /** Unique ID for this accumulator */
  def id: Long
}

// Note: count, sum, and avg are defined on the numeric subclasses
// (LongAccumulator, DoubleAccumulator), not on the base class.
```
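
The copy/reset/merge trio exists because Spark never aggregates into the driver's instance directly: each task works on a reset copy, and the driver merges the task copies back at the end. A standalone sketch of that lifecycle in plain Scala (the `MiniAccumulator` trait and `MiniLongAccumulator` class below are illustrative stand-ins, not Spark classes):

```scala
// Standalone sketch of the AccumulatorV2 contract (illustrative, not Spark's classes).
trait MiniAccumulator[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): MiniAccumulator[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: MiniAccumulator[IN, OUT]): Unit
  def value: OUT
}

class MiniLongAccumulator extends MiniAccumulator[Long, Long] {
  private var _sum = 0L
  def isZero: Boolean = _sum == 0L
  def copy(): MiniLongAccumulator = { val a = new MiniLongAccumulator; a._sum = _sum; a }
  def reset(): Unit = _sum = 0L
  def add(v: Long): Unit = _sum += v
  def merge(other: MiniAccumulator[Long, Long]): Unit = other match {
    case o: MiniLongAccumulator => _sum += o._sum
    case _ => throw new IllegalArgumentException("unexpected accumulator type")
  }
  def value: Long = _sum
}

// Simulate the driver/task lifecycle: each "task" gets a reset copy,
// accumulates locally, and the driver merges the copies back.
val driverAcc = new MiniLongAccumulator
val partitions = Seq(Seq(1L, 2L), Seq(3L), Seq(4L, 5L))
val taskCopies = partitions.map { part =>
  val taskAcc = driverAcc.copy(); taskAcc.reset() // what Spark's copyAndReset() does
  part.foreach(taskAcc.add)
  taskAcc
}
taskCopies.foreach(driverAcc.merge)
println(driverAcc.value) // 15
```

Spark's `copyAndReset()` combines the first two steps; custom accumulators can override it to avoid copying state that is about to be discarded.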

### Built-in Accumulator Types

Pre-defined accumulator implementations for common aggregation patterns.

```scala { .api }
/**
 * Accumulator for long values
 */
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  override def isZero: Boolean
  override def copy(): LongAccumulator
  override def reset(): Unit
  override def add(v: java.lang.Long): Unit
  def add(v: Long): Unit // Scala convenience method (not an override)
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
  override def value: java.lang.Long
  def count: Long
  def sum: Long
  def avg: Double
}

/**
 * Accumulator for double values
 */
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  override def isZero: Boolean
  override def copy(): DoubleAccumulator
  override def reset(): Unit
  override def add(v: java.lang.Double): Unit
  def add(v: Double): Unit // Scala convenience method (not an override)
  override def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
  override def value: java.lang.Double
  def count: Long
  def sum: Double
  def avg: Double
}

/**
 * Accumulator for collecting objects into a list
 */
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  override def isZero: Boolean
  override def copy(): CollectionAccumulator[T]
  override def reset(): Unit
  override def add(v: T): Unit
  override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  override def value: java.util.List[T]
}
```

### Creating Accumulators

Accumulators are created through SparkContext methods with optional names for monitoring.

```scala { .api }
class SparkContext(config: SparkConf) {
  /** Create long accumulator */
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator

  /** Create double accumulator */
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator

  /** Create collection accumulator */
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  /** Register custom accumulator */
  def register(acc: AccumulatorV2[_, _]): Unit
  def register(acc: AccumulatorV2[_, _], name: String): Unit
}

// Java API
public class JavaSparkContext {
  /** Create long accumulator */
  public LongAccumulator longAccumulator()
  public LongAccumulator longAccumulator(String name)

  /** Create double accumulator */
  public DoubleAccumulator doubleAccumulator()
  public DoubleAccumulator doubleAccumulator(String name)

  /** Create collection accumulator */
  public <T> CollectionAccumulator<T> collectionAccumulator()
  public <T> CollectionAccumulator<T> collectionAccumulator(String name)
}
```

### Custom Accumulators

Create custom accumulators for domain-specific aggregation patterns.

```scala { .api }
import scala.collection.mutable

/**
 * Example: Accumulator for collecting statistics.
 * value returns (count, sum, min, max, stddev).
 */
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double)] {
  private var _count: Long = 0L
  private var _sum: Double = 0.0
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  private var _sumSquares: Double = 0.0

  override def isZero: Boolean = _count == 0

  override def copy(): StatsAccumulator = {
    val newAcc = new StatsAccumulator
    newAcc._count = this._count
    newAcc._sum = this._sum
    newAcc._min = this._min
    newAcc._max = this._max
    newAcc._sumSquares = this._sumSquares
    newAcc
  }

  override def reset(): Unit = {
    _count = 0L
    _sum = 0.0
    _min = Double.MaxValue
    _max = Double.MinValue
    _sumSquares = 0.0
  }

  override def add(v: Double): Unit = {
    _count += 1
    _sum += v
    _min = math.min(_min, v)
    _max = math.max(_max, v)
    _sumSquares += v * v
  }

  override def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double)]): Unit =
    other match {
      case o: StatsAccumulator =>
        if (o._count > 0) {
          _count += o._count
          _sum += o._sum
          _min = math.min(_min, o._min)
          _max = math.max(_max, o._max)
          _sumSquares += o._sumSquares
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  override def value: (Long, Double, Double, Double, Double) = {
    if (_count == 0) {
      (0L, 0.0, 0.0, 0.0, 0.0)
    } else {
      val mean = _sum / _count
      val variance = (_sumSquares / _count) - (mean * mean)
      (_count, _sum, _min, _max, math.sqrt(variance))
    }
  }
}

/**
 * Example: Set accumulator for collecting unique values
 */
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  private val _set = mutable.Set.empty[T]

  override def isZero: Boolean = _set.isEmpty

  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  override def reset(): Unit = _set.clear()

  override def add(v: T): Unit = _set += v

  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => _set ++= o._set
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Set[T] = _set.toSet
}
```
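
The reason StatsAccumulator carries `_sumSquares` rather than a running stddev is that the triple (count, sum, sumSquares) merges associatively across partitions, while stddev itself does not; the final value is recovered via Var(x) = E[x²] − E[x]². A plain-Scala check of that identity, independent of Spark (the helper names are illustrative):

```scala
// (count, sum, sumSquares) per "partition" is mergeable by component-wise addition.
def partial(xs: Seq[Double]): (Long, Double, Double) =
  (xs.size.toLong, xs.sum, xs.map(x => x * x).sum)

def mergeParts(a: (Long, Double, Double), b: (Long, Double, Double)): (Long, Double, Double) =
  (a._1 + b._1, a._2 + b._2, a._3 + b._3)

// stddev recovered at the end, exactly as StatsAccumulator.value does it.
def stddev(p: (Long, Double, Double)): Double = {
  val (n, s, ss) = p
  val mean = s / n
  math.sqrt(ss / n - mean * mean)
}

val data = Seq(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2)
val (left, right) = data.splitAt(3) // pretend these are two partitions
val merged = mergeParts(partial(left), partial(right))
val direct = stddev(partial(data))

// Merging the partials gives the same stddev as computing over all the data.
assert(math.abs(stddev(merged) - direct) < 1e-9)
```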

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._ // for errorMessages.value.asScala

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Create built-in accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Create RDD
val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations
val processed = data.map { x =>
  val startTime = System.currentTimeMillis()

  try {
    if (x % 100 == 0) {
      throw new RuntimeException(s"Simulated error for $x")
    }

    // Simulate processing
    Thread.sleep(1)
    val result = x * 2

    val endTime = System.currentTimeMillis()
    processingTime.add(endTime - startTime)

    result
  } catch {
    case e: Exception =>
      errorCount.add(1)
      errorMessages.add(s"Error processing $x: ${e.getMessage}")
      0 // Default value
  }
}

// Trigger computation
val results = processed.collect()

// Access accumulator values (only on the driver)
println(s"Total errors: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / (1000 - errorCount.value)} ms")
println(s"Error messages: ${errorMessages.value.asScala.mkString(", ")}")

// Custom accumulator example
val statsAcc = new StatsAccumulator()
sc.register(statsAcc, "Data Statistics")

val numbers = sc.parallelize(Array(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2))
numbers.foreach(statsAcc.add)

val (count, sum, min, max, stddev) = statsAcc.value
println(s"Count: $count, Sum: $sum, Min: $min, Max: $max, StdDev: $stddev")

// Set accumulator for unique values
val uniqueValues = new SetAccumulator[Int]()
sc.register(uniqueValues, "Unique Values")

val duplicatedData = sc.parallelize(Array(1, 2, 3, 2, 1, 4, 3, 5))
duplicatedData.foreach(uniqueValues.add)

println(s"Unique values: ${uniqueValues.value}")

sc.stop()
```

**Java Examples:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.CollectionAccumulator;

import java.util.Arrays;
import java.util.List;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Accumulator Example")
);

// Create accumulators
LongAccumulator counter = sc.longAccumulator("Counter");
DoubleAccumulator sum = sc.doubleAccumulator("Sum");
CollectionAccumulator<String> logs = sc.collectionAccumulator("Logs");

// Create RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);

// Use accumulators
JavaRDD<Integer> processed = rdd.map(x -> {
    counter.add(1);
    sum.add(x.doubleValue());
    logs.add("Processed: " + x);
    return x * 2;
});

// Trigger action
List<Integer> results = processed.collect();

// Access values
System.out.println("Count: " + counter.value());
System.out.println("Sum: " + sum.value());
System.out.println("Logs: " + logs.value());

sc.close();
```

## Best Practices

### When to Use Accumulators

- **Debugging**: Count errors, null values, or invalid records
- **Monitoring**: Track processing metrics and performance
- **Statistics**: Collect summary statistics during processing
- **Logging**: Gather diagnostic information from tasks

### Important Considerations

```scala
// Good: Use accumulators in actions (guaranteed exactly-once)
val errorCount = sc.longAccumulator("Errors")
rdd.foreach { x =>
  if (isInvalid(x)) errorCount.add(1)
}

// Caution: Accumulators in transformations may be called multiple times
val warningCount = sc.longAccumulator("Warnings")
val filtered = rdd.filter { x =>
  if (isWarning(x)) {
    warningCount.add(1) // May be incremented multiple times if RDD is recomputed
  }
  isValid(x)
}

// Good: Named accumulators for monitoring
val processedRecords = sc.longAccumulator("Processed Records")
val skippedRecords = sc.longAccumulator("Skipped Records")

// Good: Reset accumulators when reusing
errorCount.reset()
```
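
The recomputation hazard above can be seen without a cluster: replaying a side-effecting closure over the same data applies the side effect again. A plain-Scala simulation (a mutable `var` stands in for a LongAccumulator; no Spark involved):

```scala
// Simulate an accumulator update inside a transformation being replayed,
// e.g., after a task failure or a second action on an uncached RDD.
var warningCount = 0L
val data = Seq(1, 2, 3, 4, 5, 6)

def filterPass(): Seq[Int] = data.filter { x =>
  if (x % 2 == 0) warningCount += 1 // side effect inside the "transformation"
  x % 2 == 0
}

filterPass()          // first evaluation: 3 warnings (for 2, 4, 6)
filterPass()          // recomputation replays the closure: 3 more
println(warningCount) // 6, not 3 -- double-counted
```

Caching the RDD, or moving the update into an action such as `foreach`, reduces this double counting.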

### Performance Tips

- Accumulators have minimal performance overhead
- Avoid accumulating large collections; consider sampling
- Use appropriate accumulator types for your data
- Name accumulators for easier monitoring in the Spark UI
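
One way to act on "avoid accumulating large collections" is to sample before adding, so the accumulator holds a bounded subset instead of every record. A plain-Scala sketch (`logAcc` is a local buffer standing in for a CollectionAccumulator, and `sampleRate` is an illustrative choice):

```scala
import scala.collection.mutable
import scala.util.Random

val rng = new Random(42) // fixed seed so the sketch is deterministic
val sampleRate = 0.01    // keep roughly 1% of messages
val logAcc = mutable.Buffer.empty[String]

// Instead of adding every record's message, add only a sampled fraction.
(1 to 10000).foreach { i =>
  if (rng.nextDouble() < sampleRate) logAcc += s"record $i looked suspicious"
}

println(logAcc.size) // on the order of 100, not 10000
```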

### Error Handling

```scala
val errorAcc = sc.collectionAccumulator[String]("Errors")

val processed = rdd.map { record =>
  try {
    processRecord(record)
  } catch {
    case e: Exception =>
      errorAcc.add(s"Error processing $record: ${e.getMessage}")
      null // or default value
  }
}.filter(_ != null)
```

Accumulators provide a powerful mechanism for aggregating information from distributed computations. For updates made inside actions, Spark applies each task's contribution exactly once, even if the task is retried; updates made inside transformations carry no such guarantee.