# Shared Variables

Broadcast variables and accumulators let Spark applications share data efficiently and perform distributed counting across cluster nodes.

## Capabilities

### Broadcast Variables

Read-only variables that are cached on each machine instead of being shipped with every task. This makes it efficient to share large datasets with all nodes.

```scala { .api }
/**
 * A broadcast variable, created with `SparkContext.broadcast()`.
 *
 * The value is sent to each executor once and cached there, instead of being
 * serialized into every task closure. Broadcast values are read-only.
 *
 * @param id unique identifier of this broadcast variable
 * @tparam T type of the broadcast value
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {

  /** Get the broadcast value. Throws if the broadcast has already been destroyed. */
  def value: T

  /**
   * Asynchronously delete cached copies of this broadcast on the executors.
   * If the broadcast is used after this call, it is re-sent to each executor.
   */
  def unpersist(): Unit

  /**
   * Delete cached copies of this broadcast on the executors.
   * If the broadcast is used after this call, it is re-sent to each executor.
   *
   * @param blocking whether to block until unpersisting has completed
   */
  def unpersist(blocking: Boolean): Unit

  /**
   * Destroy all data and metadata related to this broadcast variable.
   * Use with caution: once destroyed, the broadcast can never be used again,
   * including by lazy transformations that have not executed yet.
   */
  def destroy(): Unit

  /**
   * Whether this broadcast is still valid (i.e. has not been destroyed).
   * NOTE: this is `private[spark]` in Spark itself, so user code cannot call it.
   */
  private[spark] def isValid: Boolean

  override def toString: String = "Broadcast(" + id + ")"
}

// SparkContext method for creating broadcast variables
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))

// Create a large lookup table
val lookupTable = Map(
  "US" -> "United States",
  "UK" -> "United Kingdom",
  "DE" -> "Germany",
  "FR" -> "France"
  // ... thousands more entries
)

// Broadcast the lookup table once; each executor caches its own copy
val broadcastLookup = sc.broadcast(lookupTable)

// Use the broadcast variable in transformations. `.value` is read inside the
// closure, i.e. on the executor. The task only captures the small Broadcast
// handle, not the table itself.
val countryData = sc.textFile("hdfs://country_codes.txt")
val countryNames = countryData.map { code =>
  broadcastLookup.value.getOrElse(code, "Unknown")
}

// Transformations are lazy. Run the actions that use the broadcast BEFORE cleaning it up.
countryNames.take(10).foreach(println)

// Clean up when done. unpersist() only drops the executor copies (they are re-sent
// if the variable is used again). destroy() is final, and any later job that reads
// the broadcast fails.
broadcastLookup.unpersist()
// broadcastLookup.destroy() // Only if completely done with variable

// Example with configuration
val config = Map(
  "apiUrl" -> "https://api.example.com",
  "timeout" -> "30000",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)

// Assumes `inputRDD` and `processRecord(record, conf)` are defined elsewhere.
val processedData = inputRDD.mapPartitions { partition =>
  // Read the broadcast value once per partition rather than once per record
  val conf = broadcastConfig.value
  partition.map(processRecord(_, conf))
}
```
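
### Accumulators

Shared variables that tasks can only "add" to, through an associative and commutative operation. They provide efficient distributed counters and collectors. Only the driver program can read an accumulator's value. The `Accumulator`/`Accumulable` API shown in this section was deprecated in Spark 2.0 and removed in Spark 3.0 in favour of `AccumulatorV2` (for example `sc.longAccumulator`).
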
```scala { .api }
/**
 * A shared variable that tasks can only "add" to, through an associative and
 * commutative operation defined by an `AccumulatorParam[T]`. The result type and the
 * element type are both `T`. Accumulators are normally created with `SparkContext.accumulator`.
 *
 * Deprecated since Spark 2.0 and removed in Spark 3.0. Use `AccumulatorV2` instead
 * (e.g. `sc.longAccumulator`, `sc.doubleAccumulator`, `sc.collectionAccumulator`).
 */
class Accumulator[T](initialValue: T, param: AccumulatorParam[T], name: Option[String] = None)
  extends Accumulable[T, T](initialValue, param, name)

/**
 * A more general version of `Accumulator` where the accumulated result type `R`
 * differs from the type `T` of the values being added (e.g. adding elements of
 * type `T` to a collection of type `R`). Deprecated and removed on the same
 * schedule as `Accumulator`.
 */
class Accumulable[R, T](initialValue: R, param: AccumulableParam[R, T], name: Option[String] = None)
  extends Serializable {

  /**
   * The value merged from all tasks. Only the driver should read it. Inside a task it
   * does not include other tasks' updates (Spark 1.x throws
   * `UnsupportedOperationException` there).
   */
  def value: R

  /**
   * The value accumulated so far by the current task only. This is NOT the global
   * value. It is intended for use from within a task.
   */
  def localValue: R

  /** Set the value; only the driver can call this. */
  def setValue(newValue: R): Unit

  /** Add a single term, combined with `param.addAccumulator`. */
  def add(term: T): Unit

  /** The `+=` operator; same as `add`. */
  def +=(term: T): Unit = add(term)

  /** Merge in an already-accumulated value of type `R`, combined with `param.addInPlace`. */
  def merge(term: R): Unit

  /** The `++=` operator; same as `merge`. Note that it takes an `R`, not a `T`. */
  def ++=(term: R): Unit = merge(term)
}

// SparkContext methods for creating accumulators (deprecated in Spark 2.x, removed in 3.0)
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
```

**Usage Examples:**

```scala
import org.apache.spark.{AccumulableParam, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Basic numeric accumulators (the implicit Int/Double params come from the AccumulatorParam companion)
val errorCount = sc.accumulator(0, "Error Count")
val sumAccum = sc.accumulator(0.0, "Sum Accumulator")

val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations.
// Updates made inside a transformation such as map() may be applied more than once
// if a task or stage is re-executed. Only updates made inside actions are applied exactly once.
val processedData = data.map { value =>
  try {
    if (value % 100 == 0) {
      throw new RuntimeException(s"Error processing $value")
    }
    sumAccum += value.toDouble
    value * 2
  } catch {
    case _: Exception =>
      errorCount += 1
      -1 // Error marker
  }
}

// Trigger action to execute transformations
val results = processedData.filter(_ != -1).collect()

// Read accumulator values (only on the driver, after the action has completed)
println(s"Processed ${results.length} items")
println(s"Encountered ${errorCount.value} errors")
println(s"Sum of successful values: ${sumAccum.value}")

// Collection accumulator example.
// Spark ships no AccumulableParam for an immutable Set, so one must be in implicit
// scope for sc.accumulable to compile.
implicit object StringSetParam extends AccumulableParam[Set[String], String] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addAccumulator(set: Set[String], word: String): Set[String] = set + word
  def addInPlace(set1: Set[String], set2: Set[String]): Set[String] = set1 ++ set2
}

val uniqueWords = sc.accumulable(Set.empty[String])

val text = sc.textFile("hdfs://input.txt")
// foreach is an action, so each update is applied exactly once.
// Every distinct word is held in driver memory. For large vocabularies, prefer
// text.flatMap(_.split(" ")).distinct().count().
text.flatMap(_.split(" ")).foreach { word =>
  uniqueWords += word
}

println(s"Unique words found: ${uniqueWords.value.size}")
```
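
### AccumulatorParam and AccumulableParam

Type classes that define how accumulator values are combined. When you create an accumulator, Spark resolves an implicit instance for the value type. Custom value types therefore need their own implicit parameter object in scope.
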
```scala { .api }
/**
 * A type class that defines how to add values of type `T` into an accumulated
 * value of type `R`, and how to merge two accumulated values.
 *
 * Both operations must be associative and commutative, because Spark merges
 * per-task results in no guaranteed order.
 *
 * @tparam R the accumulated (result) type
 * @tparam T the type of the individual values being added
 */
trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add a single `T` value to the accumulated value `r`.
   * May modify and return `r` for efficiency (to avoid allocating objects).
   */
  def addAccumulator(r: R, t: T): R

  /**
   * Merge two accumulated values together.
   * May modify and return `r1` for efficiency (to avoid allocating objects).
   */
  def addInPlace(r1: R, r2: R): R

  /**
   * Return the "zero" (identity) value for `R`, given the initial value the
   * accumulator was created with (e.g. 0, or an empty collection).
   */
  def zero(initialValue: R): R
}

/**
 * An `AccumulableParam` whose accumulated type and element type are both `T`.
 * This is the param type used by `Accumulator[T]`. Implementations only need
 * `addInPlace` and `zero`, because adding a value and merging two values are the
 * same operation.
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)
}

// Built-in accumulator parameters. They live in the companion object, so they are in
// implicit scope without an import (e.g. `sc.accumulator(0)` uses IntAccumulatorParam).
object AccumulatorParam {
  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0.0f
  }
}
```

**Custom Accumulator Examples:**

```scala
import org.apache.spark.{AccumulableParam, AccumulatorParam}

// Custom accumulator for collecting statistics
case class Stats(count: Long, sum: Double, min: Double, max: Double)

implicit object StatsAccumulatorParam extends AccumulatorParam[Stats] {
  // Identity element: min starts at the largest double and max at Double.MinValue
  // (the most negative double), so any real value replaces them.
  def zero(initialValue: Stats): Stats = Stats(0, 0.0, Double.MaxValue, Double.MinValue)

  // An empty side (count == 0) contributes nothing, so return the other side unchanged
  def addInPlace(s1: Stats, s2: Stats): Stats =
    if (s1.count == 0) s2
    else if (s2.count == 0) s1
    else Stats(
      s1.count + s2.count,
      s1.sum + s2.sum,
      math.min(s1.min, s2.min),
      math.max(s1.max, s2.max)
    )
}

// Custom accumulable for collecting unique items
implicit object SetAccumulableParam extends AccumulableParam[Set[String], String] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addAccumulator(set: Set[String], item: String): Set[String] = set + item
  def addInPlace(set1: Set[String], set2: Set[String]): Set[String] = set1 ++ set2
}

// Usage: the implicit objects above are picked up by sc.accumulator / sc.accumulable
val stats = sc.accumulator(Stats(0, 0.0, Double.MaxValue, Double.MinValue))
val uniqueItems = sc.accumulable(Set.empty[String])

val data = sc.parallelize(Array(1.5, 2.7, 3.1, 4.9, 2.7))
// foreach is an action, so each update is applied exactly once
data.foreach { value =>
  stats += Stats(1, value, value, value)
  uniqueItems += value.toString
}

val finalStats = stats.value
println(s"Count: ${finalStats.count}, Avg: ${finalStats.sum / finalStats.count}")
println(s"Unique values: ${uniqueItems.value}")
```

## Advanced Patterns

### Combining Broadcast Variables and Accumulators

```scala
import scala.util.control.NonFatal

// Configuration and monitoring pattern.
// Assumes `inputData` is an RDD whose records expose a numeric `value` field, and that
// `processRecord(record)` returns a String and may throw.
val config = Map("threshold" -> 100, "maxRetries" -> 3)
val broadcastConfig = sc.broadcast(config)

val successCount = sc.accumulator(0, "Successful Operations")
val failureCount = sc.accumulator(0, "Failed Operations")
val retryCount = sc.accumulator(0, "Retry Count")

// These updates happen inside a transformation, so re-executed tasks can inflate the counts.
val results = inputData.mapPartitions { partition =>
  // Read the broadcast once per partition
  val conf = broadcastConfig.value
  val threshold = conf("threshold")
  val maxRetries = conf("maxRetries") // caps the TOTAL number of attempts per record

  partition.flatMap { record =>
    var attempts = 0
    var done = false
    var result: Option[String] = None

    while (attempts < maxRetries && !done) {
      try {
        if (record.value > threshold) {
          result = Some(processRecord(record))
          successCount += 1
        } else {
          failureCount += 1 // Threshold failures are not retried
        }
        done = true
      } catch {
        case NonFatal(_) =>
          attempts += 1
          // Only count a retry when another attempt will actually follow
          if (attempts < maxRetries) retryCount += 1
          else failureCount += 1 // Out of attempts
      }
    }

    result // None for failed records; flatMap drops them
  }
}

// Trigger execution and report metrics
val finalResults = results.collect()
println(s"Processed: ${finalResults.length}")
println(s"Successful: ${successCount.value}")
println(s"Failed: ${failureCount.value}")
println(s"Retries: ${retryCount.value}")
```

### Performance Monitoring with Accumulators

```scala
import org.apache.spark.AccumulatorParam

// Performance monitoring accumulators.
// Assumes `inputRDD` and `processRecord(record)` are defined elsewhere.

// Spark has no built-in param for Map values. This one merges per-partition entries.
// Keys are partition ids, so a re-executed partition overwrites its entry instead of
// being counted twice.
implicit object PartitionStatsParam extends AccumulatorParam[Map[Int, (Long, Long)]] {
  def zero(initialValue: Map[Int, (Long, Long)]): Map[Int, (Long, Long)] = Map.empty
  def addInPlace(m1: Map[Int, (Long, Long)], m2: Map[Int, (Long, Long)]): Map[Int, (Long, Long)] =
    m1 ++ m2
}

val processingTimeAccum = sc.accumulator(0L, "Total Processing Time")
val recordsProcessedAccum = sc.accumulator(0L, "Records Processed")
// partitionId -> (recordCount, processingTimeMs)
val partitionStatsAccum = sc.accumulator(Map.empty[Int, (Long, Long)])

val monitoredData = inputRDD.mapPartitionsWithIndex { (partitionId, partition) =>
  // nanoTime is intended for measuring elapsed time. currentTimeMillis follows
  // wall-clock adjustments.
  val startNanos = System.nanoTime()

  // Materialize the partition so the timing covers the actual processing work.
  // This holds the whole processed partition in memory.
  val processedPartition = partition.map(record => processRecord(record)).toList

  val processingTime = (System.nanoTime() - startNanos) / 1000000L // milliseconds
  val recordCount = processedPartition.length.toLong

  // One accumulator update per partition instead of one per record
  recordsProcessedAccum += recordCount
  processingTimeAccum += processingTime
  partitionStatsAccum += Map(partitionId -> (recordCount, processingTime))

  processedPartition.iterator
}

// Trigger execution. count() avoids pulling every record back to the driver.
monitoredData.count()

// Analyze performance (guard against an empty input)
val totalTime = processingTimeAccum.value
val totalRecords = recordsProcessedAccum.value
val avgTimePerRecord = if (totalRecords > 0) totalTime.toDouble / totalRecords else 0.0

println(s"Total processing time: ${totalTime}ms")
println(s"Average time per record: ${avgTimePerRecord}ms")
println(s"Partition stats: ${partitionStatsAccum.value}")
```

### Error Collection with Accumulators

```scala
import org.apache.spark.AccumulableParam
import scala.util.control.NonFatal

case class ProcessingError(partitionId: Int, recordId: String, error: String, timestamp: Long)

// Custom accumulable for collecting errors
implicit object ErrorAccumulableParam extends AccumulableParam[List[ProcessingError], ProcessingError] {
  def zero(initialValue: List[ProcessingError]): List[ProcessingError] = List.empty
  // Prepending is O(1); the order of collected errors is not meaningful anyway
  def addAccumulator(list: List[ProcessingError], error: ProcessingError): List[ProcessingError] = error :: list
  def addInPlace(list1: List[ProcessingError], list2: List[ProcessingError]): List[ProcessingError] = list1 ++ list2
}

val errorCollector = sc.accumulable(List.empty[ProcessingError])

// Assumes `inputRDD` records expose `id: String` and `processRecord(record)` may throw.
// Every error is shipped to and kept on the driver, so keep the error volume bounded.
val processedData = inputRDD.mapPartitionsWithIndex { (partitionId, partition) =>
  partition.flatMap { record =>
    try {
      // Wrap in Some so the try and catch branches share the type Option[...]
      Some(processRecord(record))
    } catch {
      case NonFatal(e) =>
        errorCollector += ProcessingError(
          partitionId = partitionId,
          recordId = record.id,
          // getMessage may be null (e.g. for a bare NullPointerException)
          error = Option(e.getMessage).getOrElse(e.getClass.getName),
          timestamp = System.currentTimeMillis()
        )
        None
    }
  }
}

// Execute and collect errors
val results = processedData.collect()
val errors = errorCollector.value

println(s"Successfully processed: ${results.length}")
println(s"Errors encountered: ${errors.length}")
errors.foreach(error => println(s"Error in partition ${error.partitionId}: ${error.error}"))
```

## Best Practices

### Broadcast Variable Optimization

```scala
// 1. Broadcast large read-only data structures
val largeLookupTable = loadLookupTable() // Assume this is large
val broadcastTable = sc.broadcast(largeLookupTable)

// 2. Reuse broadcast variables across multiple operations.
//    `enrichWithLookup(_, broadcastTable.value)` expands to
//    `x => enrichWithLookup(x, broadcastTable.value)`, so `.value` is read on the executors.
//    Do not read `.value` into a local (or use `largeLookupTable` directly) outside the
//    closure, because the table would then be serialized into every task.
val enrichedData1 = data1.map(enrichWithLookup(_, broadcastTable.value))
val enrichedData2 = data2.map(enrichWithLookup(_, broadcastTable.value))

// Transformations are lazy. Run every job that reads the broadcast before cleaning up;
// any job that runs after destroy() fails.
val enrichedCount1 = enrichedData1.count()
val enrichedCount2 = enrichedData2.count()

// 3. Clean up when done. Pick one:
// broadcastTable.unpersist() // Drops executor copies only; the variable stays usable (re-sent on next use)
broadcastTable.destroy()      // Removes all data and metadata; the variable can never be used again
```

### Accumulator Best Practices

```scala
// 1. Update accumulators inside actions when you need exact values.
//    For actions, Spark applies each task's update only once. Inside transformations,
//    an update may be applied more than once if a task or stage is re-executed.
val counter = sc.accumulator(0)

// EXACT: update inside an action (foreach)
data.foreach { value =>
  if (someCondition(value)) counter += 1
}

// BEST-EFFORT: update inside a transformation. It only runs once an action
// (here collect()) executes the map, and retries can inflate the count.
val mapCounter = sc.accumulator(0)
val results = data.map { value =>
  if (someCondition(value)) mapCounter += 1
  processValue(value)
}.collect() // Action triggers execution

// 2. Be aware of lazy evaluation
val recomputeCounter = sc.accumulator(0)
val lazyRDD = data.map { value =>
  recomputeCounter += 1 // Runs again every time this RDD is recomputed
  value * 2
}

lazyRDD.cache() // Cache to avoid recomputation (the cache is filled by the first action)
val result1 = lazyRDD.count()
// Usually served from the cache, so recomputeCounter is not incremented again.
// Cache blocks that were evicted are recomputed, though, and do increment it again.
val result2 = lazyRDD.sum()

// 3. Use meaningful names for debugging; named accumulators are shown in the Spark web UI
val errorCounter = sc.accumulator(0, "Processing Errors")
val warningCounter = sc.accumulator(0, "Processing Warnings")
```

### Memory Management

```scala
// Rough heap-usage sampling with an accumulator.
// CAUTION: Runtime figures are JVM-wide. Other tasks running concurrently in the same
// executor skew each reading, and a garbage collection during the partition can make
// the delta negative. Treat the total as a coarse signal, not a measurement of memory used.
val memoryUsageAccum = sc.accumulator(0L, "Memory Usage")

val processedData = largeDataRDD.mapPartitions { partition =>
  val runtime = Runtime.getRuntime
  def usedHeapBytes(): Long = runtime.totalMemory() - runtime.freeMemory()

  val initialMemory = usedHeapBytes()
  // Materialize the partition so all processed records are live at the second sample
  val results = partition.map(processLargeRecord).toList
  val finalMemory = usedHeapBytes()
  memoryUsageAccum += (finalMemory - initialMemory)

  results.iterator
}

processedData.count() // Trigger execution
println(f"Approximate net heap growth: ${memoryUsageAccum.value / (1024.0 * 1024)}%.1f MB")
```