# Accumulators

Shared variables for collecting information from distributed tasks, supporting aggregation patterns such as counters, sums, and custom aggregations across cluster nodes.

## Capabilities

### AccumulatorV2 Base Class

Abstract base class for all accumulators, providing type-safe aggregation of values from distributed tasks.

```scala { .api }
/**
 * Base class for accumulators that aggregate values across tasks
 * @tparam IN  input type (what gets added)
 * @tparam OUT output type (what gets returned)
 */
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  /** Whether this accumulator holds the zero value */
  def isZero: Boolean

  /** Copy this accumulator */
  def copy(): AccumulatorV2[IN, OUT]

  /** Reset this accumulator to the zero value */
  def reset(): Unit

  /** Add a value to this accumulator */
  def add(v: IN): Unit

  /** Merge another accumulator of the same type into this one */
  def merge(other: AccumulatorV2[IN, OUT]): Unit

  /** Current accumulated value */
  def value: OUT

  /** Optional name for this accumulator */
  def name: Option[String]

  /** Unique ID for this accumulator */
  def id: Long
}
```

Note that `count`, `sum`, and `avg` are not defined on `AccumulatorV2` itself; they are provided by the numeric accumulators listed below.
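The contract above is easiest to see as a lifecycle: the driver holds the original accumulator, each task receives a reset copy, calls `add` locally, and the driver merges completed copies back in. The following standalone sketch mimics that flow without any Spark dependency; `SketchLongAcc` and `AccumulatorLifecycle` are illustrative names, not Spark classes.

```java
// Standalone sketch (no Spark dependency) of the AccumulatorV2 lifecycle:
// the driver keeps the original accumulator, each task works on a reset
// copy, and the driver merges finished copies back in.
import java.util.ArrayList;
import java.util.List;

class SketchLongAcc {
    private long sum = 0L;
    private long n = 0L;

    boolean isZero()   { return sum == 0L && n == 0L; }
    void reset()       { sum = 0L; n = 0L; }
    void add(long v)   { sum += v; n += 1; }
    long value()       { return sum; }
    long count()       { return n; }

    SketchLongAcc copy() {
        SketchLongAcc c = new SketchLongAcc();
        c.sum = sum;
        c.n = n;
        return c;
    }

    void merge(SketchLongAcc other) {
        sum += other.sum;
        n += other.n;
    }
}

public class AccumulatorLifecycle {
    public static void main(String[] args) {
        SketchLongAcc driverAcc = new SketchLongAcc();

        // Each "task" gets a copy reset to the zero value, then adds the
        // values of its partition.
        long[][] partitions = { {1, 2, 3}, {4, 5} };
        List<SketchLongAcc> taskResults = new ArrayList<>();
        for (long[] partition : partitions) {
            SketchLongAcc taskAcc = driverAcc.copy();
            taskAcc.reset();
            for (long v : partition) {
                taskAcc.add(v);
            }
            taskResults.add(taskAcc);
        }

        // The driver merges per-task results back into the original.
        for (SketchLongAcc taskAcc : taskResults) {
            driverAcc.merge(taskAcc);
        }

        System.out.println(driverAcc.value()); // prints 15
        System.out.println(driverAcc.count()); // prints 5
    }
}
```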

### Built-in Accumulator Types

Pre-defined accumulator implementations for common aggregation patterns.

```scala { .api }
/**
 * Accumulator for long values
 */
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  override def isZero: Boolean
  override def copy(): LongAccumulator
  override def reset(): Unit
  override def add(v: java.lang.Long): Unit
  def add(v: Long): Unit // Scala convenience method (not an override)
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
  override def value: java.lang.Long
  def count: Long
  def sum: Long
  def avg: Double
}

/**
 * Accumulator for double values
 */
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  override def isZero: Boolean
  override def copy(): DoubleAccumulator
  override def reset(): Unit
  override def add(v: java.lang.Double): Unit
  def add(v: Double): Unit // Scala convenience method (not an override)
  override def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
  override def value: java.lang.Double
  def count: Long
  def sum: Double
  def avg: Double
}

/**
 * Accumulator for collecting objects into a list
 */
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  override def isZero: Boolean
  override def copy(): CollectionAccumulator[T]
  override def reset(): Unit
  override def add(v: T): Unit
  override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  override def value: java.util.List[T]
}
```

### Creating Accumulators

Accumulators are created through SparkContext methods, with optional names for monitoring.

```scala { .api }
class SparkContext(config: SparkConf) {
  /** Create and register a long accumulator */
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator

  /** Create and register a double accumulator */
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator

  /** Create and register a collection accumulator */
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  /** Register a custom accumulator */
  def register(acc: AccumulatorV2[_, _]): Unit
  def register(acc: AccumulatorV2[_, _], name: String): Unit
}

// Java API: the accumulator factory methods live on the wrapped
// SparkContext, reachable from a JavaSparkContext via sc()
public class JavaSparkContext {
  /** The underlying SparkContext */
  public SparkContext sc()
}
// e.g. LongAccumulator counter = jsc.sc().longAccumulator("Counter");
```

### Custom Accumulators

Create custom accumulators for domain-specific aggregation patterns.

```scala { .api }
import scala.collection.mutable

/**
 * Example: accumulator for collecting summary statistics
 * value = (count, sum, min, max, stddev)
 */
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double)] {
  private var _count: Long = 0L
  private var _sum: Double = 0.0
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  private var _sumSquares: Double = 0.0

  override def isZero: Boolean = _count == 0

  override def copy(): StatsAccumulator = {
    val newAcc = new StatsAccumulator
    newAcc._count = this._count
    newAcc._sum = this._sum
    newAcc._min = this._min
    newAcc._max = this._max
    newAcc._sumSquares = this._sumSquares
    newAcc
  }

  override def reset(): Unit = {
    _count = 0L
    _sum = 0.0
    _min = Double.MaxValue
    _max = Double.MinValue
    _sumSquares = 0.0
  }

  override def add(v: Double): Unit = {
    _count += 1
    _sum += v
    _min = math.min(_min, v)
    _max = math.max(_max, v)
    _sumSquares += v * v
  }

  override def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double)]): Unit =
    other match {
      case o: StatsAccumulator =>
        if (o._count > 0) {
          _count += o._count
          _sum += o._sum
          _min = math.min(_min, o._min)
          _max = math.max(_max, o._max)
          _sumSquares += o._sumSquares
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
    }

  override def value: (Long, Double, Double, Double, Double) = {
    if (_count == 0) {
      (0L, 0.0, 0.0, 0.0, 0.0)
    } else {
      val mean = _sum / _count
      // Guard against tiny negative values from floating-point error
      val variance = math.max(0.0, (_sumSquares / _count) - (mean * mean))
      (_count, _sum, _min, _max, math.sqrt(variance))
    }
  }
}

/**
 * Example: set accumulator for collecting unique values
 */
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  private val _set = mutable.Set.empty[T]

  override def isZero: Boolean = _set.isEmpty

  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  override def reset(): Unit = _set.clear()

  override def add(v: T): Unit = _set += v

  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = other match {
    case o: SetAccumulator[T @unchecked] => _set ++= o._set
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Set[T] = _set.toSet
}
```
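The stddev produced by `StatsAccumulator` relies on the identity variance = E[x²] − (E[x])², which is what lets the statistics be accumulated in a single pass from `sum` and `sumSquares`. This standalone sketch (no Spark dependency; names illustrative) checks that formula against a direct two-pass computation:

```java
// Standalone check of the running-statistics formula used above:
// variance = E[x^2] - (E[x])^2, accumulated from sum and sumSquares.
public class StatsFormulaDemo {
    public static void main(String[] args) {
        double[] xs = { 1.0, 2.5, 3.7, 4.1, 5.9 };

        // One-pass accumulation, as StatsAccumulator.add does.
        long count = 0;
        double sum = 0.0, sumSquares = 0.0;
        for (double x : xs) {
            count++;
            sum += x;
            sumSquares += x * x;
        }

        double mean = sum / count;
        double variance = Math.max(0.0, sumSquares / count - mean * mean);
        double stddev = Math.sqrt(variance);

        // Direct two-pass population stddev for comparison.
        double direct = 0.0;
        for (double x : xs) {
            direct += (x - mean) * (x - mean);
        }
        direct = Math.sqrt(direct / count);

        System.out.println(Math.abs(stddev - direct) < 1e-9); // prints true
    }
}
```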

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Create built-in accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Create an RDD
val data = sc.parallelize(1 to 1000)

// Use accumulators in a transformation
val processed = data.map { x =>
  val startTime = System.currentTimeMillis()
  try {
    if (x % 100 == 0) {
      throw new RuntimeException(s"Simulated error for $x")
    }

    // Simulate processing
    Thread.sleep(1)
    val result = x * 2

    processingTime.add(System.currentTimeMillis() - startTime)
    result
  } catch {
    case e: Exception =>
      errorCount.add(1)
      errorMessages.add(s"Error processing $x: ${e.getMessage}")
      0 // default value
  }
}

// Trigger the computation
val results = processed.collect()

// Access accumulator values (reliable only on the driver)
println(s"Total errors: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / (1000 - errorCount.value)} ms")
println(s"Error messages: ${errorMessages.value.asScala.mkString(", ")}")

// Custom accumulator example
val statsAcc = new StatsAccumulator()
sc.register(statsAcc, "Data Statistics")

val numbers = sc.parallelize(Array(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2))
numbers.foreach(statsAcc.add)

val (count, sum, min, max, stddev) = statsAcc.value
println(s"Count: $count, Sum: $sum, Min: $min, Max: $max, StdDev: $stddev")

// Set accumulator for unique values
val uniqueValues = new SetAccumulator[Int]()
sc.register(uniqueValues, "Unique Values")

val duplicatedData = sc.parallelize(Array(1, 2, 3, 2, 1, 4, 3, 5))
duplicatedData.foreach(uniqueValues.add)

println(s"Unique values: ${uniqueValues.value}")

sc.stop()
```

**Java Examples:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.List;

JavaSparkContext jsc = new JavaSparkContext(
    new SparkConf().setAppName("Java Accumulator Example")
);

// Create accumulators via the underlying SparkContext
LongAccumulator counter = jsc.sc().longAccumulator("Counter");
DoubleAccumulator sum = jsc.sc().doubleAccumulator("Sum");
CollectionAccumulator<String> logs = jsc.sc().collectionAccumulator("Logs");

// Create an RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = jsc.parallelize(data);

// Use accumulators
JavaRDD<Integer> processed = rdd.map(x -> {
    counter.add(1);
    sum.add(x.doubleValue());
    logs.add("Processed: " + x);
    return x * 2;
});

// Trigger an action
List<Integer> results = processed.collect();

// Access values
System.out.println("Count: " + counter.value());
System.out.println("Sum: " + sum.value());
System.out.println("Logs: " + logs.value());

jsc.close();
```

## Best Practices

### When to Use Accumulators

- **Debugging**: Count errors, null values, or invalid records
- **Monitoring**: Track processing metrics and performance
- **Statistics**: Collect summary statistics during processing
- **Logging**: Gather diagnostic information from tasks

### Important Considerations

```scala
// Good: use accumulators in actions (updates are applied exactly once per task)
val errorCount = sc.longAccumulator("Errors")
rdd.foreach { x =>
  if (isInvalid(x)) errorCount.add(1)
}

// Caution: accumulator updates in transformations may be applied more than once
val warningCount = sc.longAccumulator("Warnings")
val filtered = rdd.filter { x =>
  if (isWarning(x)) {
    warningCount.add(1) // may be incremented again if the RDD is recomputed
  }
  isValid(x)
}
// Caching the RDD (filtered.cache()) reduces, but does not eliminate,
// the chance of recomputation.

// Good: named accumulators show up in the Spark UI
val processedRecords = sc.longAccumulator("Processed Records")
val skippedRecords = sc.longAccumulator("Skipped Records")

// Good: reset accumulators (on the driver) before reusing them
errorCount.reset()
```

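The recomputation caveat can be reproduced mechanically: if a partition's transformation code runs twice (a stage retry, or an uncached RDD recomputed by a later action), its accumulator updates are merged twice. A standalone sketch with no Spark dependency and illustrative names:

```java
// Standalone sketch of why transformation-side updates can double count:
// when a partition is recomputed, its accumulator updates are merged again.
public class DoubleCountDemo {
    // Simulates one task computing a partition while updating a
    // task-local accumulator copy ("warnings").
    static long runPartition(int[] partition) {
        long warnings = 0;
        for (int v : partition) {
            if (v < 0) warnings++;   // stand-in for isWarning(x)
        }
        return warnings;
    }

    public static void main(String[] args) {
        int[] partition = { 1, -2, 3, -4 };

        long total = 0;
        total += runPartition(partition); // first computation
        total += runPartition(partition); // recomputation (e.g. stage retry)

        // The partition holds 2 warnings, but the merged total is 4.
        System.out.println(total); // prints 4
    }
}
```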

### Performance Tips

- Accumulator updates add little per-record overhead; values are merged once per task
- Avoid accumulating large collections; sample or cap what you keep
- Use the accumulator type that matches your data
- Name accumulators so they are easy to find in the Spark UI

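One way to follow the "avoid large collections" tip is to cap what a custom collection-style accumulator retains, keeping a bounded sample plus an overflow count. A standalone sketch; `BoundedListAcc` and `MAX_KEPT` are illustrative names, not Spark APIs:

```java
// Standalone sketch: keep only the first MAX_KEPT samples and count the
// overflow, so a hot path cannot ship an unbounded list to the driver.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class BoundedListAcc {
    static final int MAX_KEPT = 100;           // sample cap per accumulator
    private final List<String> kept = new ArrayList<>();
    private long dropped = 0L;                 // values seen but not kept

    void add(String v) {
        if (kept.size() < MAX_KEPT) {
            kept.add(v);
        } else {
            dropped++;
        }
    }

    void merge(BoundedListAcc other) {
        for (String v : other.kept) {
            add(v);                            // re-applies the cap
        }
        dropped += other.dropped;
    }

    List<String> value()  { return Collections.unmodifiableList(kept); }
    long droppedCount()   { return dropped; }
}

public class BoundedAccumulatorDemo {
    public static void main(String[] args) {
        BoundedListAcc acc = new BoundedListAcc();
        for (int i = 0; i < 250; i++) {
            acc.add("error-" + i);
        }
        System.out.println(acc.value().size());  // prints 100
        System.out.println(acc.droppedCount());  // prints 150
    }
}
```

The `dropped` counter preserves at least the magnitude of what was discarded, which is usually the information that matters once the sample is full.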
### Error Handling

```scala
val errorAcc = sc.collectionAccumulator[String]("Errors")

val processed = rdd.map { record =>
  try {
    processRecord(record)
  } catch {
    case e: Exception =>
      errorAcc.add(s"Error processing $record: ${e.getMessage}")
      null // or a default value
  }
}.filter(_ != null)
```

Accumulators provide a powerful mechanism for aggregating information from distributed computations while remaining fault tolerant: for updates performed inside actions, Spark applies each task's updates exactly once, even when tasks are re-executed.