# Shared Variables

Spark provides two types of shared variables for efficient data sharing across distributed computations: broadcast variables for read-only data, and accumulators for add-only aggregation (tasks can only add to an accumulator; only the driver can read its value).

## Broadcast Variables

Broadcast variables allow efficient sharing of large read-only datasets across all nodes in a cluster: the value is shipped to each executor once, rather than with every task closure.
```scala { .api }
abstract class Broadcast[T](val id: Long) {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def toString: String
}
```

### Creating Broadcast Variables

```scala { .api }
// From SparkContext
def broadcast[T: ClassTag](value: T): Broadcast[T]
```
## Accumulators V2

The modern accumulator API provides type-safe, efficient aggregation across distributed computations. `IN` is the type of values added by tasks; `OUT` is the type of the aggregated result.

```scala { .api }
abstract class AccumulatorV2[IN, OUT] {
  // Core operations
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT

  // Metadata
  def name: Option[String]
  def id: Long
  def isRegistered: Boolean
}
```
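Spark drives these core operations itself: it uses `copy()` and `reset()` to give each task a zero-valued instance, calls `add()` inside tasks, and calls `merge()` on the driver to combine partial results. The contract can be sketched without Spark at all; the `CountAccumulator` class below is hypothetical and exists only to illustrate the lifecycle:

```scala
// Minimal, Spark-free sketch of the AccumulatorV2 contract
// (hypothetical class, for illustration only)
class CountAccumulator {
  private var _count: Long = 0L

  def isZero: Boolean = _count == 0L
  def copy(): CountAccumulator = { val c = new CountAccumulator; c._count = _count; c }
  def reset(): Unit = _count = 0L
  def add(v: Long): Unit = _count += v
  def merge(other: CountAccumulator): Unit = _count += other._count
  def value: Long = _count
}

// Simulate two task-local copies being merged back on the driver
val driverAcc = new CountAccumulator
val task1 = driverAcc.copy(); task1.reset()
val task2 = driverAcc.copy(); task2.reset()
Seq(1, 2, 3).foreach(_ => task1.add(1)) // "partition 1" counts 3 elements
Seq(4, 5).foreach(_ => task2.add(1))    // "partition 2" counts 2 elements
driverAcc.merge(task1)
driverAcc.merge(task2)
println(driverAcc.value) // 5
```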
## Built-in Accumulator Types

### LongAccumulator
```scala { .api }
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
  def value: java.lang.Long

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): LongAccumulator
  def reset(): Unit
  def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
}
```
### DoubleAccumulator
```scala { .api }
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
  def value: java.lang.Double

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): DoubleAccumulator
  def reset(): Unit
  def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
}
```
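`DoubleAccumulator` tracks a running sum and a count together, which is why `avg` remains correct after partial results from different partitions are merged. The merge arithmetic can be sketched in plain Scala; the `Partial` case class below is hypothetical, shown only to illustrate the idea:

```scala
// Sketch of how a (sum, count) pair merges across partitions:
// merge() combines sums and counts, and avg is derived as sum / count.
case class Partial(sum: Double, count: Long) {
  def merge(other: Partial): Partial = Partial(sum + other.sum, count + other.count)
  def avg: Double = if (count == 0) Double.NaN else sum / count
}

val partition1 = Partial(sum = 6.0, count = 3) // values 1.0, 2.0, 3.0
val partition2 = Partial(sum = 9.0, count = 2) // values 4.0, 5.0
val merged = partition1.merge(partition2)
println(merged.avg) // 3.0
```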
### CollectionAccumulator
```scala { .api }
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): CollectionAccumulator[T]
  def reset(): Unit
  def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
}
```
## Creating Accumulators

### From SparkContext
```scala { .api }
// Long accumulators
def longAccumulator(): LongAccumulator
def longAccumulator(name: String): LongAccumulator

// Double accumulators
def doubleAccumulator(): DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator

// Collection accumulators
def collectionAccumulator[T](): CollectionAccumulator[T]
def collectionAccumulator[T](name: String): CollectionAccumulator[T]

// Custom accumulators
def register(acc: AccumulatorV2[_, _]): Unit
def register(acc: AccumulatorV2[_, _], name: String): Unit
```
## Custom Accumulators

Create custom accumulator types by extending `AccumulatorV2`.

```scala { .api }
import scala.collection.mutable
import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on Scala 2.12

// Example: Set accumulator for collecting unique values
class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
  private val _set = mutable.Set.empty[T]

  def isZero: Boolean = _set.isEmpty

  def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= _set
    newAcc
  }

  def reset(): Unit = _set.clear()

  def add(v: T): Unit = _set += v

  def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = other match {
    case set: SetAccumulator[T] => _set ++= set._set
    case _ => throw new UnsupportedOperationException("Cannot merge different accumulator types")
  }

  def value: java.util.Set[T] = _set.asJava
}
```
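Because the aggregation logic is ordinary Scala, it is worth unit-testing it without a SparkContext. A Spark-free analogue of the set logic above (the `SetMerger` name is hypothetical) makes the merge behavior easy to check locally:

```scala
import scala.collection.mutable

// Spark-free analogue of the set-merging logic, handy for unit tests
class SetMerger[T] {
  private val _set = mutable.Set.empty[T]
  def isZero: Boolean = _set.isEmpty
  def add(v: T): Unit = _set += v
  def merge(other: SetMerger[T]): Unit = _set ++= other._set
  def value: Set[T] = _set.toSet
}

// Merging two "task-local" sets deduplicates across them
val left = new SetMerger[String]
val right = new SetMerger[String]
Seq("a", "b").foreach(left.add)
Seq("b", "c").foreach(right.add)
left.merge(right)
println(left.value == Set("a", "b", "c")) // true
```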
## Usage Examples

### Broadcast Variables
```scala
import org.apache.spark.broadcast.Broadcast

// Large lookup table that will be used across many tasks
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith"
  // ... thousands more entries
)

// Broadcast the lookup table
val broadcastLookup: Broadcast[Map[String, String]] = sc.broadcast(lookupTable)

// Use in transformations
val userIds = sc.parallelize(Array("user1", "user2", "user1", "user3"))
val enrichedData = userIds.map { userId =>
  val lookup = broadcastLookup.value // Access broadcast value
  val userName = lookup.getOrElse(userId, "Unknown")
  (userId, userName)
}

val result = enrichedData.collect()
// Result: Array((user1,John Doe), (user2,Jane Smith), (user1,John Doe), (user3,Unknown))

// Clean up when done
broadcastLookup.unpersist()
```
### Long Accumulator
```scala
val data = sc.parallelize(1 to 1000)

// Create accumulator for counting even numbers
val evenCount = sc.longAccumulator("Even Numbers")

// Use in a transformation (note: updates made in transformations may be
// re-applied if a task is retried; see Best Practices)
val processed = data.map { num =>
  if (num % 2 == 0) {
    evenCount.add(1) // Count even numbers
  }
  num * num
}

// Trigger an action to execute the transformation
val result = processed.collect()

// Access the accumulator value on the driver
println(s"Found ${evenCount.value} even numbers")
```
### Collection Accumulator
```scala
val textData = sc.parallelize(Array("error: failed", "info: success", "error: timeout", "debug: trace"))

// Accumulator to collect all error messages
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Process data and collect errors
val processedData = textData.map { line =>
  if (line.startsWith("error:")) {
    errorMessages.add(line) // Collect error messages
  }
  line.toUpperCase
}

// Trigger action
processedData.count()

// Access collected errors on the driver
val errors = errorMessages.value
println(s"Found ${errors.size()} errors: ${errors}")
```
### Custom Set Accumulator
```scala
// Register the custom accumulator
val uniqueWords = new SetAccumulator[String]
sc.register(uniqueWords, "Unique Words")

val sentences = sc.parallelize(Array(
  "hello world",
  "world of spark",
  "hello spark"
))

// Use the custom accumulator
val wordCounts = sentences.flatMap(_.split(" ")).map { word =>
  uniqueWords.add(word) // Collect unique words
  (word, 1)
}.reduceByKey(_ + _)

// Trigger action
val counts = wordCounts.collect()

// Access unique words
val unique = uniqueWords.value
println(s"Found ${unique.size()} unique words: ${unique}")
```
### Advanced Patterns

#### Error Tracking with Multiple Accumulators
```scala
val malformedRecords = sc.longAccumulator("Malformed Records")
val validRecords = sc.longAccumulator("Valid Records")
val errorDetails = sc.collectionAccumulator[String]("Error Details")

val processedData = rawData.map { record =>
  try {
    val parsed = parseRecord(record)
    validRecords.add(1)
    parsed
  } catch {
    case e: Exception =>
      malformedRecords.add(1)
      errorDetails.add(s"Error parsing '$record': ${e.getMessage}")
      null
  }
}.filter(_ != null)

processedData.count()

println(s"Valid: ${validRecords.value}, Malformed: ${malformedRecords.value}")
errorDetails.value.forEach(e => println(e)) // value is a java.util.List
```
#### Performance Monitoring
```scala
val processingTime = sc.doubleAccumulator("Processing Time (ms)")
val recordsProcessed = sc.longAccumulator("Records Processed")

val result = data.map { record =>
  val start = System.currentTimeMillis()
  val processed = expensiveProcessing(record)
  val elapsed = System.currentTimeMillis() - start

  processingTime.add(elapsed.toDouble)
  recordsProcessed.add(1)

  processed
}

result.count()

// Use .sum (primitive Double / Long) rather than the boxed .value types
println(f"Average processing time: ${processingTime.sum / recordsProcessed.sum}%.2f ms per record")
```
## Best Practices

### Broadcast Variables
1. **Use for large read-only data**: Ideal for lookup tables, configuration, and models
2. **Avoid frequent updates**: Broadcast variables are immutable; broadcast a new variable if the data changes
3. **Size considerations**: The value should fit comfortably in executor memory
4. **Clean up**: Call `unpersist()` when the variable is no longer needed
5. **Serialization**: Ensure broadcast data is efficiently serializable

### Accumulators
1. **Prefer updates in actions**: Updates made inside actions (e.g. `foreach`) are applied exactly once per task; updates made in transformations may be re-applied if tasks are retried or stages are recomputed
2. **Idempotent operations**: Design accumulators to tolerate task retries gracefully
3. **Named accumulators**: Use names so accumulator values appear in the Spark UI
4. **Register custom types**: Register custom accumulators with `SparkContext.register` for proper tracking
5. **Avoid side effects**: Don't rely on accumulator updates for program control logic