or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

accumulators.md, application-context.md, broadcast-variables.md, index.md, java-api.md, partitioning.md, rdd-operations.md, serialization.md, storage-persistence.md
tile.json

docs/accumulators.md

Accumulators

Shared variables for collecting information from distributed tasks, supporting aggregation patterns like counters, sums, and custom aggregations across cluster nodes.

Capabilities

AccumulatorV2 Base Class

Abstract base class for all accumulators providing type-safe aggregation of values from distributed tasks.

/**
 * Base class for accumulators that aggregate values across tasks.
 *
 * Values of type `IN` are fed in through `add`, partial results are combined
 * through `merge`, and the aggregate is read back as an `OUT` through `value`.
 *
 * @tparam IN input type (what gets added)
 * @tparam OUT output type (what gets returned)
 */
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  /** Whether this accumulator is zero value */
  def isZero: Boolean
  
  /** Copy this accumulator */
  def copy(): AccumulatorV2[IN, OUT]
  
  /** Reset this accumulator to zero value */
  def reset(): Unit
  
  /** Add value to this accumulator */
  def add(v: IN): Unit
  
  /** Merge another accumulator into this one */
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  
  /** Get current accumulated value */
  def value: OUT
  
  /** Optional name for this accumulator */
  def name: Option[String]
  
  /** Unique ID for this accumulator */
  def id: Long
  
  // NOTE(review): count, avg and sum are listed on the base class here, yet the
  // CollectionAccumulator listing and the custom accumulator examples in this
  // document do not define them. They appear to belong to the numeric
  // accumulators (LongAccumulator / DoubleAccumulator) only -- confirm against
  // the Spark API documentation.

  /** Count of how many times add has been called */
  def count: Long
  
  /** Average of values added */
  def avg: Double
  
  /** Sum of values (for numeric accumulators) */
  def sum: OUT
}

Built-in Accumulator Types

Pre-defined accumulator implementations for common aggregation patterns.

/**
 * Accumulator for long values.
 *
 * Signature listing only (method bodies omitted). `add` is available both for
 * boxed `java.lang.Long` values (the `AccumulatorV2` contract) and for
 * primitive `Long` values.
 */
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  /** Whether no value has been added yet */
  override def isZero: Boolean
  /** Copy this accumulator, including its current state */
  override def copy(): LongAccumulator
  /** Reset this accumulator to its zero value */
  override def reset(): Unit
  /** Add a boxed long value */
  override def add(v: java.lang.Long): Unit
  /**
   * Add a primitive long value (Scala convenience method).
   * This is an additional overload, not an override: the inherited `add`
   * takes `java.lang.Long`, so marking this one `override` would not compile.
   */
  def add(v: Long): Unit
  /** Merge another long accumulator into this one */
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
  /** Current accumulated value */
  override def value: java.lang.Long
  /** Number of values added */
  def count: Long
  /** Sum of the values added */
  def sum: java.lang.Long
  /** Average of the values added */
  def avg: Double
}

/**
 * Accumulator for double values.
 *
 * Signature listing only (method bodies omitted). `add` is available both for
 * boxed `java.lang.Double` values (the `AccumulatorV2` contract) and for
 * primitive `Double` values.
 */
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  /** Whether no value has been added yet */
  override def isZero: Boolean
  /** Copy this accumulator, including its current state */
  override def copy(): DoubleAccumulator
  /** Reset this accumulator to its zero value */
  override def reset(): Unit
  /** Add a boxed double value */
  override def add(v: java.lang.Double): Unit
  /**
   * Add a primitive double value (Scala convenience method).
   * This is an additional overload, not an override: the inherited `add`
   * takes `java.lang.Double`, so marking this one `override` would not compile.
   */
  def add(v: Double): Unit
  /** Merge another double accumulator into this one */
  override def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
  /** Current accumulated value */
  override def value: java.lang.Double
  /** Number of values added */
  def count: Long
  /** Sum of the values added */
  def sum: java.lang.Double
  /** Average of the values added */
  def avg: Double
}

/**
 * Accumulator for collecting objects into a list.
 *
 * Signature listing only (method bodies omitted). Elements of type `T` are
 * added one at a time and the result is exposed as a `java.util.List[T]`,
 * so Scala callers need a Java-to-Scala collection conversion to use it
 * with Scala collection methods.
 *
 * @tparam T type of the collected elements
 */
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  /** Whether this accumulator is at its zero value */
  override def isZero: Boolean
  /** Copy this accumulator */
  override def copy(): CollectionAccumulator[T]
  /** Reset this accumulator to its zero value */
  override def reset(): Unit
  /** Add one element */
  override def add(v: T): Unit
  /** Merge another collection accumulator into this one */
  override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  /** The collected elements */
  override def value: java.util.List[T]
}

Creating Accumulators

Accumulators are created through SparkContext methods with optional names for monitoring.

/**
 * Accumulator-related factory and registration methods of SparkContext.
 *
 * Signature listing only (method bodies omitted). Each factory has an
 * unnamed and a named variant; the name is used for monitoring.
 */
class SparkContext(config: SparkConf) {
  /** Create long accumulator */
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  
  /** Create double accumulator */
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  
  /** Create collection accumulator */
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  
  /**
   * Register custom accumulator.
   *
   * Accepts any accumulator regardless of its input and output types, so
   * accumulators whose output differs from their input (for example an
   * `AccumulatorV2[Double, (Long, Double, Double, Double, Double)]` or an
   * `AccumulatorV2[T, Set[T]]`) can be registered.
   */
  def register(accumulator: AccumulatorV2[_, _]): Unit
  def register(accumulator: AccumulatorV2[_, _], name: String): Unit
}

// Java API
// Signature listing only (method bodies omitted).
// NOTE(review): the Spark programming guide creates these accumulators through
// the underlying SparkContext (e.g. jsc.sc().longAccumulator()); confirm that
// JavaSparkContext itself exposes the methods listed below.
public class JavaSparkContext {
  /** Create long accumulator (unnamed, or named for monitoring) */
  public LongAccumulator longAccumulator()
  public LongAccumulator longAccumulator(String name)
  
  /** Create double accumulator (unnamed, or named for monitoring) */
  public DoubleAccumulator doubleAccumulator()
  public DoubleAccumulator doubleAccumulator(String name)
  
  /** Create collection accumulator (unnamed, or named for monitoring) */
  public <T> CollectionAccumulator<T> collectionAccumulator()
  public <T> CollectionAccumulator<T> collectionAccumulator(String name)
}

Custom Accumulators

Create custom accumulators for domain-specific aggregation patterns.

/**
 * Example: Accumulator for collecting statistics.
 *
 * Consumes `Double` samples and reports the tuple
 * `(count, sum, min, max, stddev)`, where `stddev` is the population standard
 * deviation derived from the running sum and sum of squares. An accumulator
 * that has received no samples reports `(0L, 0.0, 0.0, 0.0, 0.0)`.
 */
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double)] {
  private var _count: Long = 0L
  private var _sum: Double = 0.0
  // Start min/max at the opposite extremes so the first sample always replaces them.
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  private var _sumSquares: Double = 0.0

  /** True while no sample has been added. */
  override def isZero: Boolean = _count == 0

  /** Returns an independent accumulator carrying the same running totals. */
  override def copy(): StatsAccumulator = {
    val newAcc = new StatsAccumulator
    newAcc._count = this._count
    newAcc._sum = this._sum
    newAcc._min = this._min
    newAcc._max = this._max
    newAcc._sumSquares = this._sumSquares
    newAcc
  }

  /** Restores the initial (empty) state. */
  override def reset(): Unit = {
    _count = 0L
    _sum = 0.0
    _min = Double.MaxValue
    _max = Double.MinValue
    _sumSquares = 0.0
  }

  /** Folds one sample into the running totals. */
  override def add(v: Double): Unit = {
    _count += 1
    _sum += v
    _min = math.min(_min, v)
    _max = math.max(_max, v)
    _sumSquares += v * v
  }

  /**
   * Combines the totals of another `StatsAccumulator` into this one.
   *
   * @throws UnsupportedOperationException if `other` is not a `StatsAccumulator`
   */
  override def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double)]): Unit = {
    other match {
      case o: StatsAccumulator =>
        // An empty accumulator contributes nothing; skipping it keeps the
        // sentinel min/max values out of the comparison.
        if (o._count > 0) {
          _count += o._count
          _sum += o._sum
          _min = math.min(_min, o._min)
          _max = math.max(_max, o._max)
          _sumSquares += o._sumSquares
        }
      case _ =>
        // Fail with a descriptive error instead of an opaque MatchError.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  /** Returns `(count, sum, min, max, stddev)`, or all zeros when empty. */
  override def value: (Long, Double, Double, Double, Double) = {
    if (_count == 0) {
      (0L, 0.0, 0.0, 0.0, 0.0)
    } else {
      val mean = _sum / _count
      // E[x^2] - E[x]^2 can come out slightly negative through floating-point
      // rounding (e.g. when all samples are equal); clamp so sqrt never yields NaN.
      val variance = math.max(0.0, (_sumSquares / _count) - (mean * mean))
      (_count, _sum, _min, _max, math.sqrt(variance))
    }
  }
}

/**
 * Example: Set accumulator for collecting unique values.
 *
 * Values are gathered in a mutable set internally; `value` hands out an
 * immutable snapshot so callers cannot modify the accumulator's state.
 *
 * @tparam T type of the collected values
 */
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  // Fully qualified so the example compiles without a separate
  // `import scala.collection.mutable`.
  private val _set = scala.collection.mutable.Set.empty[T]

  /** True while no value has been collected. */
  override def isZero: Boolean = _set.isEmpty

  /** Returns an independent accumulator holding the same values. */
  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  /** Discards all collected values. */
  override def reset(): Unit = _set.clear()

  /** Adds one value; duplicates are ignored by the underlying set. */
  override def add(v: T): Unit = _set += v

  /**
   * Adds every value collected by another `SetAccumulator` to this one.
   *
   * @throws UnsupportedOperationException if `other` is not a `SetAccumulator`
   */
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = {
    other match {
      case o: SetAccumulator[T] => _set ++= o._set
      case _ =>
        // Fail with a descriptive error instead of an opaque MatchError.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  /** Immutable snapshot of the values collected so far. */
  override def value: Set[T] = _set.toSet
}

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
// Provides `.asScala` for the java.util.List returned by CollectionAccumulator.value.
// On Scala 2.12 and earlier, import scala.collection.JavaConverters._ instead.
import scala.jdk.CollectionConverters._

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Create built-in accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Create RDD
val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations
val processed = data.map { x =>
  val startTime = System.currentTimeMillis()

  try {
    if (x % 100 == 0) {
      throw new RuntimeException(s"Simulated error for $x")
    }

    // Simulate processing
    Thread.sleep(1)
    val result = x * 2

    val endTime = System.currentTimeMillis()
    // Convert explicitly: the elapsed time is a Long, the accumulator takes a Double.
    processingTime.add((endTime - startTime).toDouble)

    result
  } catch {
    case e: Exception =>
      errorCount.add(1)
      errorMessages.add(s"Error processing $x: ${e.getMessage}")
      0 // Default value
  }
}

// Trigger computation
val results = processed.collect()

// Access accumulator values (only on driver)
println(s"Total errors: ${errorCount.value}")
// `avg` divides the accumulated sum by the number of add() calls, so no
// hard-coded record count is needed to compute the average.
println(s"Average processing time: ${processingTime.avg} ms")
println(s"Error messages: ${errorMessages.value.asScala.mkString(", ")}")

// Custom accumulator example
val statsAcc = new StatsAccumulator()
sc.register(statsAcc, "Data Statistics")

val numbers = sc.parallelize(Array(1.0, 2.5, 3.7, 4.1, 5.9, 2.3, 7.8, 1.2))
numbers.foreach(statsAcc.add)

val (count, sum, min, max, stddev) = statsAcc.value
println(s"Count: $count, Sum: $sum, Min: $min, Max: $max, StdDev: $stddev")

// Set accumulator for unique values
val uniqueValues = new SetAccumulator[Int]()
sc.register(uniqueValues, "Unique Values")

val duplicatedData = sc.parallelize(Array(1, 2, 3, 2, 1, 4, 3, 5))
duplicatedData.foreach(uniqueValues.add)

println(s"Unique values: ${uniqueValues.value}")

sc.stop()

Java Examples:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.CollectionAccumulator;

import java.util.Arrays;
import java.util.List;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Accumulator Example")
);

// Create accumulators.
// The accumulator factory methods are called on the underlying SparkContext,
// which the Java wrapper exposes through sc.sc().
LongAccumulator counter = sc.sc().longAccumulator("Counter");
DoubleAccumulator sum = sc.sc().doubleAccumulator("Sum");
CollectionAccumulator<String> logs = sc.sc().collectionAccumulator("Logs");

// Create RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);

// Use accumulators
JavaRDD<Integer> processed = rdd.map(x -> {
    counter.add(1);
    sum.add(x.doubleValue());
    logs.add("Processed: " + x);
    return x * 2;
});

// Trigger action (the map above is not evaluated until an action runs)
List<Integer> results = processed.collect();

// Access values (on the driver)
System.out.println("Count: " + counter.value());
System.out.println("Sum: " + sum.value());
System.out.println("Logs: " + logs.value());

sc.close();

Best Practices

When to Use Accumulators

  • Debugging: Count errors, null values, or invalid records
  • Monitoring: Track processing metrics and performance
  • Statistics: Collect summary statistics during processing
  • Logging: Gather diagnostic information from tasks

Important Considerations

// Good: update accumulators inside an action such as foreach. For updates made
// in actions, each task's update is guaranteed to be applied exactly once.
val errorCount = sc.longAccumulator("Errors")
rdd.foreach { x =>
  if (isInvalid(x)) errorCount.add(1)
}

// Caution: updates made inside a transformation (here: filter) carry no such
// guarantee. The closure may run more than once for the same record, so the
// count can end up higher than the true number of warnings.
val warningCount = sc.longAccumulator("Warnings")
val filtered = rdd.filter { x =>
  if (isWarning(x)) {
    warningCount.add(1) // May be incremented multiple times if RDD is recomputed
  }
  isValid(x)
}

// Good: give accumulators descriptive names so they can be told apart when monitoring
val processedRecords = sc.longAccumulator("Processed Records")
val skippedRecords = sc.longAccumulator("Skipped Records")

// Good: reset an accumulator to its zero value before reusing it for another
// job, so counts from the earlier run do not carry over
errorCount.reset()

Performance Tips

  • Accumulators have minimal performance overhead
  • Avoid accumulating large collections; consider sampling
  • Use appropriate accumulator types for your data
  • Name accumulators for easier monitoring in Spark UI

Error Handling

// Collects a message for every record that fails to process.
val errorAcc = sc.collectionAccumulator[String]("Errors")

// flatMap over an Option keeps successful results and drops failed records in
// one step, without using null as a sentinel value. Option(...) also maps a
// null result from processRecord to None, so null results are still dropped.
val processed = rdd.flatMap { record =>
  try {
    Option(processRecord(record))
  } catch {
    case e: Exception =>
      errorAcc.add(s"Error processing $record: ${e.getMessage}")
      None // failed record contributes no output element
  }
}

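Accumulators provide a powerful mechanism for aggregating information from distributed computations while maintaining fault tolerance. Exactly-once semantics apply only to accumulator updates performed inside actions; updates performed inside transformations may be applied more than once if tasks or stages are re-executed.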