# Shared Variables

Spark provides two types of shared variables for efficient data sharing across distributed computations: broadcast variables, which distribute read-only data to every node, and accumulators, which tasks may only add to and the driver reads back.

## Broadcast Variables

Broadcast variables allow efficient sharing of large read-only datasets across all nodes in a cluster.

```scala { .api }
abstract class Broadcast[T](id: Long) {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def id: Long
  def toString: String
}
```
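
The two cleanup methods differ in scope: `unpersist` only removes the cached copies on executors (the value is re-shipped lazily if the variable is used again), while `destroy` also releases the data on the driver, after which the variable can no longer be used. A minimal lifecycle sketch, assuming an active `SparkContext` named `sc`:

```scala
val bc = sc.broadcast(Map("a" -> 1))

bc.value        // readable on the driver and inside tasks

bc.unpersist()  // drop executor copies; re-broadcast lazily on next use
bc.destroy()    // release all resources; any further use of bc throws
```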

### Creating Broadcast Variables

```scala { .api }
// From SparkContext
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

## Accumulators V2

`AccumulatorV2` is the modern accumulator API, providing type-safe, efficient aggregation across distributed computations.

```scala { .api }
abstract class AccumulatorV2[IN, OUT] {
  // Core Operations
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT

  // Metadata
  def name: Option[String]
  def id: Long
  def isRegistered: Boolean
}
```
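
The contract behind these methods: Spark ships a zeroed copy of the accumulator to each task, the task calls `add` per record, and the driver folds the partial results back in with `merge`. That flow can be mimicked locally; a minimal sketch using the built-in `LongAccumulator` (described below), outside of any Spark job:

```scala
import org.apache.spark.util.LongAccumulator

val driverAcc = new LongAccumulator

// Spark ships a reset copy of the accumulator to each task...
val taskCopy = driverAcc.copy()
taskCopy.reset()

// ...the task accumulates locally...
taskCopy.add(5L)
taskCopy.add(7L)

// ...and the driver merges the partial result back in
driverAcc.merge(taskCopy)
println(driverAcc.value) // 12
```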

## Built-in Accumulator Types

### LongAccumulator

```scala { .api }
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
  def value: java.lang.Long

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): LongAccumulator
  def reset(): Unit
  def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit
}
```
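
Besides `value`, `LongAccumulator` tracks how many times `add` was called, so `count`, `sum`, and `avg` come for free. A minimal sketch, assuming an active `SparkContext` named `sc`:

```scala
val latency = sc.longAccumulator("Latency (ms)")

// foreach is an action, so these updates are applied exactly once per task
sc.parallelize(Seq(12L, 8L, 20L)).foreach(ms => latency.add(ms))

println(latency.sum)   // 40
println(latency.count) // 3
println(latency.avg)   // 13.33...
```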

### DoubleAccumulator

```scala { .api }
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
  def value: java.lang.Double

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): DoubleAccumulator
  def reset(): Unit
  def merge(other: AccumulatorV2[java.lang.Double, java.lang.Double]): Unit
}
```

### CollectionAccumulator

```scala { .api }
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]

  // AccumulatorV2 implementation
  def isZero: Boolean
  def copy(): CollectionAccumulator[T]
  def reset(): Unit
  def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
}
```

## Creating Accumulators

### From SparkContext

```scala { .api }
// Long accumulators
def longAccumulator: LongAccumulator
def longAccumulator(name: String): LongAccumulator

// Double accumulators
def doubleAccumulator: DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator

// Collection accumulators
def collectionAccumulator[T]: CollectionAccumulator[T]
def collectionAccumulator[T](name: String): CollectionAccumulator[T]

// Custom accumulators
def register(acc: AccumulatorV2[_, _]): Unit
def register(acc: AccumulatorV2[_, _], name: String): Unit
```

## Custom Accumulators

Create custom accumulator types by extending `AccumulatorV2`.

```scala { .api }
import scala.collection.mutable
import scala.collection.JavaConverters._ // on Scala 2.13+: scala.jdk.CollectionConverters._
import org.apache.spark.util.AccumulatorV2

// Example: Set accumulator for collecting unique values
class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
  private val _set = mutable.Set.empty[T]

  def isZero: Boolean = _set.isEmpty

  def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= _set
    newAcc
  }

  def reset(): Unit = _set.clear()

  def add(v: T): Unit = _set += v

  def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
    other match {
      case set: SetAccumulator[T] => _set ++= set._set
      case _ => throw new UnsupportedOperationException("Cannot merge different accumulator types")
    }
  }

  def value: java.util.Set[T] = _set.asJava
}
```

## Usage Examples

### Broadcast Variables

```scala
import org.apache.spark.broadcast.Broadcast

// Large lookup table that will be used across many tasks
val lookupTable = Map(
  "user1" -> "John Doe",
  "user2" -> "Jane Smith"
  // ... thousands more entries
)

// Broadcast the lookup table
val broadcastLookup: Broadcast[Map[String, String]] = sc.broadcast(lookupTable)

// Use in transformations
val userIds = sc.parallelize(Array("user1", "user2", "user1", "user3"))
val enrichedData = userIds.map { userId =>
  val lookup = broadcastLookup.value // Access broadcast value
  val userName = lookup.getOrElse(userId, "Unknown")
  (userId, userName)
}

val result = enrichedData.collect()
// Result: Array((user1,John Doe), (user2,Jane Smith), (user1,John Doe), (user3,Unknown))

// Clean up when done
broadcastLookup.unpersist()
```

### Long Accumulator

```scala
val data = sc.parallelize(1 to 1000)

// Create accumulator for counting even numbers
val evenCount = sc.longAccumulator("Even Numbers")

// Use in transformation
val processed = data.map { num =>
  if (num % 2 == 0) {
    evenCount.add(1) // Accumulate even numbers
  }
  num * num
}

// Trigger action to execute transformations
val result = processed.collect()

// Access accumulator value
println(s"Found ${evenCount.value} even numbers")
```

### Collection Accumulator

```scala
val textData = sc.parallelize(Array("error: failed", "info: success", "error: timeout", "debug: trace"))

// Accumulator to collect all error messages
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

// Process data and collect errors
val processedData = textData.map { line =>
  if (line.startsWith("error:")) {
    errorMessages.add(line) // Collect error messages
  }
  line.toUpperCase
}

// Trigger action
processedData.count()

// Access collected errors
val errors = errorMessages.value
println(s"Found ${errors.size()} errors: ${errors}")
```

### Custom Set Accumulator

```scala
// Register custom accumulator
val uniqueWords = new SetAccumulator[String]
sc.register(uniqueWords, "Unique Words")

val sentences = sc.parallelize(Array(
  "hello world",
  "world of spark",
  "hello spark"
))

// Use custom accumulator
val wordCounts = sentences.flatMap(_.split(" ")).map { word =>
  uniqueWords.add(word) // Collect unique words
  (word, 1)
}.reduceByKey(_ + _)

// Trigger action
val counts = wordCounts.collect()

// Access unique words
val unique = uniqueWords.value
println(s"Found ${unique.size()} unique words: ${unique}")
```

### Advanced Patterns

#### Error Tracking with Multiple Accumulators

```scala
// Assumes rawData: RDD[String] and a user-defined parseRecord helper
val malformedRecords = sc.longAccumulator("Malformed Records")
val validRecords = sc.longAccumulator("Valid Records")
val errorDetails = sc.collectionAccumulator[String]("Error Details")

val processedData = rawData.map { record =>
  try {
    val parsed = parseRecord(record)
    validRecords.add(1)
    parsed
  } catch {
    case e: Exception =>
      malformedRecords.add(1)
      errorDetails.add(s"Error parsing '$record': ${e.getMessage}")
      null
  }
}.filter(_ != null)

processedData.count()

println(s"Valid: ${validRecords.value}, Malformed: ${malformedRecords.value}")
errorDetails.value.forEach(msg => println(msg)) // value is a java.util.List
```

#### Performance Monitoring

```scala
val processingTime = sc.doubleAccumulator("Processing Time (ms)")
val recordsProcessed = sc.longAccumulator("Records Processed")

val result = data.map { record =>
  val start = System.currentTimeMillis()
  val processed = expensiveProcessing(record)
  val elapsed = System.currentTimeMillis() - start

  processingTime.add(elapsed.toDouble)
  recordsProcessed.add(1)

  processed
}

result.count()

// Equivalently: processingTime.avg, since both accumulators are updated once per record
println(f"Average processing time: ${processingTime.value / recordsProcessed.value}%.2f ms per record")
```

## Best Practices

### Broadcast Variables

1. **Use for large read-only data**: Ideal for lookup tables, configuration, and models
2. **Avoid frequent updates**: Broadcast variables are immutable; refresh by re-broadcasting (see the sketch after this list)
3. **Size considerations**: The data should fit comfortably in executor memory
4. **Clean up**: Call `unpersist()` when the variable is no longer needed
5. **Serialization**: Ensure broadcast data is efficiently serializable
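
Since a broadcast variable cannot be modified in place, the usual refresh pattern is to unpersist the stale handle and broadcast a fresh value. A minimal sketch, assuming an active `SparkContext` named `sc` and a hypothetical `loadLookupTable()` helper standing in for however the data is produced:

```scala
// Hypothetical loader; replace with your own data source
def loadLookupTable(): Map[String, String] = Map("user1" -> "John Doe")

var broadcastLookup = sc.broadcast(loadLookupTable())

// ... some batches later, the source data has changed ...
broadcastLookup.unpersist()                       // drop stale executor copies
broadcastLookup = sc.broadcast(loadLookupTable()) // re-broadcast the fresh value
```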

### Accumulators

1. **Use in actions only**: Updates inside actions are applied exactly once per task; inside transformations they may be re-applied on retries or recomputation (see the sketch after this list)
2. **Idempotent operations**: Design updates to tolerate task retries gracefully
3. **Named accumulators**: Use names so values appear in the Spark UI
4. **Register custom types**: Register custom accumulators with `SparkContext.register` for proper tracking
5. **Avoid side effects**: Don't rely on accumulator updates for program logic
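
To see why practice 1 matters: if a stage is recomputed (a task retry, or an uncached RDD evaluated by more than one action), accumulator updates made inside transformations run again, whereas updates made inside an action such as `foreach` are guaranteed to be applied exactly once per task. A minimal sketch, assuming an active `SparkContext` named `sc`:

```scala
val counter = sc.longAccumulator("Lines Seen")
val lines = sc.parallelize(Seq("a", "b", "c"))

// Reliable: foreach is an action, so each task's updates count exactly once
lines.foreach(_ => counter.add(1))

// Risky: updates inside map run again whenever this uncached RDD is recomputed
val upper = lines.map { l => counter.add(1); l.toUpperCase }
upper.count()
upper.collect() // recomputes the map, so the counter is incremented again

println(counter.value) // 9: 3 from foreach + 6 from the two evaluations of map
```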