Broadcast Variables and Accumulators

Spark provides two types of shared variables for efficient distributed computation: broadcast variables for read-only data distribution and accumulators for aggregating information across tasks.

Broadcast Variables

Broadcast variables let you keep a read-only variable cached on each machine rather than shipping a copy of it with every task.

Broadcast Class

abstract class Broadcast[T](val id: Long) {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit  
  def destroy(): Unit
  def toString: String
}

Creating and Using Broadcast Variables

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example").setMaster("local[*]"))

// Create a lookup table that will be used across many tasks
val lookupTable = Map(
  "US" -> "United States",
  "UK" -> "United Kingdom", 
  "DE" -> "Germany",
  "FR" -> "France",
  "JP" -> "Japan"
)

// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)

// Use the broadcast variable in transformations
val countryCodes = sc.parallelize(Seq("US", "UK", "DE", "UNKNOWN"))
val countryNames = countryCodes.map { code =>
  val lookup = broadcastLookup.value  // Access broadcast value
  lookup.getOrElse(code, "Unknown Country")
}

// Collect results
val results = countryNames.collect()
// Results: Array("United States", "United Kingdom", "Germany", "Unknown Country")

// Clean up broadcast variable when done
broadcastLookup.unpersist()  // Remove cached copies from executors (can be re-broadcast on next use)
broadcastLookup.destroy()    // Permanently remove all data and metadata

Large Dataset Join Optimization

// Traditional join (can be expensive for large datasets)
val largeRDD = sc.textFile("large_dataset.txt").map(parseRecord)  
val smallRDD = sc.textFile("small_lookup.txt").map(parseLookup)
val traditionalJoin = largeRDD.join(smallRDD)  // Expensive shuffle

// Broadcast join optimization (when one dataset is small)
// Note: collectAsMap returns scala.collection.Map, not the default immutable.Map
val smallDataset: scala.collection.Map[String, LookupInfo] = smallRDD.collectAsMap()
val broadcastSmall = sc.broadcast(smallDataset)

// Map-side lookup instead of a shuffle; lookupInfo is an Option (left-outer semantics)
val broadcastJoin = largeRDD.map { case (key, value) =>
  val lookupInfo = broadcastSmall.value.get(key)
  (key, (value, lookupInfo))
}
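
The map above keeps every record and attaches an Option (left-outer semantics). When inner-join semantics are wanted instead, the lookup can be flattened; a minimal sketch reusing largeRDD and broadcastSmall from above:

// Keep only records whose key exists in the broadcast map (inner-join semantics)
val broadcastInnerJoin = largeRDD.flatMap { case (key, value) =>
  broadcastSmall.value.get(key).map(lookupInfo => (key, (value, lookupInfo)))
}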

Configuration Broadcast Pattern

case class AppConfig(
  apiEndpoint: String,
  timeout: Int,
  retries: Int,
  features: Map[String, Boolean]
)

val config = AppConfig(
  apiEndpoint = "https://api.example.com",
  timeout = 30000,
  retries = 3,
  features = Map("feature_a" -> true, "feature_b" -> false)
)

val broadcastConfig = sc.broadcast(config)

// Use configuration in transformations
val processedData = inputRDD.mapPartitions { partition =>
  val cfg = broadcastConfig.value
  val apiClient = new ApiClient(cfg.apiEndpoint, cfg.timeout, cfg.retries)
  
  partition.map { record =>
    if (cfg.features("feature_a")) {
      processWithFeatureA(record, apiClient)
    } else {
      processStandard(record, apiClient)
    }
  }
}
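
Note that mapPartitions is used here deliberately: the ApiClient is constructed once per partition rather than once per record, which matters when client setup is expensive.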

Accumulators

Accumulators are variables that can only be "added" to through associative and commutative operations, making them suitable for implementing counters and sums. Associativity and commutativity matter because per-task results are merged on the driver in no particular order.

Legacy Accumulator (Deprecated)

class Accumulator[T](initialValue: T, param: AccumulatorParam[T]) {
  def +=(term: T): Unit
  def add(term: T): Unit  
  def value: T           // Only valid on driver
  def setValue(newValue: T): Unit
}

class Accumulable[R, T](initialValue: R, param: AccumulableParam[R, T]) {
  def +=(term: T): Unit
  def add(term: T): Unit
  def value: R
  def setValue(newValue: R): Unit
}
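
These classes have been deprecated since Spark 2.0 in favor of AccumulatorV2. A minimal migration sketch (the accumulator names are illustrative):

// Deprecated Spark 1.x style: relies on an implicit AccumulatorParam
val oldCounter = sc.accumulator(0, "Old Counter")
oldCounter += 1

// Current style: built-in AccumulatorV2 implementation
val newCounter = sc.longAccumulator("New Counter")
newCounter.add(1)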

AccumulatorV2 (Current API)

abstract class AccumulatorV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
  def name: Option[String]
  def id: Long
}

Built-in Accumulator Types

class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def sum: Long
  def count: Long
  def avg: Double
}

class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit  
  def add(v: java.lang.Double): Unit
  def sum: Double
  def count: Long
  def avg: Double
}

class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]
}

Using Built-in Accumulators

import scala.collection.JavaConverters._  // for .asScala on the collection accumulator's value

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example").setMaster("local[*]"))

// Create accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations
val results = data.map { number =>
  val startTime = System.currentTimeMillis()
  
  try {
    if (number % 100 == 0) {
      throw new RuntimeException(s"Simulated error for number $number")
    }
    
    val result = complexProcessing(number)
    val elapsed = System.currentTimeMillis() - startTime
    processingTime.add(elapsed.toDouble)
    
    result
  } catch {
    case e: Exception =>
      errorCount.add(1)
      errorMessages.add(s"Error processing $number: ${e.getMessage}")
      -1  // Error sentinel value
  }
}

// Trigger computation
val processedResults = results.filter(_ != -1).collect()

// Access accumulator values (only on driver)
println(s"Successfully processed: ${processedResults.length}")
println(s"Errors encountered: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / processedResults.length}ms")
println(s"Error messages: ${errorMessages.value.asScala.take(5)}")

Custom Accumulators

import scala.collection.mutable

// Custom accumulator for collecting statistics
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)] {
  private var _count: Long = 0
  private var _sum: Double = 0.0
  private var _sumSquares: Double = 0.0
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  
  def isZero: Boolean = _count == 0
  
  def copy(): StatsAccumulator = {
    val newAcc = new StatsAccumulator
    newAcc._count = _count
    newAcc._sum = _sum
    newAcc._sumSquares = _sumSquares
    newAcc._min = _min
    newAcc._max = _max
    newAcc
  }
  
  def reset(): Unit = {
    _count = 0
    _sum = 0.0
    _sumSquares = 0.0
    _min = Double.MaxValue
    _max = Double.MinValue
  }
  
  def add(v: Double): Unit = {
    _count += 1
    _sum += v
    _sumSquares += v * v
    _min = math.min(_min, v)
    _max = math.max(_max, v)
  }
  
  def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)]): Unit = {
    other match {
      case o: StatsAccumulator =>
        _count += o._count
        _sum += o._sum
        _sumSquares += o._sumSquares
        _min = math.min(_min, o._min)
        _max = math.max(_max, o._max)
    }
  }
  
  // Returns (count, sum, mean, variance, min, max)
  def value: (Long, Double, Double, Double, Double, Double) = {
    if (_count == 0) (0L, 0.0, 0.0, 0.0, 0.0, 0.0)
    else {
      val mean = _sum / _count
      val variance = (_sumSquares / _count) - (mean * mean)
      (_count, _sum, mean, variance, _min, _max)
    }
  }
}

// Register and use custom accumulator  
val statsAcc = new StatsAccumulator
sc.register(statsAcc, "Statistics")

val numbers = sc.parallelize(Seq.fill(1000)(scala.util.Random.nextGaussian() * 100 + 50))
numbers.foreach(statsAcc.add)

val (count, sum, mean, variance, min, max) = statsAcc.value
println(f"Count: $count, Sum: $sum%.2f, Mean: $mean%.2f, Variance: $variance%.2f")
println(f"Min: $min%.2f, Max: $max%.2f")

Histogram Accumulator

class HistogramAccumulator(val buckets: Array[Double]) extends AccumulatorV2[Double, Array[Long]] {
  require(buckets.sorted.sameElements(buckets), "Buckets must be sorted")
  
  private val _counts = Array.fill(buckets.length + 1)(0L)
  
  def isZero: Boolean = _counts.forall(_ == 0)
  
  def copy(): HistogramAccumulator = {
    val newAcc = new HistogramAccumulator(buckets)
    System.arraycopy(_counts, 0, newAcc._counts, 0, _counts.length)
    newAcc
  }
  
  def reset(): Unit = {
    java.util.Arrays.fill(_counts, 0L)
  }
  
  def add(v: Double): Unit = {
    // A value equal to a bucket boundary falls into the bucket that boundary closes
    val bucketIndex = java.util.Arrays.binarySearch(buckets, v)
    val index = if (bucketIndex >= 0) bucketIndex else -bucketIndex - 1
    _counts(index) += 1
  }
  
  def merge(other: AccumulatorV2[Double, Array[Long]]): Unit = {
    other match {
      case o: HistogramAccumulator =>
        for (i <- _counts.indices) {
          _counts(i) += o._counts(i)
        }
    }
  }
  
  def value: Array[Long] = _counts.clone()
}

// Usage
val histogramBuckets = Array(0.0, 25.0, 50.0, 75.0, 100.0)
val histogramAcc = new HistogramAccumulator(histogramBuckets)
sc.register(histogramAcc, "Value Histogram")

val values = sc.parallelize(Seq.fill(10000)(scala.util.Random.nextDouble() * 100))
values.foreach(histogramAcc.add)

val histogram = histogramAcc.value
println("Histogram buckets and counts:")
println(s"< ${histogramBuckets(0)}: ${histogram(0)}")
for (i <- histogramBuckets.indices.init) {
  println(s"${histogramBuckets(i)} - ${histogramBuckets(i+1)}: ${histogram(i+1)}")
}
println(s">= ${histogramBuckets.last}: ${histogram.last}")

Advanced Patterns

Distributed Cache with Broadcast

import scala.collection.mutable

class DistributedCache[K, V](loadFunction: K => V) extends Serializable {
  @transient private lazy val cache = mutable.Map[K, V]()
  
  def get(key: K): V = {
    cache.getOrElseUpdate(key, loadFunction(key))
  }
}

// Broadcast the cache instance (the load function is a constructor argument)
val databaseCache = new DistributedCache[String, DatabaseRecord](key =>
  // This will be called once per executor per key (the cache itself is @transient)
  loadFromDatabase(key)
)
val broadcastCache = sc.broadcast(databaseCache)

// Use across multiple operations
val enrichedData1 = rdd1.map { record =>
  val enrichment = broadcastCache.value.get(record.key)
  record.copy(metadata = enrichment)
}

val enrichedData2 = rdd2.map { record =>
  val enrichment = broadcastCache.value.get(record.foreignKey)
  record.copy(additionalInfo = enrichment)
}
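
One caveat with this sketch: several tasks can run concurrently in the same executor JVM, and mutable.Map is not thread-safe. A variant using the standard library's concurrent TrieMap avoids the race (loadFromDatabase remains a placeholder as above):

import scala.collection.concurrent.TrieMap

class ConcurrentDistributedCache[K, V](loadFunction: K => V) extends Serializable {
  // TrieMap is safe for concurrent tasks on one executor; under contention
  // loadFunction may still run more than once for the same key
  @transient private lazy val cache = TrieMap.empty[K, V]

  def get(key: K): V = cache.getOrElseUpdate(key, loadFunction(key))
}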

Multi-level Metrics Collection

case class TaskMetrics(
  processedRecords: Long = 0,
  errorRecords: Long = 0,
  processingTimeMs: Long = 0,
  cacheHits: Long = 0,
  cacheMisses: Long = 0
) {
  def +(other: TaskMetrics): TaskMetrics = TaskMetrics(
    processedRecords + other.processedRecords,
    errorRecords + other.errorRecords,
    processingTimeMs + other.processingTimeMs,
    cacheHits + other.cacheHits,
    cacheMisses + other.cacheMisses
  )
}

class TaskMetricsAccumulator extends AccumulatorV2[TaskMetrics, TaskMetrics] {
  private var _metrics = TaskMetrics()
  
  def isZero: Boolean = _metrics == TaskMetrics()
  def copy(): TaskMetricsAccumulator = {
    val newAcc = new TaskMetricsAccumulator
    newAcc._metrics = _metrics
    newAcc
  }
  def reset(): Unit = _metrics = TaskMetrics()
  def add(v: TaskMetrics): Unit = _metrics = _metrics + v
  def merge(other: AccumulatorV2[TaskMetrics, TaskMetrics]): Unit = {
    other match {
      case o: TaskMetricsAccumulator => _metrics = _metrics + o._metrics
    }
  }
  def value: TaskMetrics = _metrics
}

// Usage in a processing pipeline
val metricsAcc = new TaskMetricsAccumulator
sc.register(metricsAcc, "Pipeline Metrics")

val results = inputRDD.mapPartitions { partition =>
  var partitionMetrics = TaskMetrics()
  val cache = mutable.Map[String, Any]()
  
  val processedPartition = partition.map { record =>
    val startTime = System.currentTimeMillis()
    
    try {
      // Simulate cache lookup
      val cacheKey = record.getCacheKey
      val cachedValue = cache.get(cacheKey)
      
      if (cachedValue.isDefined) {
        partitionMetrics = partitionMetrics.copy(cacheHits = partitionMetrics.cacheHits + 1)
      } else {
        partitionMetrics = partitionMetrics.copy(cacheMisses = partitionMetrics.cacheMisses + 1)
        cache(cacheKey) = computeValue(record)
      }
      
      val processed = processRecord(record)
      val elapsed = System.currentTimeMillis() - startTime
      
      partitionMetrics = partitionMetrics.copy(
        processedRecords = partitionMetrics.processedRecords + 1,
        processingTimeMs = partitionMetrics.processingTimeMs + elapsed
      )
      
      processed
    } catch {
      case _: Exception =>
        partitionMetrics = partitionMetrics.copy(errorRecords = partitionMetrics.errorRecords + 1)
        null
    }
  }.filter(_ != null).toVector

  // partition.map is lazy: materializing with toVector above forces the
  // per-record metrics to be fully accumulated before they are added here
  metricsAcc.add(partitionMetrics)
  processedPartition.iterator
}

// Trigger computation and get metrics
val finalResults = results.collect()
val finalMetrics = metricsAcc.value

println(s"Processing Summary:")
println(s"  Processed: ${finalMetrics.processedRecords}")
println(s"  Errors: ${finalMetrics.errorRecords}")
println(s"  Total time: ${finalMetrics.processingTimeMs}ms")
println(s"  Avg time per record: ${finalMetrics.processingTimeMs.toDouble / finalMetrics.processedRecords}ms")
println(s"  Cache hit rate: ${finalMetrics.cacheHits.toDouble / (finalMetrics.cacheHits + finalMetrics.cacheMisses) * 100}%")

Best Practices

Broadcast Variables

  • Only broadcast read-only data
  • Broadcast small to medium-sized datasets (typically < 2GB)
  • Clean up broadcast variables when no longer needed
  • Use broadcast joins for small lookup tables
  • Consider using broadcast for configuration objects

Accumulators

  • Only use accumulators for metrics and debugging information
  • Don't rely on accumulator values for program logic (values may be inconsistent)
  • Accumulator updates are applied exactly once per task only inside actions; in transformations, a retried or speculatively executed task can apply them again (see the sketch after this list)
  • Register accumulators with meaningful names for monitoring
  • Consider using custom accumulators for complex aggregations
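
A minimal sketch of the transformation-versus-action caveat, assuming a local SparkContext sc as in the earlier examples:

val reliableCount = sc.longAccumulator("Reliable Count")
val nums = sc.parallelize(1 to 100)

// In a transformation: the uncached RDD is recomputed by each action,
// so the map runs twice and the counter ends up at 200
val counted = nums.map { x => reliableCount.add(1); x }
counted.count()
counted.count()
println(reliableCount.value)  // 200

reliableCount.reset()

// In an action: each task's update is applied exactly once
nums.foreach(_ => reliableCount.add(1))
println(reliableCount.value)  // 100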

Memory Management

// Clean up resources properly: declare outside the try block so the
// finally clause can reference the broadcast variable
val broadcastData = sc.broadcast(largeData)
val metrics = sc.longAccumulator("Processing Count")

try {
  // Use broadcast and accumulator
  val results = processWithBroadcastAndAccumulator(inputRDD, broadcastData, metrics)

  // Collect results
  results.collect()

  println(s"Processed ${metrics.value} records")

} finally {
  // Always clean up; destroy() removes all data and metadata for the broadcast
  broadcastData.destroy()
}
