# Broadcasting and Accumulators

Shared variables in Spark provide efficient mechanisms for distributing read-only data (broadcast variables) and aggregating values across distributed computations (accumulators) while keeping network overhead low.

## Broadcast Variables

Broadcast variables let you keep a read-only variable cached on each machine rather than shipping a copy of it with every task.

### Broadcast Interface

```scala { .api }
abstract class Broadcast[T] {
  def id: Long
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit
  def toString: String
}
```

### Creating Broadcast Variables

```scala { .api }
class SparkContext {
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}

class JavaSparkContext {
  def broadcast[T](value: T): Broadcast[T]
}
```

## Accumulators

Accumulators provide a write-only mechanism for aggregating values (counts, sums, collected items) from tasks back to the driver; updates made inside actions are applied exactly once, even when tasks are retried.

### AccumulatorV2 Base Interface

```scala { .api }
abstract class AccumulatorV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}
```
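
This contract is how Spark drives an accumulator through a job: each task receives its own zeroed copy, calls `add` locally, and the driver `merge`s the per-task results. A minimal local sketch of that lifecycle using `LongAccumulator` (no cluster needed):

```scala
import org.apache.spark.util.LongAccumulator

val driverAcc = new LongAccumulator
assert(driverAcc.isZero)

val taskAcc = driverAcc.copy() // an independent copy; Spark ships a zeroed
taskAcc.add(5L)                // copy to each task, which only calls add
taskAcc.add(7L)

driverAcc.merge(taskAcc)       // the driver merges completed task results
assert(driverAcc.sum == 12L)

driverAcc.reset()              // back to the zero state
assert(driverAcc.isZero)
```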

### Built-in Accumulator Types

#### LongAccumulator

```scala { .api }
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
  def value: java.lang.Long
  def isZero: Boolean
}
```
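
Beyond the running total in `value`/`sum`, the `count` and `avg` helpers make `LongAccumulator` convenient for quick statistics. A small sketch, assuming an active `sc`:

```scala
val stats = sc.longAccumulator("Stats")
sc.parallelize(Seq(4L, 8L, 12L)).foreach(x => stats.add(x))

// After the action completes, on the driver:
println(stats.count) // 3  (number of values added)
println(stats.sum)   // 24
println(stats.avg)   // 8.0
```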

#### DoubleAccumulator

```scala { .api }
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
  def value: java.lang.Double
  def isZero: Boolean
}
```

#### CollectionAccumulator

```scala { .api }
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  def value: java.util.List[T]
  def isZero: Boolean
}
```

### Creating Accumulators

#### Scala API

```scala { .api }
class SparkContext {
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  def register(acc: AccumulatorV2[_, _]): Unit
  def register(acc: AccumulatorV2[_, _], name: String): Unit
}
```

#### Java API

```java { .api }
public class JavaSparkContext {
  public LongAccumulator longAccumulator()
  public LongAccumulator longAccumulator(String name)
  public DoubleAccumulator doubleAccumulator()
  public DoubleAccumulator doubleAccumulator(String name)
  public <T> CollectionAccumulator<T> collectionAccumulator()
}
```

## Usage Examples

### Basic Broadcast Variable Usage

```scala
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("Broadcast Example")
val sc = new SparkContext(conf)

// Create a lookup table
val lookupTable = Map(
  "apple" -> 0.5,
  "banana" -> 0.3,
  "cherry" -> 1.0,
  "date" -> 0.8
)

// Broadcast the lookup table
val broadcastTable = sc.broadcast(lookupTable)

// Use it in transformations
val products = sc.parallelize(Seq("apple", "banana", "cherry", "unknown"))
val prices = products.map { product =>
  val table = broadcastTable.value // Access the broadcast value
  val price = table.getOrElse(product, 0.0)
  (product, price)
}

val result = prices.collect()
// Result: Array((apple,0.5), (banana,0.3), (cherry,1.0), (unknown,0.0))

// Clean up the broadcast variable
broadcastTable.unpersist()
```

### Accumulator Examples

#### Simple Counter

```scala
val sc = new SparkContext(conf)

// Create the accumulator
val counter = sc.longAccumulator("My Counter")

val data = sc.parallelize(1 to 100)
val processed = data.map { x =>
  if (x % 10 == 0) {
    counter.add(1) // Count multiples of 10
  }
  x * 2
}

// Trigger an action to execute the transformations
processed.collect()

println(s"Multiples of 10 found: ${counter.value}")
// Output: Multiples of 10 found: 10
```

#### Error Counting

```scala
val errorCounter = sc.longAccumulator("Errors")
val successCounter = sc.longAccumulator("Successes")

// processLogLine is a placeholder for your own parsing logic
val logs = sc.textFile("application.log")
val processed = logs.map { line =>
  try {
    val result = processLogLine(line)
    successCounter.add(1)
    result
  } catch {
    case _: Exception =>
      errorCounter.add(1)
      "ERROR"
  }
}

processed.collect()

println(s"Successful: ${successCounter.value}, Errors: ${errorCounter.value}")
```

#### Collection Accumulator

```scala
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

val data = sc.parallelize(1 to 100)
val results = data.map { x =>
  if (x % 13 == 0) {
    errorMessages.add(s"Unlucky number: $x")
  }
  x
}

results.collect()

// Get all collected error messages
val errors = errorMessages.value
println(s"Collected ${errors.size()} error messages:")
errors.forEach(msg => println(msg))
```

### Advanced Broadcast Usage

#### Large Lookup Tables

```scala
// Simulate a large lookup table (could be loaded from a database)
val largeLookup = (1 to 100000).map(i => (s"key$i", s"value$i")).toMap
val broadcastLookup = sc.broadcast(largeLookup)

// Process a large dataset
val keys = sc.parallelize(1 to 1000000).map(i => s"key${i % 100000}")
val enriched = keys.mapPartitions { partition =>
  val lookup = broadcastLookup.value // Access once per partition
  partition.map(key => (key, lookup.getOrElse(key, "unknown")))
}

val sample = enriched.take(10)
```

#### Machine Learning Features

```scala
// Broadcast model weights for prediction
case class ModelWeights(weights: Array[Double], intercept: Double)

val trainedWeights = ModelWeights(
  weights = Array(0.5, -0.3, 0.8, 1.2),
  intercept = 0.1
)

val broadcastWeights = sc.broadcast(trainedWeights)

// Apply the model to feature vectors
val features = sc.parallelize(Seq(
  Array(1.0, 2.0, 3.0, 4.0),
  Array(0.5, 1.5, 2.5, 3.5),
  Array(2.0, 1.0, 4.0, 2.0)
))

val predictions = features.map { featureVector =>
  val model = broadcastWeights.value
  val score = featureVector.zip(model.weights)
    .map { case (feature, weight) => feature * weight }
    .sum + model.intercept

  if (score > 0) 1.0 else 0.0
}

predictions.collect()
```

### Custom Accumulator

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Custom accumulator for collecting unique values
class SetAccumulator[T] extends AccumulatorV2[T, mutable.Set[T]] {
  private val _set = mutable.Set.empty[T]

  def isZero: Boolean = _set.isEmpty

  def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  def reset(): Unit = _set.clear()

  def add(v: T): Unit = _set += v

  def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => _set ++= o._set
    case _ => throw new UnsupportedOperationException("Cannot merge different accumulator types")
  }

  def value: mutable.Set[T] = _set.clone()
}

// Usage
val uniqueWords = new SetAccumulator[String]
sc.register(uniqueWords, "Unique Words")

val text = sc.textFile("document.txt")
val words = text.flatMap(_.split("\\s+")).map(_.toLowerCase)

words.foreach(word => uniqueWords.add(word))

println(s"Found ${uniqueWords.value.size} unique words")
uniqueWords.value.take(10).foreach(println)
```

### Java API Examples

#### Java Broadcast Variables

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import java.util.*;

public class JavaBroadcastExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Java Broadcast");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create the lookup map
        Map<String, Double> priceMap = new HashMap<>();
        priceMap.put("apple", 0.5);
        priceMap.put("banana", 0.3);
        priceMap.put("cherry", 1.0);

        // Broadcast the map
        Broadcast<Map<String, Double>> broadcastPrices = sc.broadcast(priceMap);

        // Use it in a transformation
        List<String> products = Arrays.asList("apple", "banana", "unknown");
        JavaRDD<String> productRDD = sc.parallelize(products);

        JavaRDD<String> pricesRDD = productRDD.map(product -> {
            Map<String, Double> prices = broadcastPrices.value();
            Double price = prices.getOrDefault(product, 0.0);
            return product + ": $" + price;
        });

        List<String> results = pricesRDD.collect();
        results.forEach(System.out::println);

        sc.close();
    }
}
```

#### Java Accumulators

```java
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.CollectionAccumulator;

// Long accumulators
LongAccumulator errorCount = sc.longAccumulator("Error Count");
LongAccumulator processedCount = sc.longAccumulator("Processed Count");

JavaRDD<String> logs = sc.textFile("logs.txt");
JavaRDD<String> processed = logs.map(line -> {
    try {
        // Simulate processing
        if (line.contains("ERROR")) {
            errorCount.add(1);
            return "ERROR: " + line;
        } else {
            processedCount.add(1);
            return "OK: " + line;
        }
    } catch (Exception e) {
        errorCount.add(1);
        return "EXCEPTION: " + line;
    }
});

processed.collect(); // Trigger computation

System.out.println("Errors: " + errorCount.value());
System.out.println("Processed: " + processedCount.value());

// Collection accumulator
CollectionAccumulator<String> errorMessages = sc.collectionAccumulator();

JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 0, 5));
numbers.foreach(num -> {
    if (num == 0) {
        errorMessages.add("Found a zero in the input");
    }
});

List<String> errors = errorMessages.value();
errors.forEach(System.out::println);
```

## Performance Considerations

### Broadcast Variables

#### When to Use Broadcast Variables

- **Large lookup tables** that fit in memory
- **Configuration data** needed across all tasks
- **Model parameters** for machine learning
- **Reference data** that doesn't change during job execution

#### Best Practices

- **Size limitations**: Keep broadcast variables under a few hundred MB
- **Serialization**: Use efficient serialization formats (Kryo)
- **Memory management**: Unpersist when no longer needed
- **Network efficiency**: Broadcast once, use many times
426
427
```scala
428
// Good: Broadcast large lookup table once
429
val largeLookup = loadLargeLookupTable() // 100MB lookup table
430
val broadcastLookup = sc.broadcast(largeLookup)
431
432
val results = bigDataset.mapPartitions { partition =>
433
val lookup = broadcastLookup.value // Access once per partition
434
partition.map(record => enrichWithLookup(record, lookup))
435
}
436
437
// Bad: Closure captures large variable, shipped with every task
438
val largeLookup = loadLargeLookupTable() // This gets serialized with every task!
439
val results = bigDataset.map(record => enrichWithLookup(record, largeLookup))
440
```

### Accumulators

#### When to Use Accumulators

- **Counting events** across a distributed computation
- **Collecting diagnostics** and metrics
- **Debugging** distributed applications
- **Simple aggregations** that don't require grouping

#### Important Limitations

- **Exactly-once only in actions**: Spark guarantees that accumulator updates made inside actions are applied exactly once
- **Retries in transformations**: Updates made inside transformations may be applied more than once if tasks or stages are re-executed
- **Side effects**: Don't rely on accumulator values for program logic

```scala
// Good: Use an accumulator for counting
val errorCounter = sc.longAccumulator("Errors")
val data = sc.parallelize(records)
val cleaned = data.map { record =>
  try {
    cleanRecord(record)
  } catch {
    case _: Exception =>
      errorCounter.add(1) // Safe to count errors
      defaultRecord
  }
}
cleaned.count() // Accumulator updated during the action

// Bad: Rely on an accumulator for logic
val itemCounter = sc.longAccumulator("Items")
val itemData = sc.parallelize(items)
val processed = itemData.map { item =>
  itemCounter.add(1)
  if (itemCounter.value > 1000) { // DON'T DO THIS - unreliable on executors!
    processSpecially(item)
  } else {
    processNormally(item)
  }
}
```

## Configuration

### Broadcast Variables

- `spark.broadcast.blockSize` - Size of each block for broadcast variables (default: 4m)
- `spark.broadcast.compress` - Whether to compress broadcast variables (default: true)
- `spark.serializer` - Serializer used for broadcast data (KryoSerializer recommended)

### Memory Management

- `spark.memory.fraction` - Fraction of heap used for execution and storage (default: 0.6)
- `spark.memory.storageFraction` - Fraction of the unified memory region reserved for storage, immune to eviction by execution (default: 0.5)
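
A minimal sketch of applying these settings when building a context; the values shown are the defaults, kept explicit only for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val tunedConf = new SparkConf()
  .setAppName("Shared Variables Tuning")
  .set("spark.broadcast.blockSize", "4m")
  .set("spark.broadcast.compress", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.storageFraction", "0.5")

val tunedSc = new SparkContext(tunedConf)
```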
497
498
## Error Handling and Debugging
499
500
### Broadcast Variable Issues
501
502
```scala
503
// Handle missing broadcast data gracefully
504
val maybeBroadcast: Option[Broadcast[Map[String, String]]] =
505
if (lookupData.nonEmpty) Some(sc.broadcast(lookupData)) else None
506
507
val processed = data.map { record =>
508
maybeBroadcast match {
509
case Some(broadcast) => enrichRecord(record, broadcast.value)
510
case None => record // No enrichment if no broadcast data
511
}
512
}
513
```

### Accumulator Debugging

```scala
// Use accumulators to track different types of records
val validRecords = sc.longAccumulator("Valid Records")
val invalidRecords = sc.longAccumulator("Invalid Records")
val emptyRecords = sc.longAccumulator("Empty Records")

val processed = data.map { record =>
  record match {
    case r if r.isEmpty =>
      emptyRecords.add(1)
      None
    case r if isValid(r) =>
      validRecords.add(1)
      Some(processRecord(r))
    case _ =>
      invalidRecords.add(1)
      None
  }
}.filter(_.isDefined).map(_.get)

processed.count() // Trigger computation

// Check the processing results
println(s"Valid: ${validRecords.value}")
println(s"Invalid: ${invalidRecords.value}")
println(s"Empty: ${emptyRecords.value}")
```

## Important Notes

### Broadcast Variables

- **Immutable**: Broadcast variables cannot be modified after creation
- **Lazy**: The value is fetched by each executor the first time it is read
- **Cached**: Each executor caches the broadcast variable locally
- **Cleanup**: Call `unpersist()` or `destroy()` to free memory (see the sketch below)
- **Serialization**: Variables must be serializable
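
A minimal sketch of the two cleanup calls, assuming an active `sc`. The difference matters: after `unpersist()` the variable can still be used (executors simply refetch it), while after `destroy()` any further use throws an exception:

```scala
val bc = sc.broadcast(Map("a" -> 1))

bc.unpersist() // drop cached copies on executors; refetched on next use

bc.destroy()   // release all resources; bc must not be used after this
```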

### Accumulators

- **Write-only**: Tasks can only add to accumulators, not read their values
- **Driver access**: Only the driver can read accumulator values
- **Fault tolerance**: Values may be updated multiple times due to task retries
- **Actions only**: Reliable updates only occur within actions, not transformations (see the sketch below)
- **Registration**: Named accumulators appear in the Spark UI for monitoring
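
For exactly-once counts, perform accumulator updates inside an action such as `foreach` rather than a transformation; a minimal sketch, assuming an active `sc`:

```scala
val evens = sc.longAccumulator("Evens")

// foreach is an action, so these updates are applied exactly once
sc.parallelize(1 to 100).foreach { x =>
  if (x % 2 == 0) evens.add(1)
}

println(evens.value) // 50
```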

### General Guidelines

- **Use broadcast for large read-only data** shared across tasks
- **Use accumulators for simple counting and collecting**
- **Monitor memory usage** for broadcast variables
- **Don't use accumulators for critical program logic**
- **Clean up resources** when no longer needed
- **Test with different cluster sizes** to ensure proper behavior