Broadcast Variables

Efficient read-only variable distribution to all cluster nodes for sharing large datasets, lookup tables, or configuration across distributed tasks.

Capabilities

Broadcast Variables

Read-only variables cached on each worker node so that large data structures can be shared across tasks without being serialized and shipped with every task.

/**
 * Read-only variable broadcast to all cluster nodes
 * @tparam T type of the broadcast variable
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
  /** Get the broadcast value */
  def value: T
  
  /** Asynchronously delete cached copies on executors */
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  
  /** Destroy all data and metadata related to this broadcast variable */
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit
  
  /** String representation */
  override def toString: String = s"Broadcast($id)"
}

Creating Broadcast Variables

Broadcast variables are created with the broadcast method on SparkContext (or JavaSparkContext in Java).

class SparkContext(config: SparkConf) {
  /** Create broadcast variable from value */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}

// Java API
public class JavaSparkContext {
  /** Create broadcast variable from value */
  public <T> Broadcast<T> broadcast(T value);
}
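
The lifecycle methods on Broadcast are typically exercised together with creation: broadcast a value, read it in tasks, then release it. The practical difference between unpersist and destroy matters for that last step: unpersist only drops the cached copies on executors (the value is re-fetched lazily if a later task reads it again), while destroy also removes the driver-side data, so any further use of the variable fails. A minimal local-mode sketch of that lifecycle (the local[2] master setting is only for the example):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("Broadcast Lifecycle"))

val bc = sc.broadcast(Seq(1, 2, 3))

// Normal use: each executor fetches and caches the value once
val first = sc.parallelize(1 to 4).map(_ + bc.value.sum).collect()

// unpersist: executor caches are dropped, but the variable stays usable;
// the next task that reads bc.value triggers a re-fetch from the driver
bc.unpersist()
val second = sc.parallelize(1 to 4).map(_ + bc.value.sum).collect()

// destroy: all data and metadata are removed; any further access to bc.value
// would fail with a SparkException
bc.destroy()

sc.stop()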

TorrentBroadcast Implementation

The default implementation, which uses a BitTorrent-like protocol to distribute broadcast data efficiently across executors.

/**
 * BitTorrent-like broadcast implementation
 */
class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) {
  /** Get broadcast value, downloading if necessary */
  override def value: T
  
  /** Remove persisted state */
  override def unpersist(blocking: Boolean = false): Unit
  
  /** Destroy broadcast variable */
  override def destroy(blocking: Boolean = false): Unit
}
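
TorrentBroadcast divides the serialized value into blocks that executors fetch from the driver and from each other, which keeps distribution cheap even with many executors. Two standard Spark configuration keys control that behavior; a short sketch of setting them (the values shown are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Broadcast Tuning")
  // Size of each block a broadcast value is split into (default 4m);
  // larger blocks mean fewer fetches, smaller blocks spread the load more evenly
  .set("spark.broadcast.blockSize", "8m")
  // Compress broadcast blocks before they are sent over the network (default true)
  .set("spark.broadcast.compress", "true")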

Broadcast Manager

Internal component managing broadcast variable lifecycle and distribution.

/**
 * Manages broadcast variables on driver and executors
 */
class BroadcastManager(
  isDriver: Boolean,
  conf: SparkConf,
  securityManager: SecurityManager
) extends Logging {
  
  /** Create new broadcast variable */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean): Broadcast[T]
  
  /** Unpersist broadcast variable */
  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
  
  /** Initialize broadcast manager */
  def initialize(): Unit
  
  /** Stop broadcast manager */
  def stop(): Unit
}

Usage Examples

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))

// Create large lookup table
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith", 
  "user3" -> "Bob Johnson"
  // ... potentially thousands of entries
)

// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)

// Create RDD of user IDs
val userIds = sc.parallelize(List("user1", "user2", "user3", "user1"))

// Use broadcast variable in transformation
// The lookup table is sent to each executor only once
val userNames = userIds.map { id =>
  val lookup = broadcastLookup.value // Access broadcast value
  lookup.getOrElse(id, "Unknown User")
}

val results = userNames.collect()
// Results: Array("John Doe", "Jane Smith", "Bob Johnson", "John Doe")

// Configuration broadcasting
val config = Map(
  "api.endpoint" -> "https://api.example.com",
  "timeout" -> "30",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)

// someRDD and processRecord stand in for an existing RDD and a user-defined function
val processedData = someRDD.map { record =>
  val conf = broadcastConfig.value
  // Use configuration in processing
  processRecord(record, conf("api.endpoint"), conf("timeout").toInt)
}

// Machine learning model broadcasting
case class MLModel(weights: Array[Double], intercept: Double) {
  def predict(features: Array[Double]): Double = {
    weights.zip(features).map { case (w, f) => w * f }.sum + intercept
  }
}

val trainedModel = MLModel(Array(0.5, -0.3, 0.8), 0.1)
val broadcastModel = sc.broadcast(trainedModel)

// featuresRDD stands in for an existing RDD[Array[Double]] of feature vectors
val predictions = featuresRDD.map { features =>
  val model = broadcastModel.value
  model.predict(features)
}

// Cleanup when done
broadcastLookup.unpersist()
broadcastConfig.destroy()
broadcastModel.destroy()

sc.stop()

Java Examples

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Broadcast Example")
);

// Create lookup map
Map<String, String> lookupTable = new HashMap<>();
lookupTable.put("user1", "John Doe");
lookupTable.put("user2", "Jane Smith");
lookupTable.put("user3", "Bob Johnson");

// Broadcast the map
Broadcast<Map<String, String>> broadcastLookup = sc.broadcast(lookupTable);

// Create RDD
List<String> userIds = Arrays.asList("user1", "user2", "user3");
JavaRDD<String> userRDD = sc.parallelize(userIds);

// Use broadcast variable
JavaRDD<String> userNames = userRDD.map(id -> {
    Map<String, String> lookup = broadcastLookup.value();
    return lookup.getOrDefault(id, "Unknown User");
});

List<String> results = userNames.collect();

// Cleanup
broadcastLookup.destroy();
sc.close();

Best Practices

When to Use Broadcast Variables

  • Large lookup tables: Reference data needed by many tasks (see the map-side join sketch after this list)
  • Configuration objects: Application settings used across tasks
  • Machine learning models: Trained models for prediction tasks
  • Static data: Any read-only data accessed frequently
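
A common pattern behind the first point is a map-side (broadcast) join: rather than shuffling both datasets, the small lookup table is broadcast and joined inside a map transformation. A minimal sketch, assuming an existing SparkContext named sc:

// Small dimension table: broadcast it instead of shuffling it
val countryNames = sc.broadcast(Map("US" -> "United States", "DE" -> "Germany"))

// Large fact dataset: (userId, countryCode)
val events = sc.parallelize(Seq(("u1", "US"), ("u2", "DE"), ("u3", "FR")))

// Map-side join: no shuffle, one copy of the lookup table per executor
val joined = events.map { case (user, code) =>
  (user, countryNames.value.getOrElse(code, "Unknown"))
}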

Size Considerations

  • Broadcast variables are loaded into memory on each executor
  • Consider memory constraints when broadcasting large objects (see the size-estimation sketch after this list)
  • Monitor executor memory usage with broadcast variables
  • Use serialization-friendly data structures
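
One way to check these points before broadcasting is to estimate the object's in-memory size on the driver. A short sketch using Spark's SizeEstimator utility (a developer API, so treat the number as an approximation; sc is an existing SparkContext and loadLookupTable is a hypothetical loader):

import org.apache.spark.util.SizeEstimator

val lookupTable: Map[String, String] = loadLookupTable() // hypothetical loader

// Approximate JVM heap footprint of the object, in bytes
val estimatedBytes = SizeEstimator.estimate(lookupTable)
println(s"Broadcasting ~${estimatedBytes / (1024 * 1024)} MB to every executor")

// Only broadcast if it comfortably fits alongside the executor's other memory needs
val broadcastLookup =
  if (estimatedBytes < 512L * 1024 * 1024) Some(sc.broadcast(lookupTable)) else None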

Performance Tips

// Good: Broadcast large, reusable data
val largeLookup = sc.broadcast(Map(/* large dataset */))
val result = rdd.map(x => largeLookup.value.get(x.key))

// Bad: Don't broadcast small data or data used only once
val smallMap = Map("a" -> 1, "b" -> 2) // Just use directly
val result = rdd.map(x => smallMap.get(x.key)) // Serialized with each task

// Good: Reuse broadcast variables across multiple operations
val config = sc.broadcast(appConfig)
val step1 = rdd1.map(x => process1(x, config.value))
val step2 = rdd2.map(x => process2(x, config.value))

// Good: Clean up when no longer needed
config.unpersist() // Remove from executor memory
config.destroy()   // Remove all references

Error Handling

import java.io.NotSerializableException

// Handle potential serialization issues
try {
  val broadcast = sc.broadcast(complexObject)
  // Use broadcast variable
} catch {
  case e: NotSerializableException =>
    // Handle serialization failure
    println(s"Cannot broadcast non-serializable object: $e")
}

// Defensive access patterns
val result = rdd.map { x =>
  try {
    val lookup = broadcastLookup.value
    lookup.getOrElse(x.key, defaultValue)
  } catch {
    case e: Exception =>
      // Handle broadcast access failures
      defaultValue
  }
}

Broadcast variables provide an efficient mechanism for distributing read-only data to all nodes in a Spark cluster, significantly reducing network overhead compared to sending data with each task.