0
# Broadcast Variables and Accumulators
1
2
Shared variables in Spark enable efficient data sharing across cluster nodes. Broadcast variables distribute read-only data efficiently, while accumulators aggregate information from workers back to the driver. Note that accumulator updates performed inside actions are applied exactly once even under task retries, whereas updates made inside transformations may be applied more than once if tasks are re-executed.
3
4
## Capabilities
5
6
### Broadcast Variables
7
8
Efficiently distribute read-only data to all cluster nodes, avoiding repeated serialization and network transfer.
9
10
```scala { .api }
11
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
12
def value: T
13
def unpersist(): Unit
14
def unpersist(blocking: Boolean): Unit
15
def destroy(): Unit
16
override def toString: String
17
}
18
19
// Creation via SparkContext
20
def broadcast[T: ClassTag](value: T): Broadcast[T]
21
```
22
23
**Usage Examples:**
24
```scala
25
// Large lookup table that all tasks need
26
val lookupTable = Map(
27
"user1" -> "Alice",
28
"user2" -> "Bob",
29
"user3" -> "Charlie"
30
// ... potentially thousands of entries
31
)
32
33
// Broadcast the table once instead of sending with each task
34
val broadcastLookup = sc.broadcast(lookupTable)
35
36
// Use in transformations - only the broadcasted reference is serialized
37
val userIds = sc.parallelize(Seq("user1", "user2", "user3", "user1"))
38
val userNames = userIds.map { id =>
39
broadcastLookup.value.getOrElse(id, "Unknown")
40
}
41
// Result: ["Alice", "Bob", "Charlie", "Alice"]
42
43
// Clean up when done
44
broadcastLookup.unpersist()
45
```
46
47
**Common Use Cases:**
48
```scala
49
// Configuration broadcast
50
val config = Map("apiUrl" -> "https://api.example.com", "timeout" -> "30s")
51
val broadcastConfig = sc.broadcast(config)
52
53
// Model parameters broadcast
54
val modelWeights = Array.fill(1000)(scala.util.Random.nextDouble())
55
val broadcastWeights = sc.broadcast(modelWeights)
56
57
// Large dimension tables
58
val productCatalog = loadProductCatalogFromDatabase() // Large dataset
59
val broadcastCatalog = sc.broadcast(productCatalog)
60
61
val transactions = sc.textFile("transactions.txt")
62
val enrichedTransactions = transactions.map { transaction =>
63
val productId = extractProductId(transaction)
64
val productInfo = broadcastCatalog.value.get(productId)
65
enrichTransaction(transaction, productInfo)
66
}
67
```
68
69
### AccumulatorV2 Base Class
70
71
Base class for all accumulator implementations, providing fault-tolerant aggregation.
72
73
```scala { .api }
74
abstract class AccumulatorV2[IN, OUT] extends Serializable {
75
// Core accumulator interface
76
def isZero: Boolean
77
def copy(): AccumulatorV2[IN, OUT]
78
def reset(): Unit
79
def add(v: IN): Unit
80
def merge(other: AccumulatorV2[IN, OUT]): Unit
81
def value: OUT
82
83
// Metadata
84
def name: Option[String]
85
def id: Long
86
def isRegistered: Boolean
87
def register(sc: SparkContext, name: Option[String] = None, countFailedValues: Boolean = false): Unit
88
}
89
```
90
91
### Built-in Accumulator Types
92
93
Pre-implemented accumulator types for common use cases.
94
95
```scala { .api }
96
// Long accumulator
97
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
98
def add(v: Long): Unit
99
def add(v: jl.Long): Unit
100
def count: Long
101
def sum: Long
102
def avg: Double
103
}
104
105
// Double accumulator
106
class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
107
def add(v: Double): Unit
108
def add(v: jl.Double): Unit
109
def count: Long
110
def sum: Double
111
def avg: Double
112
}
113
114
// Collection accumulator
115
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
116
def add(v: T): Unit
117
def copyAndReset(): CollectionAccumulator[T]
118
}
119
120
// Creation via SparkContext
121
def longAccumulator(): LongAccumulator
122
def longAccumulator(name: String): LongAccumulator
123
def doubleAccumulator(): DoubleAccumulator
124
def doubleAccumulator(name: String): DoubleAccumulator
125
def collectionAccumulator[T](): CollectionAccumulator[T]
126
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
127
```
128
129
**Usage Examples:**
130
```scala
131
// Count errors during processing
132
val errorCount = sc.longAccumulator("Processing Errors")
133
val totalProcessed = sc.longAccumulator("Total Processed")
134
135
val data = sc.parallelize(1 to 1000)
136
val results = data.map { value =>
137
totalProcessed.add(1)
138
try {
139
Some(processValue(value))
140
} catch {
141
case _: Exception =>
142
errorCount.add(1)
143
None
144
}
145
}.filter(_.isDefined).map(_.get)
146
147
// Trigger action to execute
148
results.count()
149
150
// Check accumulator values
151
println(s"Total processed: ${totalProcessed.value}")
152
println(s"Errors: ${errorCount.value}")
153
println(s"Success rate: ${(totalProcessed.value - errorCount.value).toDouble / totalProcessed.value}")
154
155
// Collect sample error messages
156
val errorMessages = sc.collectionAccumulator[String]("Error Messages")
157
data.foreach { value =>
158
try {
159
processValue(value)
160
} catch {
161
case e: Exception => errorMessages.add(e.getMessage)
162
}
163
}
164
```
165
166
### Custom Accumulators
167
168
Create custom accumulators for specialized aggregation needs.
169
170
```scala { .api }
171
// Custom accumulator example structure
172
class CustomAccumulator extends AccumulatorV2[InputType, OutputType] {
173
private var _value: InternalState = initialState
174
175
def isZero: Boolean = _value == initialState
176
def copy(): CustomAccumulator = new CustomAccumulator // with copied state
177
def reset(): Unit = _value = initialState
178
def add(v: InputType): Unit = updateInternalState(v)
179
def merge(other: AccumulatorV2[InputType, OutputType]): Unit = mergeStates(other)
180
def value: OutputType = computeOutputFromState(_value)
181
}
182
```
183
184
**Custom Accumulator Examples:**
185
```scala
186
// Statistics accumulator
187
class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)] {
188
private var count: Long = 0
189
private var sum: Double = 0.0
190
private var sumSquares: Double = 0.0
191
private var min: Double = Double.MaxValue
192
private var max: Double = Double.MinValue
193
194
def isZero: Boolean = count == 0
195
196
def copy(): StatsAccumulator = {
197
val acc = new StatsAccumulator
198
acc.count = this.count
199
acc.sum = this.sum
200
acc.sumSquares = this.sumSquares
201
acc.min = this.min
202
acc.max = this.max
203
acc
204
}
205
206
def reset(): Unit = {
207
count = 0
208
sum = 0.0
209
sumSquares = 0.0
210
min = Double.MaxValue
211
max = Double.MinValue
212
}
213
214
def add(v: Double): Unit = {
215
count += 1
216
sum += v
217
sumSquares += v * v
218
min = math.min(min, v)
219
max = math.max(max, v)
220
}
221
222
def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)]): Unit = {
223
other match {
224
case o: StatsAccumulator =>
225
if (o.count > 0) {
226
count += o.count
227
sum += o.sum
228
sumSquares += o.sumSquares
229
min = math.min(min, o.min)
230
max = math.max(max, o.max)
231
}
232
}
233
}
234
235
def value: (Long, Double, Double, Double, Double, Double) = {
236
val mean = if (count > 0) sum / count else 0.0
237
val variance = if (count > 1) (sumSquares - sum * sum / count) / (count - 1) else 0.0
238
(count, sum, mean, math.sqrt(variance), min, max)
239
}
240
}
241
242
// Usage
243
val statsAcc = new StatsAccumulator
244
sc.register(statsAcc, "Data Statistics")
245
246
val data = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 100.0))
247
data.foreach(statsAcc.add(_))
248
249
val (count, sum, mean, stddev, min, max) = statsAcc.value
250
println(s"Count: $count, Mean: $mean, StdDev: $stddev, Min: $min, Max: $max")
251
```
252
253
### Histogram Computation
254
255
Built-in methods on `RDD[Double]` for computing histograms. Despite appearing alongside accumulators, these are RDD actions (provided via `DoubleRDDFunctions`), not accumulator types.
256
257
```scala { .api }
258
// Available on RDD[Double]
259
def histogram(buckets: Array[Double]): Array[Long]
260
def histogram(bucketNum: Int): (Array[Double], Array[Long])
261
```
262
263
**Usage Example:**
264
```scala
265
val data = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1, 7.9, 8.3, 9.1, 10.0))
266
267
// Histogram with specified bucket boundaries
268
val buckets = Array(0.0, 2.0, 4.0, 6.0, 8.0, 10.0)
269
val counts = data.histogram(buckets)
270
// Result: counts for ranges [0-2), [2-4), [4-6), [6-8), [8-10]
271
272
// Histogram with automatic bucket calculation
273
val (autoBuckets, autoCounts) = data.histogram(5)
274
```
275
276
## Best Practices and Performance
277
278
### Broadcast Variable Guidelines
279
280
```scala
281
// DO: Broadcast large read-only data used across many tasks
282
val broadcastTable = sc.broadcast(largeHashMap)
283
284
// DON'T: Broadcast small data or data used in few tasks
285
val smallConfig = Map("key" -> "value") // Just include directly
286
287
// DO: Unpersist when done to free memory
288
broadcastTable.unpersist()
289
290
// DO: Use broadcast for join optimization with small dimension tables
291
val smallTable = sc.broadcast(dimensionTable.collectAsMap())
292
val enriched = factTable.map { row =>
293
val enrichmentData = smallTable.value.get(row.key)
294
enrichRow(row, enrichmentData)
295
}
296
```
297
298
### Accumulator Guidelines
299
300
```scala
301
// DO: Use accumulators for monitoring and debugging
302
val validRecords = sc.longAccumulator("Valid Records")
303
val invalidRecords = sc.longAccumulator("Invalid Records")
304
305
// DON'T: Use accumulators for functional logic (non-deterministic)
306
// Accumulators should be side-effects only
307
308
// DO: Check accumulator values after actions, not transformations
309
data.map { record =>
310
if (isValid(record)) {
311
validRecords.add(1)
312
Some(processRecord(record))
313
} else {
314
invalidRecords.add(1)
315
None
316
}
317
}.collect() // Action triggers accumulator updates
318
319
println(s"Valid: ${validRecords.value}, Invalid: ${invalidRecords.value}")
320
```
321
322
## Types
323
324
```scala { .api }
325
// Broadcast variable base class
326
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable
327
328
// Accumulator base class
329
abstract class AccumulatorV2[IN, OUT] extends Serializable
330
331
// Built-in accumulator implementations
332
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long]
333
class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double]
334
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
335
336
// Legacy accumulator API (deprecated since Spark 2.0) and accumulator metadata
337
trait AccumulatorParam[T] {
338
def addInPlace(t1: T, t2: T): T
339
def zero(initialValue: T): T
340
}
341
342
case class AccumulableInfo(
343
id: Long,
344
name: Option[String],
345
update: Option[String],
346
value: String,
347
internal: Boolean,
348
countFailedValues: Boolean,
349
metadata: Option[String]
350
)
351
```