# Broadcast Variables and Accumulators

Distributed variable support for efficiently sharing read-only data across tasks (broadcast variables) and collecting information from executors (accumulators).

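As a quick orientation, here is a minimal sketch showing one of each, assuming an already-created `SparkContext` named `sc`:

```scala
// Minimal sketch: one broadcast variable and one accumulator, assuming a live `sc`
val ranks = sc.broadcast(Map("gold" -> 1, "silver" -> 2))
val misses = sc.accumulator(0, "misses")

sc.parallelize(Seq("gold", "tin")).foreach { metal =>
  if (!ranks.value.contains(metal)) misses += 1 // runs on executors
}

println(misses.value) // read back on the driver: 1
```
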
## Capabilities

### Broadcast Variables

Broadcast variables allow you to efficiently distribute read-only data to all worker nodes, rather than shipping a copy with each task.

```scala { .api }
/**
 * A broadcast variable created by SparkContext.broadcast().
 * Access its value through .value.
 *
 * @param id unique identifier for this broadcast variable
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {

  /** Get the broadcast value */
  def value: T

  /** Asynchronously delete cached copies of this broadcast on the executors */
  def unpersist(): Unit

  /** Delete cached copies of this broadcast on the executors, with the option to block until deletion completes */
  def unpersist(blocking: Boolean): Unit

  /** Destroy all data and metadata related to this broadcast variable */
  def destroy(): Unit

  /** Whether this broadcast variable is valid (not destroyed) */
  def isValid: Boolean

  override def toString: String = s"Broadcast($id)"
}

/**
 * SparkContext methods for creating broadcast variables
 */
class SparkContext(config: SparkConf) extends Logging {

  /** Broadcast a read-only variable to the cluster, returning a Broadcast object */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example").setMaster("local[*]"))

// Create lookup table
val lookupTable = Map(
  "error" -> 1,
  "warning" -> 2,
  "info" -> 3,
  "debug" -> 4
)

// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)

// Create RDD of log entries
val logEntries = sc.parallelize(Array(
  "error: failed to connect",
  "warning: retry attempt",
  "info: processing started",
  "debug: variable x = 5"
))

// Use broadcast variable in transformations
val categorizedLogs = logEntries.map { entry =>
  val level = entry.split(":").head
  val category = broadcastLookup.value.getOrElse(level, 0)
  (entry, category)
}

val results = categorizedLogs.collect()
results.foreach(println)

// Clean up the broadcast variable
broadcastLookup.unpersist()
// broadcastLookup.destroy() // Use when completely done with it

sc.stop()
```

**Advanced Broadcast Usage:**

```scala
// Large lookup table scenario
val largeReferenceData = sc.textFile("reference-data.txt")
  .map { line =>
    val parts = line.split(",")
    (parts(0), parts(1))
  }
  .collectAsMap() // Collect to driver as a Map

val broadcastReference = sc.broadcast(largeReferenceData)

// Use in a join-like operation (broadcast join)
val transactionData = sc.textFile("transactions.txt")
val enrichedTransactions = transactionData.map { transaction =>
  val customerId = transaction.split(",").head
  val customerInfo = broadcastReference.value.get(customerId)
  (transaction, customerInfo)
}

// Machine learning model broadcast
val trainedModel = trainMLModel(trainingData) // placeholder: your own training code
val broadcastModel = sc.broadcast(trainedModel)

val predictions = testData.map { testPoint =>
  val model = broadcastModel.value
  val prediction = model.predict(testPoint)
  (testPoint, prediction)
}
```

### Accumulators

Accumulators provide a simple way to aggregate values from worker nodes back to the driver program.

```scala { .api }
/**
 * A data type that can be accumulated (has an associative and commutative operation)
 */
trait AccumulatorParam[T] extends Serializable {
  /** Return a zero value for this accumulator type */
  def zero(initialValue: T): T

  /** Add two values together */
  def addInPlace(t1: T, t2: T): T
}

/**
 * A shared variable that can be accumulated across parallel operations
 */
class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String] = None) extends Serializable {

  /** Get the current value of this accumulator from the driver program */
  def value: T

  /** Add a value to this accumulator */
  def add(term: T): Unit

  /** Add a value using the += operator */
  def += (term: T): Unit = add(term)

  /** Get the current value from within a task (may not be the global value) */
  def localValue: T

  /** Get the zero value */
  def zero: T

  /** Get the accumulator's unique ID */
  def id: Long

  /** Get the accumulator's name */
  def name: Option[String]

  override def toString: String = if (name.isDefined) name.get else value.toString
}

/**
 * SparkContext methods for creating accumulators
 */
class SparkContext(config: SparkConf) extends Logging {

  /** Create an accumulator of type T, using an implicit AccumulatorParam */
  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

  /** Create a named accumulator variable */
  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]

  /** Create an accumulator for integers */
  def accumulator(initialValue: Int): Accumulator[Int]

  /** Create an accumulator for doubles */
  def accumulator(initialValue: Double): Accumulator[Double]

  /** Create a named integer accumulator */
  def accumulator(initialValue: Int, name: String): Accumulator[Int]

  /** Create a named double accumulator */
  def accumulator(initialValue: Double, name: String): Accumulator[Double]
}
```

**Usage Examples:**

```scala
val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example").setMaster("local[*]"))

// Create accumulators
val errorCount = sc.accumulator(0, "Error Count")
val warningCount = sc.accumulator(0, "Warning Count")
val totalProcessed = sc.accumulator(0)

// Process data with accumulators
val logData = sc.textFile("logs.txt")

// Note: updating accumulators inside a transformation like map() can
// double-count if tasks are retried; see "Accumulator Guidelines" below
val processedLogs = logData.map { line =>
  totalProcessed += 1

  if (line.contains("ERROR")) {
    errorCount += 1
  } else if (line.contains("WARNING")) {
    warningCount += 1
  }

  line.toUpperCase
}

// Trigger an action to update the accumulators
processedLogs.count()

// Read accumulator values (only on the driver)
println(s"Total processed: ${totalProcessed.value}")
println(s"Errors: ${errorCount.value}")
println(s"Warnings: ${warningCount.value}")

sc.stop()
```

**Advanced Accumulator Usage:**

```scala
// Custom accumulator for complex data types
case class Statistics(count: Long, sum: Double, sumSquares: Double) {
  def mean: Double = if (count > 0) sum / count else 0.0
  def variance: Double = if (count > 0) (sumSquares / count) - (mean * mean) else 0.0
}

implicit object StatisticsAccumulatorParam extends AccumulatorParam[Statistics] {
  def zero(initialValue: Statistics): Statistics = Statistics(0, 0.0, 0.0)

  def addInPlace(s1: Statistics, s2: Statistics): Statistics = {
    Statistics(
      s1.count + s2.count,
      s1.sum + s2.sum,
      s1.sumSquares + s2.sumSquares
    )
  }
}

// Use the custom accumulator
val stats = sc.accumulator(Statistics(0, 0.0, 0.0), "Data Statistics")

val data = sc.parallelize((1 to 1000).map(_.toDouble))
data.foreach { value =>
  stats += Statistics(1, value, value * value)
}

println(s"Mean: ${stats.value.mean}")
println(s"Variance: ${stats.value.variance}")

// Collection accumulator (Spark 2.0+ style simulation)
import scala.collection.mutable

class ListAccumulator[T] extends AccumulatorParam[mutable.ListBuffer[T]] {
  def zero(initialValue: mutable.ListBuffer[T]): mutable.ListBuffer[T] = mutable.ListBuffer[T]()

  def addInPlace(buf1: mutable.ListBuffer[T], buf2: mutable.ListBuffer[T]): mutable.ListBuffer[T] = {
    buf1 ++= buf2
  }
}

implicit def listAccumulatorParam[T]: ListAccumulator[T] = new ListAccumulator[T]

val errorMessages = sc.accumulator(mutable.ListBuffer[String](), "Error Messages")

logData.foreach { line =>
  if (line.contains("FATAL")) {
    errorMessages += mutable.ListBuffer(line)
  }
}

println(s"Fatal errors: ${errorMessages.value.toList}")
```

### Built-in AccumulatorParam Implementations

Spark provides implicit accumulator parameters for common numeric types (defined in the SparkContext companion object); for collection types you define your own:

```scala { .api }
// Built-in accumulator parameters
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def zero(initialValue: Int) = 0
  def addInPlace(t1: Int, t2: Int) = t1 + t2
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
  def zero(initialValue: Long) = 0L
  def addInPlace(t1: Long, t2: Long) = t1 + t2
}

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def zero(initialValue: Double) = 0.0
  def addInPlace(t1: Double, t2: Double) = t1 + t2
}

// For collections, define a custom param (not built in):
implicit object ListAccumulatorParam extends AccumulatorParam[List[String]] {
  def zero(initialValue: List[String]) = List()
  def addInPlace(list1: List[String], list2: List[String]) = list1 ++ list2
}
```

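Because these params are implicit, a plain `sc.accumulator(...)` call picks the right one from the initial value's type; a small sketch, assuming a live `SparkContext` named `sc`:

```scala
// Implicit resolution: IntAccumulatorParam and DoubleAccumulatorParam are
// selected automatically from the initial value's type
val intAcc = sc.accumulator(0)      // Accumulator[Int]
val doubleAcc = sc.accumulator(0.0) // Accumulator[Double]

sc.parallelize(1 to 10).foreach { i =>
  intAcc += 1
  doubleAcc += i / 2.0
}

println(s"count=${intAcc.value}, sum=${doubleAcc.value}")
```
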
## Java API Support

### Java Broadcast Variables

```java { .api }
/**
 * Java-friendly broadcast variable access
 */
public class JavaSparkContext implements Closeable {
    /** Broadcast a read-only variable to the cluster */
    public <T> Broadcast<T> broadcast(T value)
}

// Usage in Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Map<String, Integer> lookupMap = new HashMap<>();
lookupMap.put("apple", 1);
lookupMap.put("banana", 2);

Broadcast<Map<String, Integer>> broadcastLookup = sc.broadcast(lookupMap);

JavaRDD<String> data = sc.parallelize(Arrays.asList("apple", "banana", "cherry"));
JavaRDD<Integer> mapped = data.map(item ->
    broadcastLookup.value().getOrDefault(item, 0)
);
```

### Java Accumulators

```java { .api }
/**
 * Java accumulator creation methods
 */
public class JavaSparkContext implements Closeable {
    /** Create an integer accumulator */
    public Accumulator<Integer> accumulator(Integer initialValue)

    /** Create a named integer accumulator */
    public Accumulator<Integer> accumulator(Integer initialValue, String name)

    /** Create an accumulator with a custom AccumulatorParam */
    public <T> Accumulator<T> accumulator(T initialValue, AccumulatorParam<T> param)

    /** Create a named accumulator with a custom AccumulatorParam */
    public <T> Accumulator<T> accumulator(T initialValue, String name, AccumulatorParam<T> param)
}

// Usage in Java
Accumulator<Integer> errorCount = sc.accumulator(0, "Errors");
Accumulator<Integer> lineCount = sc.accumulator(0);

JavaRDD<String> lines = sc.textFile("data.txt");
lines.foreach(line -> {
    lineCount.add(1);
    if (line.contains("ERROR")) {
        errorCount.add(1);
    }
});

System.out.println("Total lines: " + lineCount.value());
System.out.println("Error lines: " + errorCount.value());
```

## Performance and Best Practices

### Broadcast Variable Guidelines

**When to Use Broadcast Variables:**
- Lookup tables or reference data needed by many tasks
- Configuration objects used across transformations
- Machine learning models for scoring
- Data that would otherwise be serialized with every task (see the sketch below)

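To make the last point concrete, here is a sketch of the same lookup used with and without a broadcast, assuming a live `SparkContext` named `sc`; without one, the captured map is serialized into every task's closure:

```scala
val table = (1 to 100000).map(i => (i.toString, i)).toMap

// Without broadcast: `table` is captured in the closure and shipped with every task
val viaClosure = sc.parallelize(1 to 10).map(i => table.get(i.toString))

// With broadcast: `table` is shipped to each executor once and reused by all its tasks
val bcTable = sc.broadcast(table)
val viaBroadcast = sc.parallelize(1 to 10).map(i => bcTable.value.get(i.toString))
```
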
**Size Considerations:**
```scala
// Good: small to medium lookup tables (well under 1 GB)
val smallLookup = Map(/* thousands of entries */)
val broadcast = sc.broadcast(smallLookup)

// Questionable: very large data (> 1 GB)
val hugeLookup = Map(/* millions of entries */)
val hugeBroadcast = sc.broadcast(hugeLookup) // May cause memory issues
```

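To check which side of that line you are on, you can estimate an object's in-memory size before broadcasting it. `org.apache.spark.util.SizeEstimator` is a developer API, so treat the figure as approximate:

```scala
import org.apache.spark.util.SizeEstimator

// Rough in-memory size of the candidate broadcast value, in bytes
val lookup = (1 to 100000).map(i => (i.toString, i)).toMap
val bytes = SizeEstimator.estimate(lookup)
println(s"Estimated size: ${bytes / (1024 * 1024)} MB")
```
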
**Lifecycle Management:**
```scala
// Create the broadcast once, use it many times
val lookup = sc.broadcast(referenceData)

// Use in multiple operations
val result1 = data1.map(x => lookup.value.get(x.id))
val result2 = data2.filter(x => lookup.value.contains(x.key))

// Clean up when done
lookup.unpersist() // Remove cached copies from executors (re-sent if used again)
lookup.destroy()   // Complete cleanup; the variable is unusable afterwards
```

### Accumulator Guidelines

**Accumulator Guarantees:**
- Actions guarantee that each task's accumulator update is applied exactly once
- Transformations may apply updates more than once due to task retries or recomputation

```scala
val counter = sc.accumulator(0)

// Safe: accumulator updated in an action
data.foreach(x => counter += 1) // Each element counted exactly once

// Unsafe: accumulator updated in a transformation
val mapped = data.map { x =>
  counter += 1 // May run more than once due to retries or recomputation
  transform(x)
}
mapped.collect() // counter.value may exceed data.count()
```

**Recommended Patterns:**
```scala
// Pattern 1: Update accumulators only in actions
data.foreach { x =>
  if (isError(x)) errorCount += 1
  process(x)
}

// Pattern 2: Separate transformation and accumulation
val processedData = data.map(transform) // No accumulator here
processedData.foreach { x =>
  validCount += 1 // Accumulator updated in an action
}

// Pattern 3: cache() a transformation that updates an accumulator if it is reused
val transformed = data.map { x =>
  operationCount += 1
  expensiveTransform(x)
}.cache() // Cache to avoid recomputation (and double-counting)

transformed.collect() // Accumulator updated once
```

### Memory Management

**Broadcast Variables:**
```scala
import scala.reflect.ClassTag

// Configuration that reduces broadcast memory and network cost
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // More efficient serialization
  .set("spark.broadcast.compress", "true") // Compress broadcasts
  .set("spark.io.compression.codec", "snappy") // Fast compression

// Cleanup pattern: scope a broadcast to one operation
def withBroadcast[T: ClassTag, R](data: T)(operation: Broadcast[T] => R): R = {
  val broadcast = sc.broadcast(data)
  try {
    operation(broadcast)
  } finally {
    broadcast.unpersist()
  }
}
```

**Large Broadcast Variables:**
```scala
// For very large broadcast data, consider alternatives:
// Option 1: Use external storage (Redis, etc.); see the sketch below
// Option 2: Partition the broadcast data
// Option 3: Use an RDD join instead of a broadcast

// Instead of a huge broadcast:
val hugeBroadcast = sc.broadcast(hugeMap) // Memory issues

// Consider an RDD join (both sides must be key-value RDDs):
val lookupRDD = sc.parallelize(hugeMap.toSeq)
val joinedRDD = dataRDD.join(lookupRDD) // Distributed join; dataRDD keyed the same way
```

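For Option 1, a per-partition client to an external store keeps the table out of the job entirely. The sketch below is illustrative only: `KVClient` is a hypothetical stand-in for whatever client your store provides, and `keysRDD: RDD[String]` is assumed to hold the lookup keys:

```scala
// Hypothetical sketch of Option 1: KVClient is NOT a real library.
val enriched = keysRDD.mapPartitions { keys =>
  val client = KVClient.connect("lookup-host:6379") // one connection per partition
  keys.map(k => (k, client.get(k)))
  // In production code, close the client once the partition is fully consumed.
}
```
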
## Common Anti-patterns

### Broadcast Variable Misuse
```scala
// Bad: Broadcasting large mutable collections
val mutableMap = scala.collection.mutable.Map[String, Int]()
val badBroadcast = sc.broadcast(mutableMap) // Don't broadcast mutable data

// Bad: Broadcasting data used only once
val onceUsed = sc.broadcast(smallData)
data.map(x => onceUsed.value.get(x.id)).collect() // Just capture smallData directly

// Good: Broadcasting immutable reference data used multiple times
val immutableLookup = Map[String, Int](/* data */)
val goodBroadcast = sc.broadcast(immutableLookup)
```

### Accumulator Misuse
```scala
// Bad: Using accumulators for side effects in transformations
val result = data.map { x =>
  counter += 1 // Bad: side effect in a transformation (may double-count on retry)
  x * 2
}

// Bad: Reading an accumulator's value from tasks
data.map { x =>
  val currentCount = counter.value // Bad: tasks cannot read the global value; this fails at runtime
  x + currentCount
}

// Good: Using accumulators for monitoring in actions
data.map(transform).foreach { x =>
  counter += 1 // Good: side effect in an action
  save(x)
}
```

### Resource Leaks
```scala
// Bad: Not cleaning up broadcast variables
def processData(): Unit = {
  val broadcast = sc.broadcast(referenceData)
  // ... use broadcast ...
  // No cleanup: executor memory stays occupied
}

// Good: Proper cleanup
def processData(): Unit = {
  val broadcast = sc.broadcast(referenceData)
  try {
    // ... use broadcast ...
  } finally {
    broadcast.unpersist()
  }
}
```