
# Broadcast Variables

Efficient read-only variable distribution to all cluster nodes for sharing large datasets, lookup tables, or configuration across distributed tasks.

## Capabilities

### Broadcast Variables

Read-only variables cached on each worker node to efficiently share large data structures across tasks without serializing them with every task.

```scala { .api }
/**
 * Read-only variable broadcast to all cluster nodes
 * @tparam T type of the broadcast variable
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
  /** Get the broadcast value */
  def value: T

  /** Asynchronously delete cached copies on executors */
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit

  /** Destroy all data and metadata related to this broadcast variable */
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit

  /** String representation */
  override def toString: String = s"Broadcast($id)"
}
```

### Creating Broadcast Variables

Broadcast variables are created through SparkContext methods.

```scala { .api }
class SparkContext(config: SparkConf) {
  /** Create broadcast variable from value */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}

// Java API
public class JavaSparkContext {
  /** Create broadcast variable from value */
  public <T> Broadcast<T> broadcast(T value)
}
```
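To see why broadcasting beats capturing data in task closures, the serialization cost can be illustrated with plain Java (a standalone sketch, not Spark code; the `FatTask` and `SlimTask` classes are hypothetical stand-ins for a task that captures a large map versus one that carries only a broadcast id):

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ClosureCost {
    // Hypothetical task that captures the whole lookup map in its closure:
    // the map is re-serialized and shipped with every task.
    static class FatTask implements Serializable {
        final Map<String, String> lookup;
        FatTask(Map<String, String> lookup) { this.lookup = lookup; }
    }

    // Hypothetical task that carries only a small handle; the data itself
    // lives on the executor, as with a broadcast variable.
    static class SlimTask implements Serializable {
        final long broadcastId;
        SlimTask(long broadcastId) { this.broadcastId = broadcastId; }
    }

    // Java-serialized size of an object, in bytes.
    static int serializedSize(Object o) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> lookup = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            lookup.put("user" + i, "Name " + i);
        }
        int fat = serializedSize(new FatTask(lookup));
        int slim = serializedSize(new SlimTask(42L));
        // The fat task pays the full map cost per task; the slim task is tiny.
        System.out.println(fat > 100 * slim);
    }
}
```

With thousands of tasks, the per-task cost of the captured map multiplies, while a broadcast ships the data to each executor only once.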

### TorrentBroadcast Implementation

The default implementation, which uses a BitTorrent-like protocol for efficient distribution.

```scala { .api }
/**
 * BitTorrent-like broadcast implementation
 */
class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) {
  /** Get broadcast value, downloading if necessary */
  override def value: T

  /** Remove persisted state */
  override def unpersist(blocking: Boolean = false): Unit

  /** Destroy broadcast variable */
  override def destroy(blocking: Boolean = false): Unit
}
```
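TorrentBroadcast splits the serialized value into fixed-size blocks (controlled by `spark.broadcast.blockSize`, 4 MB by default) so executors can fetch different blocks from different peers. The chunking step can be sketched in plain Java (a standalone illustration, not Spark's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class BlockChunking {
    // Split a serialized payload into fixed-size blocks, as the driver does
    // before registering the blocks for peer-to-peer fetching.
    static List<byte[]> chunk(byte[] payload, int blockSize) {
        List<byte[]> blocks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += blockSize) {
            int len = Math.min(blockSize, payload.length - offset);
            byte[] block = new byte[len];
            System.arraycopy(payload, offset, block, 0, len);
            blocks.add(block);
        }
        return blocks;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10 * 1024 * 1024]; // a 10 MB serialized value
        List<byte[]> blocks = chunk(payload, 4 * 1024 * 1024); // 4 MB blocks
        System.out.println(blocks.size()); // 4 MB + 4 MB + 2 MB = 3 blocks
    }
}
```

Because each executor that has fetched a block can serve it to others, distribution load is spread across the cluster instead of concentrating on the driver.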

### Broadcast Manager

Internal component managing broadcast variable lifecycle and distribution.

```scala { .api }
/**
 * Manages broadcast variables on driver and executors
 */
class BroadcastManager(
    isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager
) extends Logging {

  /** Create new broadcast variable */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean): Broadcast[T]

  /** Unpersist broadcast variable */
  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit

  /** Initialize broadcast manager */
  def initialize(): Unit

  /** Stop broadcast manager */
  def stop(): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))

// Create large lookup table
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith",
  "user3" -> "Bob Johnson"
  // ... potentially thousands of entries
)

// Broadcast the lookup table
val broadcastLookup = sc.broadcast(lookupTable)

// Create RDD of user IDs
val userIds = sc.parallelize(List("user1", "user2", "user3", "user1"))

// Use broadcast variable in transformation
// The lookup table is sent to each executor only once
val userNames = userIds.map { id =>
  val lookup = broadcastLookup.value // Access broadcast value
  lookup.getOrElse(id, "Unknown User")
}

val results = userNames.collect()
// Results: Array("John Doe", "Jane Smith", "Bob Johnson", "John Doe")

// Configuration broadcasting
val config = Map(
  "api.endpoint" -> "https://api.example.com",
  "timeout" -> "30",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)

// someRDD and processRecord are placeholders defined elsewhere
val processedData = someRDD.map { record =>
  val conf = broadcastConfig.value
  // Use configuration in processing
  processRecord(record, conf("api.endpoint"), conf("timeout").toInt)
}

// Machine learning model broadcasting
case class MLModel(weights: Array[Double], intercept: Double) {
  def predict(features: Array[Double]): Double = {
    weights.zip(features).map { case (w, f) => w * f }.sum + intercept
  }
}

val trainedModel = MLModel(Array(0.5, -0.3, 0.8), 0.1)
val broadcastModel = sc.broadcast(trainedModel)

// featuresRDD is a placeholder defined elsewhere
val predictions = featuresRDD.map { features =>
  val model = broadcastModel.value
  model.predict(features)
}

// Cleanup when done
broadcastLookup.unpersist()
broadcastConfig.destroy()
broadcastModel.destroy()

sc.stop()
```

**Java Examples:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

JavaSparkContext sc = new JavaSparkContext(
    new SparkConf().setAppName("Java Broadcast Example")
);

// Create lookup map
Map<String, String> lookupTable = new HashMap<>();
lookupTable.put("user1", "John Doe");
lookupTable.put("user2", "Jane Smith");
lookupTable.put("user3", "Bob Johnson");

// Broadcast the map
Broadcast<Map<String, String>> broadcastLookup = sc.broadcast(lookupTable);

// Create RDD
List<String> userIds = Arrays.asList("user1", "user2", "user3");
JavaRDD<String> userRDD = sc.parallelize(userIds);

// Use broadcast variable
JavaRDD<String> userNames = userRDD.map(id -> {
    Map<String, String> lookup = broadcastLookup.value();
    return lookup.getOrDefault(id, "Unknown User");
});

List<String> results = userNames.collect();

// Cleanup
broadcastLookup.destroy();
sc.close();
```

## Best Practices

### When to Use Broadcast Variables

- **Large lookup tables**: Reference data needed by many tasks
- **Configuration objects**: Application settings used across tasks
- **Machine learning models**: Trained models for prediction tasks
- **Static data**: Any read-only data accessed frequently

### Size Considerations

- Broadcast variables are loaded into memory on each executor
- Consider memory constraints when broadcasting large objects
- Monitor executor memory usage with broadcast variables
- Use serialization-friendly data structures
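One way to gauge whether a value is reasonable to broadcast is to measure its serialized size up front. A minimal standalone Java sketch (not a Spark API; class and method names are illustrative, and Spark may use a different serializer such as Kryo):

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class BroadcastSizeEstimate {
    // Rough lower bound on a broadcast's network cost: the Java-serialized
    // size of the value. Deserialized memory use on executors is typically
    // larger than this figure.
    static long serializedBytes(Serializable value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String> table = new HashMap<>();
        for (int i = 0; i < 100_000; i++) {
            table.put("key" + i, "value" + i);
        }
        long mb = serializedBytes(table) / (1024 * 1024);
        // Even a modest 100k-entry map already costs megabytes per executor.
        System.out.println(mb >= 1);
    }
}
```

Running a check like this before calling `broadcast` helps catch values that would strain executor memory.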

### Performance Tips

```scala
// Good: Broadcast large, reusable data
val largeLookup = sc.broadcast(Map(/* large dataset */))
val result = rdd.map(x => largeLookup.value.get(x.key))

// Bad: Don't broadcast small data or data used only once
val smallMap = Map("a" -> 1, "b" -> 2) // Just use directly
val result2 = rdd.map(x => smallMap.get(x.key)) // Serialized with each task

// Good: Reuse broadcast variables across multiple operations
val config = sc.broadcast(appConfig)
val step1 = rdd1.map(x => process1(x, config.value))
val step2 = rdd2.map(x => process2(x, config.value))

// Good: Clean up when no longer needed
config.unpersist() // Remove cached copies from executor memory
config.destroy()   // Remove all data and metadata
```

### Error Handling

```scala
import java.io.NotSerializableException

// Handle potential serialization issues
try {
  val broadcast = sc.broadcast(complexObject)
  // Use broadcast variable
} catch {
  case e: NotSerializableException =>
    // Handle serialization failure
    println(s"Cannot broadcast non-serializable object: $e")
}

// Defensive access patterns
val result = rdd.map { x =>
  try {
    val lookup = broadcastLookup.value
    lookup.getOrElse(x.key, defaultValue)
  } catch {
    case e: Exception =>
      // Handle broadcast access failures
      defaultValue
  }
}
```


Broadcast variables provide an efficient mechanism for distributing read-only data to all nodes in a Spark cluster, significantly reducing network overhead compared to sending data with each task.