or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.mddata-io-persistence.mdindex.mdkey-value-operations.mdrdd-operations.mdsparkcontext.md

broadcast-accumulators.mddocs/

0

# Broadcast Variables and Accumulators

1

2

Shared variables in Spark enable efficient data sharing across cluster nodes. Broadcast variables distribute read-only data efficiently, while accumulators collect information from workers back to the driver in a fault-tolerant manner.

3

4

## Capabilities

5

6

### Broadcast Variables

7

8

Efficiently distribute read-only data to all cluster nodes, avoiding repeated serialization and network transfer.

9

10

```scala { .api }

11

abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {

12

def value: T

13

def unpersist(): Unit

14

def unpersist(blocking: Boolean): Unit

15

def destroy(): Unit

16

def toString: String

17

}

18

19

// Creation via SparkContext

20

def broadcast[T: ClassTag](value: T): Broadcast[T]

21

```

22

23

**Usage Examples:**

24

```scala

25

// Large lookup table that all tasks need

26

val lookupTable = Map(

27

"user1" -> "Alice",

28

"user2" -> "Bob",

29

"user3" -> "Charlie"

30

// ... potentially thousands of entries

31

)

32

33

// Broadcast the table once instead of sending with each task

34

val broadcastLookup = sc.broadcast(lookupTable)

35

36

// Use in transformations - only the broadcasted reference is serialized

37

val userIds = sc.parallelize(Seq("user1", "user2", "user3", "user1"))

38

val userNames = userIds.map { id =>

39

broadcastLookup.value.getOrElse(id, "Unknown")

40

}

41

// Result: ["Alice", "Bob", "Charlie", "Alice"]

42

43

// Clean up when done

44

broadcastLookup.unpersist()

45

```

46

47

**Common Use Cases:**

48

```scala

49

// Configuration broadcast

50

val config = Map("apiUrl" -> "https://api.example.com", "timeout" -> "30s")

51

val broadcastConfig = sc.broadcast(config)

52

53

// Model parameters broadcast

54

val modelWeights = Array.fill(1000)(scala.util.Random.nextDouble())

55

val broadcastWeights = sc.broadcast(modelWeights)

56

57

// Large dimension tables

58

val productCatalog = loadProductCatalogFromDatabase() // Large dataset

59

val broadcastCatalog = sc.broadcast(productCatalog)

60

61

val transactions = sc.textFile("transactions.txt")

62

val enrichedTransactions = transactions.map { transaction =>

63

val productId = extractProductId(transaction)

64

val productInfo = broadcastCatalog.value.get(productId)

65

enrichTransaction(transaction, productInfo)

66

}

67

```

68

69

### AccumulatorV2 Base Class

70

71

Base class for all accumulator implementations, providing fault-tolerant aggregation.

72

73

```scala { .api }

74

abstract class AccumulatorV2[IN, OUT] extends Serializable {

75

// Core accumulator interface

76

def isZero: Boolean

77

def copy(): AccumulatorV2[IN, OUT]

78

def reset(): Unit

79

def add(v: IN): Unit

80

def merge(other: AccumulatorV2[IN, OUT]): Unit

81

def value: OUT

82

83

// Metadata

84

def name: Option[String]

85

def id: Long

86

def isRegistered: Boolean

87

def register(sc: SparkContext, name: Option[String] = None, countFailedValues: Boolean = false): Unit

88

}

89

```

90

91

### Built-in Accumulator Types

92

93

Pre-implemented accumulator types for common use cases.

94

95

```scala { .api }

96

// Long accumulator

97

class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {

98

def add(v: Long): Unit

99

def add(v: jl.Long): Unit

100

def count: Long

101

def sum: Long

102

def avg: Double

103

}

104

105

// Double accumulator

106

class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {

107

def add(v: Double): Unit

108

def add(v: jl.Double): Unit

109

def count: Long

110

def sum: Double

111

def avg: Double

112

}

113

114

// Collection accumulator

115

class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {

116

def add(v: T): Unit

117

def copyAndReset(): CollectionAccumulator[T]

118

}

119

120

// Creation via SparkContext

121

def longAccumulator(): LongAccumulator

122

def longAccumulator(name: String): LongAccumulator

123

def doubleAccumulator(): DoubleAccumulator

124

def doubleAccumulator(name: String): DoubleAccumulator

125

def collectionAccumulator[T](): CollectionAccumulator[T]

126

def collectionAccumulator[T](name: String): CollectionAccumulator[T]

127

```

128

129

**Usage Examples:**

130

```scala

131

// Count errors during processing

132

val errorCount = sc.longAccumulator("Processing Errors")

133

val totalProcessed = sc.longAccumulator("Total Processed")

134

135

val data = sc.parallelize(1 to 1000)

136

val results = data.map { value =>

137

totalProcessed.add(1)

138

try {

139

Some(processValue(value))

140

} catch {

141

case _: Exception =>

142

errorCount.add(1)

143

None

144

}

145

}.filter(_.isDefined).map(_.get)

146

147

// Trigger action to execute

148

results.count()

149

150

// Check accumulator values

151

println(s"Total processed: ${totalProcessed.value}")

152

println(s"Errors: ${errorCount.value}")

153

println(s"Success rate: ${(totalProcessed.value - errorCount.value).toDouble / totalProcessed.value}")

154

155

// Collect sample error messages

156

val errorMessages = sc.collectionAccumulator[String]("Error Messages")

157

data.foreach { value =>

158

try {

159

processValue(value)

160

} catch {

161

case e: Exception => errorMessages.add(e.getMessage)

162

}

163

}

164

```

165

166

### Custom Accumulators

167

168

Create custom accumulators for specialized aggregation needs.

169

170

```scala { .api }

171

// Custom accumulator example structure

172

class CustomAccumulator extends AccumulatorV2[InputType, OutputType] {

173

private var _value: InternalState = initialState

174

175

def isZero: Boolean = _value == initialState

176

def copy(): CustomAccumulator = new CustomAccumulator // with copied state

177

def reset(): Unit = _value = initialState

178

def add(v: InputType): Unit = updateInternalState(v)

179

def merge(other: AccumulatorV2[InputType, OutputType]): Unit = mergeStates(other)

180

def value: OutputType = computeOutputFromState(_value)

181

}

182

```

183

184

**Custom Accumulator Examples:**

185

```scala

186

// Statistics accumulator

187

class StatsAccumulator extends AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)] {

188

private var count: Long = 0

189

private var sum: Double = 0.0

190

private var sumSquares: Double = 0.0

191

private var min: Double = Double.MaxValue

192

private var max: Double = Double.MinValue

193

194

def isZero: Boolean = count == 0

195

196

def copy(): StatsAccumulator = {

197

val acc = new StatsAccumulator

198

acc.count = this.count

199

acc.sum = this.sum

200

acc.sumSquares = this.sumSquares

201

acc.min = this.min

202

acc.max = this.max

203

acc

204

}

205

206

def reset(): Unit = {

207

count = 0

208

sum = 0.0

209

sumSquares = 0.0

210

min = Double.MaxValue

211

max = Double.MinValue

212

}

213

214

def add(v: Double): Unit = {

215

count += 1

216

sum += v

217

sumSquares += v * v

218

min = math.min(min, v)

219

max = math.max(max, v)

220

}

221

222

def merge(other: AccumulatorV2[Double, (Long, Double, Double, Double, Double, Double)]): Unit = {

223

other match {

224

case o: StatsAccumulator =>

225

if (o.count > 0) {

226

count += o.count

227

sum += o.sum

228

sumSquares += o.sumSquares

229

min = math.min(min, o.min)

230

max = math.max(max, o.max)

231

}

232

}

233

}

234

235

def value: (Long, Double, Double, Double, Double, Double) = {

236

val mean = if (count > 0) sum / count else 0.0

237

val variance = if (count > 1) (sumSquares - sum * sum / count) / (count - 1) else 0.0

238

(count, sum, mean, math.sqrt(variance), min, max)

239

}

240

}

241

242

// Usage

243

val statsAcc = new StatsAccumulator

244

sc.register(statsAcc, "Data Statistics")

245

246

val data = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 100.0))

247

data.foreach(statsAcc.add(_))

248

249

val (count, sum, mean, stddev, min, max) = statsAcc.value

250

println(s"Count: $count, Mean: $mean, StdDev: $stddev, Min: $min, Max: $max")

251

```

252

253

### Histogram Operations

254

255

Built-in RDD operations for computing histograms over numeric data (these are methods on `RDD[Double]`, not accumulators).

256

257

```scala { .api }

258

// Available on RDD[Double]

259

def histogram(buckets: Array[Double]): Array[Long]

260

def histogram(bucketNum: Int): (Array[Double], Array[Long])

261

```

262

263

**Usage Example:**

264

```scala

265

val data = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1, 7.9, 8.3, 9.1, 10.0))

266

267

// Histogram with specified bucket boundaries

268

val buckets = Array(0.0, 2.0, 4.0, 6.0, 8.0, 10.0)

269

val counts = data.histogram(buckets)

270

// Result: counts for ranges [0-2), [2-4), [4-6), [6-8), [8-10]

271

272

// Histogram with automatic bucket calculation

273

val (autoBuckets, autoCounts) = data.histogram(5)

274

```

275

276

## Best Practices and Performance

277

278

### Broadcast Variable Guidelines

279

280

```scala

281

// DO: Broadcast large read-only data used across many tasks

282

val broadcastTable = sc.broadcast(largeHashMap)

283

284

// DON'T: Broadcast small data or data used in few tasks

285

val smallConfig = Map("key" -> "value") // Just include directly

286

287

// DO: Unpersist when done to free memory

288

broadcastTable.unpersist()

289

290

// DO: Use broadcast for join optimization with small dimension tables

291

val smallTable = sc.broadcast(dimensionTable.collectAsMap())

292

val enriched = factTable.map { row =>

293

val enrichmentData = smallTable.value.get(row.key)

294

enrichRow(row, enrichmentData)

295

}

296

```

297

298

### Accumulator Guidelines

299

300

```scala

301

// DO: Use accumulators for monitoring and debugging

302

val validRecords = sc.longAccumulator("Valid Records")

303

val invalidRecords = sc.longAccumulator("Invalid Records")

304

305

// DON'T: Use accumulators for functional logic (non-deterministic)

306

// Accumulators should be side-effects only

307

308

// DO: Check accumulator values after actions, not transformations

309

data.map { record =>

310

if (isValid(record)) {

311

validRecords.add(1)

312

Some(processRecord(record))

313

} else {

314

invalidRecords.add(1)

315

None

316

}

317

}.collect() // Action triggers accumulator updates

318

319

println(s"Valid: ${validRecords.value}, Invalid: ${invalidRecords.value}")

320

```

321

322

## Types

323

324

```scala { .api }

325

// Broadcast variable base class

326

abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable

327

328

// Accumulator base class

329

abstract class AccumulatorV2[IN, OUT] extends Serializable

330

331

// Built-in accumulator implementations

332

class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long]

333

class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double]

334

class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]

335

336

// Accumulator registration and metadata

337

trait AccumulatorParam[T] {

338

def addInPlace(t1: T, t2: T): T

339

def zero(initialValue: T): T

340

}

341

342

case class AccumulableInfo(

343

id: Long,

344

name: Option[String],

345

update: Option[String],

346

value: String,

347

internal: Boolean,

348

countFailedValues: Boolean,

349

metadata: Option[String]

350

)

351

```