Broadcast variables provide efficient, read-only distribution of data to every node in the cluster, making it practical to share large datasets, lookup tables, or configuration across distributed tasks. The value is cached on each worker node, so tasks can reuse it without it being serialized and shipped with every task.
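As a rough illustration of that difference, the sketch below (the ratings map, the keys RDD, and the local master setting are illustrative assumptions, not part of Spark's API) contrasts capturing a map in the task closure, where it travels with every task, with broadcasting it, where it is shipped to each executor only once:
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Closure vs Broadcast").setMaster("local[*]"))
val ratings = Map("a" -> 4.5, "b" -> 3.0, "c" -> 5.0) // imagine this map is very large
val keys = sc.parallelize(Seq("a", "b", "c", "a"))

// Captured in the closure: the whole map is serialized with every task
val viaClosure = keys.map(k => ratings.getOrElse(k, 0.0)).collect()

// Broadcast: the map is sent to each executor once and reused by all of its tasks
val ratingsBc = sc.broadcast(ratings)
val viaBroadcast = keys.map(k => ratingsBc.value.getOrElse(k, 0.0)).collect()

ratingsBc.destroy()
sc.stop()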
/**
 * Read-only variable broadcast to all cluster nodes
 * @tparam T type of the broadcast variable
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
  /** Get the broadcast value */
  def value: T
  /** Asynchronously delete cached copies on executors */
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  /** Destroy all data and metadata related to this broadcast variable */
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit
  /** String representation */
  override def toString: String = s"Broadcast($id)"
}
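The practical difference between unpersist and destroy is worth spelling out. The sketch below (assuming a live SparkContext named sc) reflects the documented semantics: unpersist only drops the cached copies on the executors, so the variable remains usable and is re-sent when needed, while destroy removes all data and metadata, after which any further use fails.
val numbersBc = sc.broadcast(Seq(1, 2, 3))

numbersBc.unpersist()             // executors drop their cached copies
val stillUsable = numbersBc.value // still works; the value is shipped again on next use

numbersBc.destroy()               // removes all data and metadata on driver and executors
// numbersBc.value                // would now fail with a SparkException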
Broadcast variables are created through SparkContext methods.
class SparkContext(config: SparkConf) {
  /** Create broadcast variable from value */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}
// Java API
public class JavaSparkContext {
  /** Create broadcast variable from value */
  public <T> Broadcast<T> broadcast(T value)
}
TorrentBroadcast is the default implementation, using a BitTorrent-like protocol for efficient distribution.
/**
 * BitTorrent-like broadcast implementation
 */
class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) {
  /** Get broadcast value, downloading it if necessary */
  override def value: T
  /** Remove persisted state */
  override def unpersist(blocking: Boolean = false): Unit
  /** Destroy broadcast variable */
  override def destroy(blocking: Boolean = false): Unit
}
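Torrent-style distribution can be tuned through ordinary Spark configuration. A minimal sketch, assuming the standard spark.broadcast.blockSize and spark.broadcast.compress properties (the values shown simply restate their usual defaults):
import org.apache.spark.{SparkConf, SparkContext}

val tunedConf = new SparkConf()
  .setAppName("Broadcast Tuning")
  .set("spark.broadcast.blockSize", "4m")  // size of the pieces each broadcast value is split into
  .set("spark.broadcast.compress", "true") // compress broadcast blocks before sending them
val tunedSc = new SparkContext(tunedConf)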
BroadcastManager is an internal component managing broadcast variable lifecycle and distribution.
/**
 * Manages broadcast variables on driver and executors
 */
class BroadcastManager(
    isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager
) extends Logging {
  /** Create new broadcast variable */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean): Broadcast[T]
  /** Unpersist broadcast variable */
  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
  /** Initialize broadcast manager */
  def initialize(): Unit
  /** Stop broadcast manager */
  def stop(): Unit
}
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))
// Create large lookup table
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith",
  "user3" -> "Bob Johnson"
  // ... potentially thousands of entries
)
// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)
// Create RDD of user IDs
val userIds = sc.parallelize(List("user1", "user2", "user3", "user1"))
// Use broadcast variable in transformation
// The lookup table is sent to each executor only once
val userNames = userIds.map { id =>
  val lookup = broadcastLookup.value // Access broadcast value
  lookup.getOrElse(id, "Unknown User")
}
val results = userNames.collect()
// Results: Array("John Doe", "Jane Smith", "Bob Johnson", "John Doe")
// Configuration broadcasting
val config = Map(
  "api.endpoint" -> "https://api.example.com",
  "timeout" -> "30",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)
// someRDD and processRecord are placeholders for application-specific data and logic
val processedData = someRDD.map { record =>
  val conf = broadcastConfig.value
  // Use configuration in processing
  processRecord(record, conf("api.endpoint"), conf("timeout").toInt)
}
// Machine learning model broadcasting
case class MLModel(weights: Array[Double], intercept: Double) {
  def predict(features: Array[Double]): Double = {
    weights.zip(features).map { case (w, f) => w * f }.sum + intercept
  }
}
val trainedModel = MLModel(Array(0.5, -0.3, 0.8), 0.1)
val broadcastModel = sc.broadcast(trainedModel)
// featuresRDD is a placeholder RDD[Array[Double]]
val predictions = featuresRDD.map { features =>
  val model = broadcastModel.value
  model.predict(features)
}
// Cleanup when done (only after every job that uses these broadcasts has finished)
broadcastLookup.unpersist()
broadcastConfig.destroy()
broadcastModel.destroy()
sc.stop()
Java Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Broadcast Example")
);
// Create lookup map
Map<String, String> lookupTable = new HashMap<>();
lookupTable.put("user1", "John Doe");
lookupTable.put("user2", "Jane Smith");
lookupTable.put("user3", "Bob Johnson");
// Broadcast the map
Broadcast<Map<String, String>> broadcastLookup = sc.broadcast(lookupTable);
// Create RDD
List<String> userIds = Arrays.asList("user1", "user2", "user3");
JavaRDD<String> userRDD = sc.parallelize(userIds);
// Use broadcast variable
JavaRDD<String> userNames = userRDD.map(id -> {
    Map<String, String> lookup = broadcastLookup.value();
    return lookup.getOrDefault(id, "Unknown User");
});
List<String> results = userNames.collect();
// Cleanup
broadcastLookup.destroy();
sc.close();
// Good: Broadcast large, reusable data
val largeLookup = sc.broadcast(Map(/* large dataset */))
val enriched = rdd.map(x => largeLookup.value.get(x.key))
// Bad: don't broadcast small data or data used only once
val smallMap = Map("a" -> 1, "b" -> 2) // small enough to capture in the closure directly
val looked = rdd.map(x => smallMap.get(x.key)) // serialized with each task, which is cheap at this size
// Good: Reuse broadcast variables across multiple operations
val config = sc.broadcast(appConfig)
val step1 = rdd1.map(x => process1(x, config.value))
val step2 = rdd2.map(x => process2(x, config.value))
// Good: Clean up when no longer needed
config.unpersist() // remove cached copies from executor memory; the variable can still be used and is re-sent on demand
config.destroy()   // remove all data and metadata; the variable cannot be used afterwards
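A common application of the "broadcast large, reusable data" practice is a map-side join, where a small dimension table is broadcast so a large RDD can be enriched without a shuffle. A sketch using hypothetical data (the orders RDD and the countryByUser map are illustrative only):
val orders = sc.parallelize(Seq(("user1", 25.0), ("user2", 40.0), ("user1", 10.0)))
val countryByUser = Map("user1" -> "US", "user2" -> "DE") // small dimension table

// Broadcasting the small side turns the join into a local lookup inside each task (no shuffle)
val countryBc = sc.broadcast(countryByUser)
val ordersWithCountry = orders.map { case (userId, amount) =>
  (userId, amount, countryBc.value.getOrElse(userId, "unknown"))
}
ordersWithCountry.collect().foreach(println)
countryBc.destroy()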
// Handle potential serialization issues
import java.io.NotSerializableException
try {
  val broadcast = sc.broadcast(complexObject)
  // Use the broadcast variable
} catch {
  case e: NotSerializableException =>
    // Handle the serialization failure (depending on the serializer, the error may also arrive wrapped in another exception)
    println(s"Cannot broadcast non-serializable object: $e")
}
// Defensive access patterns
val result = rdd.map { x =>
  try {
    val lookup = broadcastLookup.value
    lookup.getOrElse(x.key, defaultValue)
  } catch {
    case e: Exception =>
      // Handle broadcast access failures (e.g. the variable was destroyed too early)
      defaultValue
  }
}
Broadcast variables provide an efficient mechanism for distributing read-only data to all nodes in a Spark cluster, significantly reducing network overhead compared to sending the data with every task.