Apache Spark Core provides the foundational execution engine and API for distributed data processing with RDDs, task scheduling, and cluster management.
Shared variables for collecting information from distributed tasks, supporting aggregation patterns like counters, sums, and custom aggregations across cluster nodes.
Abstract base class for all accumulators providing type-safe aggregation of values from distributed tasks.
/**
 * Base class for accumulators, which aggregate values of type `IN` added by
 * distributed tasks into a single result of type `OUT` read on the driver.
 *
 * Extends `Serializable` so instances can be sent to tasks, updated there via
 * `add`, and combined back on the driver via `merge`.
 *
 * @tparam IN  input type (what gets added)
 * @tparam OUT output type (what gets returned)
 */
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  /** Whether this accumulator is zero value */
  def isZero: Boolean
  /** Copy this accumulator */
  def copy(): AccumulatorV2[IN, OUT]
  /** Reset this accumulator to zero value */
  def reset(): Unit
  /** Add value to this accumulator */
  def add(v: IN): Unit
  /** Merge another accumulator into this one */
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  /** Get current accumulated value (read on the driver) */
  def value: OUT
  /** Optional name for this accumulator */
  def name: Option[String]
  /** Unique ID for this accumulator */
  def id: Long
  /** Count of how many times add has been called */
  def count: Long
  /** Average of values added (meaningful for numeric accumulators) */
  def avg: Double
  /** Sum of values (for numeric accumulators) */
  def sum: OUT
}
Pre-defined accumulator implementations for common aggregation patterns.
/**
 * Accumulator for long values.
 *
 * Tracks a running sum and a count of added values so that `avg` can be
 * derived on the driver.
 */
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
/** True when no values have been added yet */
override def isZero: Boolean
/** Independent copy with the same sum and count */
override def copy(): LongAccumulator
/** Reset sum and count back to zero */
override def reset(): Unit
/** Add a boxed long (Java-friendly overload) */
override def add(v: java.lang.Long): Unit
override def add(v: Long): Unit // Scala convenience method
/** Fold another long accumulator's state into this one */
override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
/** Current sum (same as `sum`; read on the driver) */
override def value: java.lang.Long
/** Number of values added */
override def count: Long
/** Sum of all added values */
override def sum: java.lang.Long
/** Mean of added values */
override def avg: Double
}
/**
 * Accumulator for double values.
 *
 * Tracks a running sum and a count of added values so that `avg` can be
 * derived on the driver.
 */
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
/** True when no values have been added yet */
override def isZero: Boolean
/** Independent copy with the same sum and count */
override def copy(): DoubleAccumulator
/** Reset sum and count back to zero */
override def reset(): Unit
/** Add a boxed double (Java-friendly overload) */
override def add(v: java.lang.Double): Unit
override def add(v: Double): Unit // Scala convenience method
/** Fold another double accumulator's state into this one */
override def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
/** Current sum (same as `sum`; read on the driver) */
override def value: java.lang.Double
/** Number of values added */
override def count: Long
/** Sum of all added values */
override def sum: java.lang.Double
/** Mean of added values */
override def avg: Double
}
/**
 * Accumulator for collecting objects into a list.
 *
 * Note: the resulting list grows with the number of added elements, so it is
 * best suited to bounded data such as samples or error messages.
 */
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  /** True when no elements have been collected yet */
  override def isZero: Boolean
  /** Independent copy holding the same collected elements */
  override def copy(): CollectionAccumulator[T]
  /** Discard all collected elements */
  override def reset(): Unit
  /** Append a single element */
  override def add(v: T): Unit
  /** Append every element collected by `other` */
  override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  /** Collected elements as a java.util.List (read on the driver) */
  override def value: java.util.List[T]
}
Accumulators are created through SparkContext methods with optional names for monitoring.
/**
 * Accumulator-related subset of the SparkContext API (driver-side factory
 * and registration methods). Named variants make the accumulator visible
 * under that name for monitoring.
 */
class SparkContext(config: SparkConf) {
/** Create long accumulator */
def longAccumulator(): LongAccumulator
/** Create a named long accumulator */
def longAccumulator(name: String): LongAccumulator
/** Create double accumulator */
def doubleAccumulator(): DoubleAccumulator
/** Create a named double accumulator */
def doubleAccumulator(name: String): DoubleAccumulator
/** Create collection accumulator */
def collectionAccumulator[T](): CollectionAccumulator[T]
/** Create a named collection accumulator */
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
/** Register custom accumulator */
def register[T](accumulator: AccumulatorV2[T, T]): Unit
/** Register a custom accumulator under a name for monitoring */
def register[T](accumulator: AccumulatorV2[T, T], name: String): Unit
}
// Java API: mirrors the Scala SparkContext accumulator factory methods.
public class JavaSparkContext {
  /** Create long accumulator (named variant appears in monitoring) */
  public LongAccumulator longAccumulator()
  public LongAccumulator longAccumulator(String name)
  /** Create double accumulator */
  public DoubleAccumulator doubleAccumulator()
  public DoubleAccumulator doubleAccumulator(String name)
  /** Create collection accumulator */
  public <T> CollectionAccumulator<T> collectionAccumulator()
  public <T> CollectionAccumulator<T> collectionAccumulator(String name)
}
Create custom accumulators for domain-specific aggregation patterns.
/**
 * Example: custom accumulator collecting summary statistics of Double values.
 *
 * `value` returns (count, sum, min, max, population standard deviation).
 * All partial aggregates merge associatively, so results are independent of
 * task ordering.
 */
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double)] {
  // Running aggregates updated by add() and combined by merge().
  private var _count: Long = 0L
  private var _sum: Double = 0.0
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  private var _sumSquares: Double = 0.0

  /** Zero when nothing has been added. */
  override def isZero: Boolean = _count == 0

  /** Independent copy of the current state. */
  override def copy(): StatsAccumulator = {
    val newAcc = new StatsAccumulator
    newAcc._count = this._count
    newAcc._sum = this._sum
    newAcc._min = this._min
    newAcc._max = this._max
    newAcc._sumSquares = this._sumSquares
    newAcc
  }

  /** Restore the initial (zero) state. */
  override def reset(): Unit = {
    _count = 0L
    _sum = 0.0
    _min = Double.MaxValue
    _max = Double.MinValue
    _sumSquares = 0.0
  }

  /** Fold a single observation into the running aggregates. */
  override def add(v: Double): Unit = {
    _count += 1
    _sum += v
    _min = math.min(_min, v)
    _max = math.max(_max, v)
    _sumSquares += v * v
  }

  /** Combine another task's partial aggregates into this accumulator. */
  override def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double)]): Unit = {
    other match {
      case o: StatsAccumulator =>
        // Skip empty partials so min/max sentinels are not folded in.
        if (o._count > 0) {
          _count += o._count
          _sum += o._sum
          _min = math.min(_min, o._min)
          _max = math.max(_max, o._max)
          _sumSquares += o._sumSquares
        }
      case _ =>
        // Fail fast with a clear message instead of an opaque MatchError.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  /** (count, sum, min, max, stddev); all zeros when nothing was added. */
  override def value: (Long, Double, Double, Double, Double) = {
    if (_count == 0) {
      (0L, 0.0, 0.0, 0.0, 0.0)
    } else {
      val mean = _sum / _count
      // Clamp at 0: floating-point cancellation can make this marginally
      // negative, which would turn math.sqrt into NaN.
      val variance = math.max(0.0, (_sumSquares / _count) - (mean * mean))
      (_count, _sum, _min, _max, math.sqrt(variance))
    }
  }
}
/**
 * Example: accumulator collecting the distinct values added across tasks.
 */
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  // Mutable working set; exposed to callers only as an immutable snapshot.
  private val _set = mutable.Set.empty[T]

  /** Zero when no values have been added. */
  override def isZero: Boolean = _set.isEmpty

  /** Independent copy holding the same elements. */
  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  /** Discard all collected values. */
  override def reset(): Unit = _set.clear()

  /** Record one value (duplicates are absorbed by the set). */
  override def add(v: T): Unit = _set += v

  /** Union another set accumulator's values into this one. */
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = {
    other match {
      // NOTE: T is erased at runtime, so only the accumulator class is checked.
      case o: SetAccumulator[T] => _set ++= o._set
      case _ =>
        // Fail fast with a clear message instead of an opaque MatchError.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  /** Immutable snapshot of the distinct values seen so far. */
  override def value: Set[T] = _set.toSet
}
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
// Needed for .asScala on the java.util.List returned by collectionAccumulator
import scala.jdk.CollectionConverters._

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Create built-in accumulators (named so they can be monitored)
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Create RDD
val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations.
// NOTE: updates inside transformations may be applied more than once if the
// stage is recomputed; only actions guarantee exactly-once semantics.
val processed = data.map { x =>
  val startTime = System.currentTimeMillis()
  try {
    if (x % 100 == 0) {
      throw new RuntimeException(s"Simulated error for $x")
    }
    // Simulate processing
    Thread.sleep(1)
    val result = x * 2
    val endTime = System.currentTimeMillis()
    processingTime.add(endTime - startTime)
    result
  } catch {
    case e: Exception =>
      errorCount.add(1)
      errorMessages.add(s"Error processing $x: ${e.getMessage}")
      0 // Default value
  }
}

// Trigger computation
val results = processed.collect()

// Access accumulator values (only on driver)
println(s"Total errors: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / (1000 - errorCount.value)} ms")
println(s"Error messages: ${errorMessages.value.asScala.mkString(", ")}")

// Custom accumulator example: register it (with a name) before using in tasks
val statsAcc = new StatsAccumulator()
sc.register(statsAcc, "Data Statistics")
val numbers = sc.parallelize(Array(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2))
numbers.foreach(statsAcc.add)
val (count, sum, min, max, stddev) = statsAcc.value
println(s"Count: $count, Sum: $sum, Min: $min, Max: $max, StdDev: $stddev")

// Set accumulator for unique values
val uniqueValues = new SetAccumulator[Int]()
sc.register(uniqueValues, "Unique Values")
val duplicatedData = sc.parallelize(Array(1, 2, 3, 2, 1, 4, 3, 5))
duplicatedData.foreach(uniqueValues.add)
println(s"Unique values: ${uniqueValues.value}")
sc.stop()
Java Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.CollectionAccumulator;
import java.util.Arrays;
import java.util.List;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Accumulator Example")
);

// Create accumulators (named so they can be monitored)
LongAccumulator counter = sc.longAccumulator("Counter");
DoubleAccumulator sum = sc.doubleAccumulator("Sum");
CollectionAccumulator<String> logs = sc.collectionAccumulator("Logs");

// Create RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);

// Use accumulators inside a transformation.
// NOTE: updates made in transformations may be applied more than once if the
// stage is recomputed; only actions guarantee exactly-once semantics.
JavaRDD<Integer> processed = rdd.map(x -> {
    counter.add(1);
    sum.add(x.doubleValue());
    logs.add("Processed: " + x);
    return x * 2;
});

// Trigger action
List<Integer> results = processed.collect();

// Access values (driver side only)
System.out.println("Count: " + counter.value());
System.out.println("Sum: " + sum.value());
System.out.println("Logs: " + logs.value());
sc.close();
// Good: Use accumulators in actions (guaranteed exactly-once)
val errorCount = sc.longAccumulator("Errors")
rdd.foreach { x =>
  if (isInvalid(x)) errorCount.add(1)
}

// Caution: Accumulators in transformations may be called multiple times
val warningCount = sc.longAccumulator("Warnings")
val filtered = rdd.filter { x =>
  if (isWarning(x)) {
    warningCount.add(1) // May be incremented multiple times if RDD is recomputed
  }
  isValid(x)
}

// Good: Named accumulators for monitoring
val processedRecords = sc.longAccumulator("Processed Records")
val skippedRecords = sc.longAccumulator("Skipped Records")

// Good: Reset accumulators when reusing
errorCount.reset()
val errorAcc = sc.collectionAccumulator[String]("Errors")
// Pattern: collect per-record failures into an accumulator while keeping the
// pipeline flowing; failed records become null and are filtered out.
val processed = rdd.map { record =>
  try {
    processRecord(record)
  } catch {
    case e: Exception =>
      errorAcc.add(s"Error processing $record: ${e.getMessage}")
      null // or default value
  }
}.filter(_ != null)
Accumulators provide a powerful mechanism for aggregating information from distributed computations while maintaining fault tolerance and exactly-once semantics for actions.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-13