Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
—
Spark provides two types of shared variables for efficient distributed computation: broadcast variables for read-only data distribution and accumulators for aggregating information across tasks.
Broadcast variables allow keeping a read-only variable cached on each machine rather than shipping a copy with each task.
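For contrast, here is a minimal sketch of the same idea done without and then with a broadcast, assuming a SparkContext sc, an existing RDD[String] named codes, and a hypothetical loadLookup() helper:
// Without a broadcast: the Map is captured in the task closure and shipped with every task
val lookup: Map[String, String] = loadLookup()
val withoutBroadcast = codes.map(code => lookup.getOrElse(code, "Unknown"))
// With a broadcast: the Map is sent to each executor once and cached there for all of its tasks
val lookupBroadcast = sc.broadcast(lookup)
val withBroadcast = codes.map(code => lookupBroadcast.value.getOrElse(code, "Unknown"))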
abstract class Broadcast[T](val id: Long) {
def value: T
def unpersist(): Unit
def unpersist(blocking: Boolean): Unit
def destroy(): Unit
def toString: String
}

import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example").setMaster("local[*]"))
// Create a lookup table that will be used across many tasks
val lookupTable = Map(
"US" -> "United States",
"UK" -> "United Kingdom",
"DE" -> "Germany",
"FR" -> "France",
"JP" -> "Japan"
)
// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)
// Use the broadcast variable in transformations
val countryCodes = sc.parallelize(Seq("US", "UK", "DE", "UNKNOWN"))
val countryNames = countryCodes.map { code =>
val lookup = broadcastLookup.value // Access broadcast value
lookup.getOrElse(code, "Unknown Country")
}
// Collect results
val results = countryNames.collect()
// Results: Array("United States", "United Kingdom", "Germany", "Unknown Country")
// Clean up broadcast variable when done
broadcastLookup.unpersist() // Drop cached copies from executors; the value is re-sent if the broadcast is used again
broadcastLookup.destroy()   // Remove all data and metadata permanently; the broadcast cannot be used after this

// Traditional join (can be expensive for large datasets because both sides are shuffled)
val largeRDD = sc.textFile("large_dataset.txt").map(parseRecord)
val smallRDD = sc.textFile("small_lookup.txt").map(parseLookup)
val traditionalJoin = largeRDD.join(smallRDD) // Expensive shuffle
// Broadcast join optimization (when one dataset is small)
val smallDataset: scala.collection.Map[String, LookupInfo] = smallRDD.collectAsMap()
val broadcastSmall = sc.broadcast(smallDataset)
val broadcastJoin = largeRDD.map { case (key, value) =>
val lookupInfo = broadcastSmall.value.get(key) // Option: None when the key has no match (left-outer-join semantics)
(key, (value, lookupInfo))
}

// Broadcasting shared application configuration so every task sees the same settings
case class AppConfig(
apiEndpoint: String,
timeout: Int,
retries: Int,
features: Map[String, Boolean]
)
val config = AppConfig(
apiEndpoint = "https://api.example.com",
timeout = 30000,
retries = 3,
features = Map("feature_a" -> true, "feature_b" -> false)
)
val broadcastConfig = sc.broadcast(config)
// Use configuration in transformations
val processedData = inputRDD.mapPartitions { partition =>
val cfg = broadcastConfig.value
val apiClient = new ApiClient(cfg.apiEndpoint, cfg.timeout, cfg.retries)
partition.map { record =>
if (cfg.features("feature_a")) {
processWithFeatureA(record, apiClient)
} else {
processStandard(record, apiClient)
}
}
}

Accumulators are variables that can only be "added" to through associative and commutative operations, making them suitable for implementing counters and sums.
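One caveat: Spark guarantees exactly-once application only for accumulator updates made inside actions; updates made inside transformations can be applied more than once if a task or stage is re-executed, or simply when an uncached RDD is recomputed. A minimal sketch of the recomputation case, assuming local execution without task retries:
// Counting inside a transformation: the update runs every time the stage is evaluated
val invocations = sc.longAccumulator("Map Invocations")
val doubled = sc.parallelize(1 to 10).map { n => invocations.add(1); n * 2 }
doubled.count() // invocations.value == 10 after the first action
doubled.count() // the uncached RDD is recomputed, so invocations.value grows to 20
Caching the RDD, or performing the counting inside an action such as foreach, avoids the double counting.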
The original (pre-2.0) accumulator API:

class Accumulator[T](initialValue: T, param: AccumulatorParam[T]) {
def +=(term: T): Unit
def add(term: T): Unit
def value: T // Only valid on driver
def setValue(newValue: T): Unit
}
class Accumulable[R, T](initialValue: R, param: AccumulableParam[R, T]) {
def +=(term: T): Unit
def add(term: T): Unit
def value: R
def setValue(newValue: R): Unit
}

Spark 2.0 deprecated these in favor of the AccumulatorV2 API, which is also the base class for custom accumulators:

abstract class AccumulatorV2[IN, OUT] {
def isZero: Boolean
def copy(): AccumulatorV2[IN, OUT]
def reset(): Unit
def add(v: IN): Unit
def merge(other: AccumulatorV2[IN, OUT]): Unit
def value: OUT
def name: Option[String]
def id: Long
}

Built-in accumulator implementations:

class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
def add(v: Long): Unit
def add(v: java.lang.Long): Unit
def sum: Long
def count: Long
def avg: Double
}
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
def add(v: Double): Unit
def add(v: java.lang.Double): Unit
def sum: Double
def count: Long
def avg: Double
}
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
def add(v: T): Unit
def value: java.util.List[T]
}

import scala.collection.JavaConverters._ // needed for .asScala when reading the collection accumulator below

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example").setMaster("local[*]"))
// Create accumulators
val errorCount = sc.longAccumulator("Error Count")
val processingTime = sc.doubleAccumulator("Total Processing Time")
val errorMessages = sc.collectionAccumulator[String]("Error Messages")
val data = sc.parallelize(1 to 1000)
// Use accumulators in transformations
val results = data.map { number =>
val startTime = System.currentTimeMillis()
try {
if (number % 100 == 0) {
throw new RuntimeException(s"Simulated error for number $number")
}
val result = complexProcessing(number)
val elapsed = System.currentTimeMillis() - startTime
processingTime.add(elapsed.toDouble)
result
} catch {
case e: Exception =>
errorCount.add(1)
errorMessages.add(s"Error processing $number: ${e.getMessage}")
-1 // Error sentinel value
}
}
// Trigger computation
val processedResults = results.filter(_ != -1).collect()
// Access accumulator values (only on driver)
println(s"Successfully processed: ${processedResults.length}")
println(s"Errors encountered: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / processedResults.length}ms")
println(s"Error messages: ${errorMessages.value.asScala.take(5)}")import scala.collection.mutable
// Custom accumulator for collecting statistics
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)] {
private var _count: Long = 0
private var _sum: Double = 0.0
private var _sumSquares: Double = 0.0
private var _min: Double = Double.MaxValue
private var _max: Double = Double.MinValue
def isZero: Boolean = _count == 0
def copy(): StatsAccumulator = {
val newAcc = new StatsAccumulator
newAcc._count = _count
newAcc._sum = _sum
newAcc._sumSquares = _sumSquares
newAcc._min = _min
newAcc._max = _max
newAcc
}
def reset(): Unit = {
_count = 0
_sum = 0.0
_sumSquares = 0.0
_min = Double.MaxValue
_max = Double.MinValue
}
def add(v: Double): Unit = {
_count += 1
_sum += v
_sumSquares += v * v
_min = math.min(_min, v)
_max = math.max(_max, v)
}
def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)]): Unit = {
other match {
case o: StatsAccumulator =>
_count += o._count
_sum += o._sum
_sumSquares += o._sumSquares
_min = math.min(_min, o._min)
_max = math.max(_max, o._max)
}
}
def value: (Long, Double, Double, Double, Double, Double) = {
if (_count == 0) (0L, 0.0, 0.0, 0.0, 0.0, 0.0)
else {
val mean = _sum / _count
// Population variance via E[x^2] - (E[x])^2; simple, but can lose precision for large values
val variance = (_sumSquares / _count) - (mean * mean)
(_count, _sum, mean, variance, _min, _max)
}
}
}
// Register and use custom accumulator
val statsAcc = new StatsAccumulator
sc.register(statsAcc, "Statistics")
val numbers = sc.parallelize(Seq.fill(1000)(scala.util.Random.nextGaussian() * 100 + 50))
numbers.foreach(statsAcc.add)
val (count, sum, mean, variance, min, max) = statsAcc.value
println(f"Count: $count, Sum: $sum%.2f, Mean: $mean%.2f, Variance: $variance%.2f")
println(f"Min: $min%.2f, Max: $max%.2f")class HistogramAccumulator(val buckets: Array[Double]) extends AccumulatorV2[Double, Array[Long]] {
require(buckets.sorted.sameElements(buckets), "Buckets must be sorted")
private val _counts = Array.fill(buckets.length + 1)(0L)
def isZero: Boolean = _counts.forall(_ == 0)
def copy(): HistogramAccumulator = {
val newAcc = new HistogramAccumulator(buckets)
System.arraycopy(_counts, 0, newAcc._counts, 0, _counts.length)
newAcc
}
def reset(): Unit = {
java.util.Arrays.fill(_counts, 0L)
}
def add(v: Double): Unit = {
// binarySearch returns the index of an exact match, or (-(insertion point) - 1) when absent.
// A value equal to a boundary belongs to the bucket starting at that boundary, hence +1 on exact matches.
val bucketIndex = java.util.Arrays.binarySearch(buckets, v)
val index = if (bucketIndex >= 0) bucketIndex + 1 else -bucketIndex - 1
_counts(index) += 1
}
def merge(other: AccumulatorV2[Double, Array[Long]]): Unit = {
other match {
case o: HistogramAccumulator =>
for (i <- _counts.indices) {
_counts(i) += o._counts(i)
}
}
}
def value: Array[Long] = _counts.clone()
}
// Usage
val histogramBuckets = Array(0.0, 25.0, 50.0, 75.0, 100.0)
val histogramAcc = new HistogramAccumulator(histogramBuckets)
sc.register(histogramAcc, "Value Histogram")
val values = sc.parallelize(Seq.fill(10000)(scala.util.Random.nextDouble() * 100))
values.foreach(histogramAcc.add)
val histogram = histogramAcc.value
println("Histogram buckets and counts:")
println(s"< ${histogramBuckets(0)}: ${histogram(0)}")
for (i <- histogramBuckets.indices.init) {
println(s"${histogramBuckets(i)} - ${histogramBuckets(i+1)}: ${histogram(i+1)}")
}
println(s">= ${histogramBuckets.last}: ${histogram.last}")import scala.collection.mutable
// Broadcast a lazily initialized, executor-local cache. The @transient lazy val is not serialized;
// each executor builds its own empty cache on first access. (mutable.Map is not synchronized, so
// concurrent tasks on the same executor would need a thread-safe map in real use.)
class DistributedCache[K, V](loadFunction: K => V) extends Serializable {
@transient private lazy val cache = mutable.Map[K, V]()
def get(key: K): V = {
cache.getOrElseUpdate(key, loadFunction(key))
}
}
// Broadcast the cache instance
// The load function runs once per key per executor; results are memoized in the executor-local cache
val databaseCache = new DistributedCache[String, DatabaseRecord](key => loadFromDatabase(key))
val broadcastCache = sc.broadcast(databaseCache)
// Use across multiple operations
val enrichedData1 = rdd1.map { record =>
val enrichment = broadcastCache.value.get(record.key)
record.copy(metadata = enrichment)
}
val enrichedData2 = rdd2.map { record =>
val enrichment = broadcastCache.value.get(record.foreignKey)
record.copy(additionalInfo = enrichment)
}

// Combining a case class with a custom accumulator to collect per-task pipeline metrics
case class TaskMetrics(
processedRecords: Long = 0,
errorRecords: Long = 0,
processingTimeMs: Long = 0,
cacheHits: Long = 0,
cacheMisses: Long = 0
) {
def +(other: TaskMetrics): TaskMetrics = TaskMetrics(
processedRecords + other.processedRecords,
errorRecords + other.errorRecords,
processingTimeMs + other.processingTimeMs,
cacheHits + other.cacheHits,
cacheMisses + other.cacheMisses
)
}
class TaskMetricsAccumulator extends AccumulatorV2[TaskMetrics, TaskMetrics] {
private var _metrics = TaskMetrics()
def isZero: Boolean = _metrics == TaskMetrics()
def copy(): TaskMetricsAccumulator = {
val newAcc = new TaskMetricsAccumulator
newAcc._metrics = _metrics
newAcc
}
def reset(): Unit = _metrics = TaskMetrics()
def add(v: TaskMetrics): Unit = _metrics = _metrics + v
def merge(other: AccumulatorV2[TaskMetrics, TaskMetrics]): Unit = {
other match {
case o: TaskMetricsAccumulator => _metrics = _metrics + o._metrics
}
}
def value: TaskMetrics = _metrics
}
// Usage in a processing pipeline
val metricsAcc = new TaskMetricsAccumulator
sc.register(metricsAcc, "Pipeline Metrics")
val results = inputRDD.mapPartitions { partition =>
var partitionMetrics = TaskMetrics()
val cache = mutable.Map[String, Any]()
val processedPartition = partition.map { record =>
val startTime = System.currentTimeMillis()
try {
// Simulate cache lookup
val cacheKey = record.getCacheKey
val cachedValue = cache.get(cacheKey)
if (cachedValue.isDefined) {
partitionMetrics = partitionMetrics.copy(cacheHits = partitionMetrics.cacheHits + 1)
} else {
partitionMetrics = partitionMetrics.copy(cacheMisses = partitionMetrics.cacheMisses + 1)
cache(cacheKey) = computeValue(record)
}
val processed = processRecord(record)
val elapsed = System.currentTimeMillis() - startTime
partitionMetrics = partitionMetrics.copy(
processedRecords = partitionMetrics.processedRecords + 1,
processingTimeMs = partitionMetrics.processingTimeMs + elapsed
)
processed
} catch {
case _: Exception =>
partitionMetrics = partitionMetrics.copy(errorRecords = partitionMetrics.errorRecords + 1)
null
}
}.filter(_ != null).toVector // Materialize the partition so partitionMetrics is complete before it is reported
// Add partition metrics to global accumulator
metricsAcc.add(partitionMetrics)
processedPartition.iterator
}
// Trigger computation and get metrics
val finalResults = results.collect()
val finalMetrics = metricsAcc.value
println(s"Processing Summary:")
println(s" Processed: ${finalMetrics.processedRecords}")
println(s" Errors: ${finalMetrics.errorRecords}")
println(s" Total time: ${finalMetrics.processingTimeMs}ms")
println(s" Avg time per record: ${finalMetrics.processingTimeMs.toDouble / finalMetrics.processedRecords}ms")
println(s" Cache hit rate: ${finalMetrics.cacheHits.toDouble / (finalMetrics.cacheHits + finalMetrics.cacheMisses) * 100}%")// Clean up resources properly
// Declare outside the try block so the finally clause can reference them
val broadcastData = sc.broadcast(largeData)
val metrics = sc.longAccumulator("Processing Count")
try {
// Use broadcast and accumulator
val results = processWithBroadcastAndAccumulator(inputRDD, broadcastData, metrics)
// Collect results
results.collect()
println(s"Processed ${metrics.value} records")
} finally {
// Always clean up
broadcastData.unpersist()
broadcastData.destroy()
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11