Shared variables for collecting information from distributed tasks, supporting aggregation patterns like counters, sums, and custom aggregations across cluster nodes.
Abstract base class for all accumulators, providing type-safe aggregation of values from distributed tasks.
/**
* Base class for accumulators that aggregate values across tasks
* @tparam IN input type (what gets added)
* @tparam OUT output type (what gets returned)
*/
abstract class AccumulatorV2[IN, OUT] extends Serializable {
/** Whether this accumulator is at its zero (initial) value */
def isZero: Boolean
/** Copy this accumulator */
def copy(): AccumulatorV2[IN, OUT]
/** Reset this accumulator to zero value */
def reset(): Unit
/** Add value to this accumulator */
def add(v: IN): Unit
/** Merge another accumulator into this one */
def merge(other: AccumulatorV2[IN, OUT]): Unit
/** Get current accumulated value */
def value: OUT
/** Optional name for this accumulator */
def name: Option[String]
/** Unique ID for this accumulator */
def id: Long
}

Pre-defined accumulator implementations for common aggregation patterns.
/**
* Accumulator for long values
*/
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
override def isZero: Boolean
override def copy(): LongAccumulator
override def reset(): Unit
override def add(v: java.lang.Long): Unit
def add(v: Long): Unit // Scala convenience overload (not an override)
override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
override def value: java.lang.Long
def count: Long
def sum: Long
def avg: Double
}
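To make the contract of the abstract methods concrete, here is a minimal driver-side sketch (illustrative only, no cluster required) of how Spark uses them: each task works on its own zeroed copy of the accumulator, calls add locally, and the driver merges the completed copies back together.
import org.apache.spark.util.LongAccumulator
val driverAcc = new LongAccumulator // lives on the driver
val taskCopy1 = driverAcc.copy() // per-task copies...
taskCopy1.reset() // ...start from the zero value
val taskCopy2 = driverAcc.copy()
taskCopy2.reset()
taskCopy1.add(3L) // tasks call add() locally
taskCopy2.add(4L)
driverAcc.merge(taskCopy1) // driver merges finished task copies
driverAcc.merge(taskCopy2)
assert(driverAcc.sum == 7L && driverAcc.count == 2)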
/**
* Accumulator for double values
*/
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
override def isZero: Boolean
override def copy(): DoubleAccumulator
override def reset(): Unit
override def add(v: java.lang.Double): Unit
def add(v: Double): Unit // Scala convenience overload (not an override)
override def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
override def value: java.lang.Double
def count: Long
def sum: Double
def avg: Double
}
/**
* Accumulator for collecting objects into a list
*/
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
override def isZero: Boolean
override def copy(): CollectionAccumulator[T]
override def reset(): Unit
override def add(v: T): Unit
override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
override def value: java.util.List[T]
}

Accumulators are created through SparkContext methods with optional names for monitoring.
class SparkContext(config: SparkConf) {
/** Create long accumulator */
def longAccumulator(): LongAccumulator
def longAccumulator(name: String): LongAccumulator
/** Create double accumulator */
def doubleAccumulator(): DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator
/** Create collection accumulator */
def collectionAccumulator[T](): CollectionAccumulator[T]
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
/** Register custom accumulator */
def register(acc: AccumulatorV2[_, _]): Unit
def register(acc: AccumulatorV2[_, _], name: String): Unit
}
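Named accumulators are displayed in the Spark web UI on the stage detail pages, which is the main reason to supply a name. A small sketch, assuming an existing SparkContext sc:
val bytesProcessed = sc.longAccumulator("Bytes Processed") // shows up by name in the UI
val anonymousCounter = sc.longAccumulator() // still works, but is not labeled in the UI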
// Java API
public class JavaSparkContext {
/** Create long accumulator */
public LongAccumulator longAccumulator()
public LongAccumulator longAccumulator(String name)
/** Create double accumulator */
public DoubleAccumulator doubleAccumulator()
public DoubleAccumulator doubleAccumulator(String name)
/** Create collection accumulator */
public <T> CollectionAccumulator<T> collectionAccumulator()
public <T> CollectionAccumulator<T> collectionAccumulator(String name)
}

Create custom accumulators for domain-specific aggregation patterns.
/**
* Example: Accumulator for collecting statistics
*/
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double)] {
private var _count: Long = 0L
private var _sum: Double = 0.0
private var _min: Double = Double.MaxValue
private var _max: Double = Double.MinValue
private var _sumSquares: Double = 0.0
override def isZero: Boolean = _count == 0
override def copy(): StatsAccumulator = {
val newAcc = new StatsAccumulator
newAcc._count = this._count
newAcc._sum = this._sum
newAcc._min = this._min
newAcc._max = this._max
newAcc._sumSquares = this._sumSquares
newAcc
}
override def reset(): Unit = {
_count = 0L
_sum = 0.0
_min = Double.MaxValue
_max = Double.MinValue
_sumSquares = 0.0
}
override def add(v: Double): Unit = {
_count += 1
_sum += v
_min = math.min(_min, v)
_max = math.max(_max, v)
_sumSquares += v * v
}
override def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double)]): Unit = {
other match {
case o: StatsAccumulator =>
if (o._count > 0) {
_count += o._count
_sum += o._sum
_min = math.min(_min, o._min)
_max = math.max(_max, o._max)
_sumSquares += o._sumSquares
}
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
}
/** Returns (count, sum, min, max, stddev) */
override def value: (Long, Double, Double, Double, Double) = {
if (_count == 0) {
(0L, 0.0, 0.0, 0.0, 0.0)
} else {
val mean = _sum / _count
// Guard against a tiny negative variance caused by floating-point rounding
val variance = math.max(0.0, (_sumSquares / _count) - (mean * mean))
(_count, _sum, _min, _max, math.sqrt(variance))
}
}
}
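A quick local sanity check (illustrative, not part of the API) that the merge logic above is consistent: merging partial accumulators yields the same statistics as adding every value to a single accumulator.
val whole = new StatsAccumulator
Seq(1.0, 2.0, 3.0, 4.0).foreach(whole.add)
val part1 = new StatsAccumulator
val part2 = new StatsAccumulator
Seq(1.0, 2.0).foreach(part1.add)
Seq(3.0, 4.0).foreach(part2.add)
part1.merge(part2) // combine the two partial results
assert(whole.value == part1.value) // same (count, sum, min, max, stddev)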
/**
* Example: Set accumulator for collecting unique values
*/
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
private val _set = scala.collection.mutable.Set.empty[T]
override def isZero: Boolean = _set.isEmpty
override def copy(): SetAccumulator[T] = {
val newAcc = new SetAccumulator[T]
newAcc._set ++= this._set
newAcc
}
override def reset(): Unit = _set.clear()
override def add(v: T): Unit = _set += v
override def merge(other: AccumulatorV2[T, Set[T]]): Unit = {
other match {
case o: SetAccumulator[T] => _set ++= o._set
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
}
override def value: Set[T] = _set.toSet
}

Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.JavaConverters._ // for errorMessages.value.asScala below
val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))
// Create built-in accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")
// Create RDD
val data = sc.parallelize(1 to 1000)
// Use accumulators in transformations
val processed = data.map { x =>
val startTime = System.currentTimeMillis()
try {
if (x % 100 == 0) {
throw new RuntimeException(s"Simulated error for $x")
}
// Simulate processing
Thread.sleep(1)
val result = x * 2
val endTime = System.currentTimeMillis()
processingTime.add(endTime - startTime)
result
} catch {
case e: Exception =>
errorCount.add(1)
errorMessages.add(s"Error processing $x: ${e.getMessage}")
0 // Default value
}
}
// Trigger computation
val results = processed.collect()
// Access accumulator values (only on driver)
println(s"Total errors: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / (1000 - errorCount.value)} ms")
println(s"Error messages: ${errorMessages.value.asScala.mkString(", ")}")
// Custom accumulator example
val statsAcc = new StatsAccumulator()
sc.register(statsAcc, "Data Statistics")
val numbers = sc.parallelize(Array(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2))
numbers.foreach(statsAcc.add)
val (count, sum, min, max, stddev) = statsAcc.value
println(s"Count: $count, Sum: $sum, Min: $min, Max: $max, StdDev: $stddev")
// Set accumulator for unique values
val uniqueValues = new SetAccumulator[Int]()
sc.register(uniqueValues, "Unique Values")
val duplicatedData = sc.parallelize(Array(1, 2, 3, 2, 1, 4, 3, 5))
duplicatedData.foreach(uniqueValues.add)
println(s"Unique values: ${uniqueValues.value}")
sc.stop()

Java Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.CollectionAccumulator;
import java.util.Arrays;
import java.util.List;
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("Java Accumulator Example")
);
// Create accumulators
LongAccumulator counter = sc.longAccumulator("Counter");
DoubleAccumulator sum = sc.doubleAccumulator("Sum");
CollectionAccumulator<String> logs = sc.collectionAccumulator("Logs");
// Create RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);
// Use accumulators
JavaRDD<Integer> processed = rdd.map(x -> {
counter.add(1);
sum.add(x.doubleValue());
logs.add("Processed: " + x);
return x * 2;
});
// Trigger action
List<Integer> results = processed.collect();
// Access values
System.out.println("Count: " + counter.value());
System.out.println("Sum: " + sum.value());
System.out.println("Logs: " + logs.value());
sc.close();

Best Practices:
// Good: Use accumulators in actions (guaranteed exactly-once)
val errorCount = sc.longAccumulator("Errors")
rdd.foreach { x =>
if (isInvalid(x)) errorCount.add(1)
}
// Caution: Accumulators in transformations may be called multiple times
val warningCount = sc.longAccumulator("Warnings")
val filtered = rdd.filter { x =>
if (isWarning(x)) {
warningCount.add(1) // May be incremented multiple times if RDD is recomputed
}
isValid(x)
}
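// One possible mitigation sketch (not the only option): keep accumulator updates
// out of transformations and do the counting inside an action instead, where
// updates are applied exactly once even if tasks are retried.
val reliableWarningCount = sc.longAccumulator("Warnings (reliable)")
rdd.foreach { x =>
if (isWarning(x)) reliableWarningCount.add(1)
}
val validOnly = rdd.filter(isValid) // the transformation no longer touches the accumulator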
// Good: Named accumulators for monitoring
val processedRecords = sc.longAccumulator("Processed Records")
val skippedRecords = sc.longAccumulator("Skipped Records")
// Good: Reset accumulators when reusing
errorCount.reset()

val errorAcc = sc.collectionAccumulator[String]("Errors")
val processed = rdd.map { record =>
try {
processRecord(record)
} catch {
case e: Exception =>
errorAcc.add(s"Error processing $record: ${e.getMessage}")
null // or default value
}
}.filter(_ != null)

Accumulators provide a powerful mechanism for aggregating information from distributed computations while remaining fault tolerant: updates made inside actions are applied exactly once even if tasks are retried, whereas updates made inside transformations may be applied more than once when stages are recomputed.