# Broadcast Variables

Efficient read-only variable distribution to all cluster nodes for sharing large datasets, lookup tables, or configuration across distributed tasks.

## Capabilities

### Broadcast Variables

Read-only variables cached on each worker node to efficiently share large data structures across tasks without serializing them with every task.

```scala { .api }
/**
 * Read-only variable broadcast to all cluster nodes
 * @tparam T type of the broadcast variable
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
  /** Get the broadcast value */
  def value: T

  /** Asynchronously delete cached copies on executors */
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit

  /** Destroy all data and metadata related to this broadcast variable */
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit

  /** String representation */
  override def toString: String = s"Broadcast($id)"
}
```

### Creating Broadcast Variables

Broadcast variables are created through `SparkContext.broadcast` (or `JavaSparkContext.broadcast` from Java).

```scala { .api }
class SparkContext(config: SparkConf) {
  /** Create broadcast variable from value */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}

// Java API
public class JavaSparkContext {
  /** Create broadcast variable from value */
  public <T> Broadcast<T> broadcast(T value)
}
```

### TorrentBroadcast Implementation

The default implementation, which uses a BitTorrent-like protocol: the value is split into blocks so executors can fetch blocks from peers as well as from the driver.

```scala { .api }
/**
 * BitTorrent-like broadcast implementation
 */
class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) {
  /** Get broadcast value, downloading if necessary */
  override def value: T

  /** Remove persisted state */
  override def unpersist(blocking: Boolean = false): Unit

  /** Destroy broadcast variable */
  override def destroy(blocking: Boolean = false): Unit
}
```
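
The block-based transfer can be sketched in miniature without Spark. This is a conceptual illustration, not Spark's actual code: `BlockifySketch`, the 10-byte payload, and the 4-byte block size are all made up for the example (Spark's real block size is governed by `spark.broadcast.blockSize`, 4 MB by default).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual sketch: split a serialized payload into fixed-size blocks
// (as TorrentBroadcast does), then reassemble it on the receiving side.
public class BlockifySketch {
    // Split the payload into blocks of at most blockSize bytes
    static List<byte[]> blockify(byte[] payload, int blockSize) {
        List<byte[]> blocks = new ArrayList<>();
        for (int off = 0; off < payload.length; off += blockSize) {
            int end = Math.min(off + blockSize, payload.length);
            blocks.add(Arrays.copyOfRange(payload, off, end));
        }
        return blocks;
    }

    // Reassemble the original payload from its blocks
    static byte[] unBlockify(List<byte[]> blocks) {
        int total = blocks.stream().mapToInt(b -> b.length).sum();
        byte[] out = new byte[total];
        int off = 0;
        for (byte[] b : blocks) {
            System.arraycopy(b, 0, out, off, b.length);
            off += b.length;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10];
        for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;
        List<byte[]> blocks = blockify(payload, 4); // blocks of 4, 4, 2 bytes
        System.out.println(blocks.size());                           // 3
        System.out.println(Arrays.equals(unBlockify(blocks), payload)); // true
    }
}
```

Because any executor that has fetched a block can serve it to others, the driver is not a bottleneck the way it would be with a naive one-to-all copy.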

### Broadcast Manager

Internal component managing broadcast variable lifecycle and distribution.

```scala { .api }
/**
 * Manages broadcast variables on driver and executors
 */
class BroadcastManager(
    isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager
) extends Logging {

  /** Create new broadcast variable */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean): Broadcast[T]

  /** Unpersist broadcast variable */
  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit

  /** Initialize broadcast manager */
  def initialize(): Unit

  /** Stop broadcast manager */
  def stop(): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))

// Create large lookup table
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith",
  "user3" -> "Bob Johnson"
  // ... potentially thousands of entries
)

// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)

// Create RDD of user IDs
val userIds = sc.parallelize(List("user1", "user2", "user3", "user1"))

// Use the broadcast variable in a transformation.
// The lookup table is sent to each executor only once.
val userNames = userIds.map { id =>
  val lookup = broadcastLookup.value // Access broadcast value
  lookup.getOrElse(id, "Unknown User")
}

val results = userNames.collect()
// Results: Array("John Doe", "Jane Smith", "Bob Johnson", "John Doe")

// Configuration broadcasting
val config = Map(
  "api.endpoint" -> "https://api.example.com",
  "timeout" -> "30",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)

// someRDD and processRecord are assumed to be defined elsewhere
val processedData = someRDD.map { record =>
  val conf = broadcastConfig.value
  // Use configuration in processing
  processRecord(record, conf("api.endpoint"), conf("timeout").toInt)
}

// Machine learning model broadcasting
case class MLModel(weights: Array[Double], intercept: Double) {
  def predict(features: Array[Double]): Double =
    weights.zip(features).map { case (w, f) => w * f }.sum + intercept
}

val trainedModel = MLModel(Array(0.5, -0.3, 0.8), 0.1)
val broadcastModel = sc.broadcast(trainedModel)

// featuresRDD (an RDD[Array[Double]]) is assumed to be defined elsewhere
val predictions = featuresRDD.map { features =>
  val model = broadcastModel.value
  model.predict(features)
}

// Cleanup when done
broadcastLookup.unpersist()
broadcastConfig.destroy()
broadcastModel.destroy()

sc.stop()
```

**Java Examples:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Broadcast Example")
);

// Create lookup map
Map<String, String> lookupTable = new HashMap<>();
lookupTable.put("user1", "John Doe");
lookupTable.put("user2", "Jane Smith");
lookupTable.put("user3", "Bob Johnson");

// Broadcast the map
Broadcast<Map<String, String>> broadcastLookup = sc.broadcast(lookupTable);

// Create RDD
List<String> userIds = Arrays.asList("user1", "user2", "user3");
JavaRDD<String> userRDD = sc.parallelize(userIds);

// Use broadcast variable
JavaRDD<String> userNames = userRDD.map(id -> {
    Map<String, String> lookup = broadcastLookup.value();
    return lookup.getOrDefault(id, "Unknown User");
});

List<String> results = userNames.collect();

// Cleanup
broadcastLookup.destroy();
sc.close();
```

## Best Practices

### When to Use Broadcast Variables

- **Large lookup tables**: Reference data needed by many tasks
- **Configuration objects**: Application settings used across tasks
- **Machine learning models**: Trained models for prediction tasks
- **Static data**: Any read-only data accessed frequently

### Size Considerations

- Broadcast variables are loaded into memory on each executor
- Consider memory constraints when broadcasting large objects
- Monitor executor memory usage with broadcast variables
- Use serialization-friendly data structures

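One way to sanity-check memory constraints is to measure an object's serialized size before broadcasting it. The sketch below uses plain Java serialization as a rough stand-in; Spark uses its own serializers, so treat the number as an estimate, and note that `BroadcastSizeCheck` and its helper are illustrative names, not a Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hedged sketch: estimate an object's serialized footprint before broadcasting
public class BroadcastSizeCheck {
    // Serialize with plain Java serialization and count the bytes produced
    static long serializedSizeBytes(Serializable obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.size();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> lookup = new HashMap<>();
        for (int i = 0; i < 10_000; i++) lookup.put("user" + i, "name" + i);
        long bytes = serializedSizeBytes(lookup);
        // Compare this against per-executor memory before calling sc.broadcast(lookup)
        System.out.println(bytes > 0); // true
    }
}
```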
### Performance Tips

```scala
// Good: Broadcast large, reusable data
val largeLookup = sc.broadcast(Map(/* large dataset */))
val lookedUp = rdd.map(x => largeLookup.value.get(x.key))

// Bad: Don't broadcast small data or data used only once
val smallMap = Map("a" -> 1, "b" -> 2) // Just use directly
val mapped = rdd.map(x => smallMap.get(x.key)) // Serialized with each task closure

// Good: Reuse broadcast variables across multiple operations
val config = sc.broadcast(appConfig)
val step1 = rdd1.map(x => process1(x, config.value))
val step2 = rdd2.map(x => process2(x, config.value))

// Good: Clean up when no longer needed
config.unpersist() // Remove cached copies from executor memory
config.destroy()   // Remove all data and metadata
```

### Error Handling

```scala
import java.io.NotSerializableException

// Handle potential serialization issues
try {
  val broadcast = sc.broadcast(complexObject)
  // Use broadcast variable
} catch {
  case e: NotSerializableException =>
    // Handle serialization failure
    println(s"Cannot broadcast non-serializable object: $e")
}

// Defensive access patterns
val result = rdd.map { x =>
  try {
    val lookup = broadcastLookup.value
    lookup.getOrElse(x.key, defaultValue)
  } catch {
    case _: Exception =>
      // Handle broadcast access failures (e.g. accessing a destroyed broadcast)
      defaultValue
  }
}
```

Broadcast variables provide an efficient mechanism for distributing read-only data to all nodes in a Spark cluster, significantly reducing network overhead compared to sending data with each task.
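
A back-of-the-envelope sketch of that savings, using plain Java serialization as a stand-in for Spark's: a value captured in a task closure is shipped once per task, while a broadcast value is shipped roughly once per executor. The task and executor counts below are assumptions chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hedged sketch: compare bytes shipped via closure capture vs. a broadcast
public class BroadcastSavings {
    // Serialize with plain Java serialization and count the bytes produced
    static long serializedSize(Serializable obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.size();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> lookup = new HashMap<>();
        for (int i = 0; i < 1_000; i++) lookup.put("user" + i, "name" + i);
        long size = serializedSize(lookup);

        int numTasks = 2_000;  // assumed total tasks across the job's stages
        int numExecutors = 20; // assumed cluster size

        long shippedWithTasks = size * (long) numTasks;     // closure capture: once per task
        long shippedBroadcast = size * (long) numExecutors; // broadcast: once per executor
        System.out.println(shippedBroadcast < shippedWithTasks); // true
    }
}
```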