or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.mdcontext-management.mdindex.mdjava-api.mdpair-rdd-operations.mdrdd-operations.mdstorage-persistence.md

broadcast-accumulators.mddocs/

0

# Broadcast Variables and Accumulators

1

2

Distributed variable support for efficiently sharing read-only data across tasks (broadcast variables) and collecting information from executors (accumulators).

3

4

## Capabilities

5

6

### Broadcast Variables

7

8

Broadcast variables allow you to efficiently distribute read-only data to all worker nodes, rather than shipping a copy with each task.

9

10

```scala { .api }

11

/**

12

* A broadcast variable created by SparkContext.broadcast().

13

* Access its value through .value.

14

*

15

* @param id unique identifier for this broadcast variable

16

*/

17

abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {

18

19

/** Get the broadcast value */

20

def value: T

21

22

/** Asynchronously delete cached copies of this broadcast on the executors */

23

def unpersist(): Unit

24

25

/** Delete cached copies of this broadcast on the executors, with option to block */

26

def unpersist(blocking: Boolean): Unit

27

28

/** Destroy all data and metadata related to this broadcast variable */

29

def destroy(): Unit

30

31

/** Whether this broadcast variable is valid (not destroyed) */

32

def isValid: Boolean

33

34

override def toString: String = s"Broadcast($id)"

35

}

36

37

/**

38

* SparkContext methods for creating broadcast variables

39

*/

40

class SparkContext(config: SparkConf) extends Logging {

41

42

/** Broadcast a read-only variable to the cluster, returning a Broadcast object */

43

def broadcast[T: ClassTag](value: T): Broadcast[T]

44

}

45

```

46

47

**Usage Examples:**

48

49

```scala

50

import org.apache.spark.{SparkContext, SparkConf}

51

52

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example").setMaster("local[*]"))

53

54

// Create lookup table

55

val lookupTable = Map(

56

"error" -> 1,

57

"warning" -> 2,

58

"info" -> 3,

59

"debug" -> 4

60

)

61

62

// Broadcast the lookup table

63

val broadcastLookup = sc.broadcast(lookupTable)

64

65

// Create RDD of log entries

66

val logEntries = sc.parallelize(Array(

67

"error: failed to connect",

68

"warning: retry attempt",

69

"info: processing started",

70

"debug: variable x = 5"

71

))

72

73

// Use broadcast variable in transformations

74

val categorizedLogs = logEntries.map { entry =>

75

val level = entry.split(":").head

76

val category = broadcastLookup.value.getOrElse(level, 0)

77

(entry, category)

78

}

79

80

val results = categorizedLogs.collect()

81

results.foreach(println)

82

83

// Cleanup broadcast variable

84

broadcastLookup.unpersist()

85

// broadcastLookup.destroy() // Use when completely done

86

87

sc.stop()

88

```

89

90

**Advanced Broadcast Usage:**

91

92

```scala

93

// Large lookup table scenario

94

val largeReferenceData = sc.textFile("reference-data.txt")

95

.map(line => {

96

val parts = line.split(",")

97

(parts(0), parts(1))

98

})

99

.collectAsMap() // Collect to driver as Map

100

101

val broadcastReference = sc.broadcast(largeReferenceData)

102

103

// Use in join-like operation (broadcast join)

104

val transactionData = sc.textFile("transactions.txt")

105

val enrichedTransactions = transactionData.map { transaction =>

106

val customerId = transaction.split(",").head

107

val customerInfo = broadcastReference.value.get(customerId)

108

(transaction, customerInfo)

109

}

110

111

// Machine learning model broadcast

112

val trainedModel = trainMLModel(trainingData) // Some ML model

113

val broadcastModel = sc.broadcast(trainedModel)

114

115

val predictions = testData.map { testPoint =>

116

val model = broadcastModel.value

117

val prediction = model.predict(testPoint)

118

(testPoint, prediction)

119

}

120

```

121

122

### Accumulators

123

124

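Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program. Note that the `Accumulator`/`AccumulatorParam` API shown here is deprecated since Spark 2.0 in favor of `AccumulatorV2` (e.g. `sc.longAccumulator`, `sc.collectionAccumulator`).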

125

126

```scala { .api }

127

/**

128

* A data type that can be accumulated (has an associative and commutative operation)

129

*/

130

trait AccumulatorParam[T] extends Serializable {

131

/** Return a zero value for this accumulator type */

132

def zero(initialValue: T): T

133

134

/** Add two values together */

135

def addInPlace(t1: T, t2: T): T

136

}

137

138

/**

139

* A shared variable that can be accumulated across parallel operations

140

*/

141

class Accumulator[T] private[spark] (

142

@transient private[spark] val initialValue: T,

143

param: AccumulatorParam[T],

144

name: Option[String] = None) extends Serializable {

145

146

/** Get the current value of this accumulator from the driver program */

147

def value: T

148

149

/** Add a value to this accumulator */

150

def add(term: T): Unit

151

152

/** Add a value using += operator */

153

def += (term: T): Unit = add(term)

154

155

/** Get the current value from within a task (may not be the global value) */

156

def localValue: T

157

158

/** Get the zero value */

159

def zero: T

160

161

/** Get the accumulator's unique ID */

162

def id: Long

163

164

/** Get the accumulator's name */

165

def name: Option[String]

166

167

override def toString: String = if (name.isDefined) name.get else value.toString

168

}

169

170

/**

171

* SparkContext methods for creating accumulators

172

*/

173

class SparkContext(config: SparkConf) extends Logging {

174

175

/** Create an accumulator variable of a given type T, using the implicit AccumulatorParam for T */

176

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

177

178

/** Create a named accumulator variable */

179

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]

180

181

/** Create an accumulator for integers */

182

def accumulator(initialValue: Int): Accumulator[Int]

183

184

/** Create an accumulator for doubles */

185

def accumulator(initialValue: Double): Accumulator[Double]

186

187

/** Create a named integer accumulator */

188

def accumulator(initialValue: Int, name: String): Accumulator[Int]

189

190

/** Create a named double accumulator */

191

def accumulator(initialValue: Double, name: String): Accumulator[Double]

192

}

193

```

194

195

**Usage Examples:**

196

197

```scala

198

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example").setMaster("local[*]"))

199

200

// Create accumulators

201

val errorCount = sc.accumulator(0, "Error Count")

202

val warningCount = sc.accumulator(0, "Warning Count")

203

val totalProcessed = sc.accumulator(0)

204

205

// Process data with accumulators

206

val logData = sc.textFile("logs.txt")

207

208

val processedLogs = logData.map { line =>

209

totalProcessed += 1

210

211

if (line.contains("ERROR")) {

212

errorCount += 1

213

} else if (line.contains("WARNING")) {

214

warningCount += 1

215

}

216

217

line.toUpperCase

218

}

219

220

// Trigger action to update accumulators

221

processedLogs.count()

222

223

// Read accumulator values (only on driver)

224

println(s"Total processed: ${totalProcessed.value}")

225

println(s"Errors: ${errorCount.value}")

226

println(s"Warnings: ${warningCount.value}")

227

228

sc.stop()

229

```

230

231

**Advanced Accumulator Usage:**

232

233

```scala

234

// Custom accumulator for complex data types

235

case class Statistics(count: Long, sum: Double, sumSquares: Double) {

236

def mean: Double = if (count > 0) sum / count else 0.0

237

def variance: Double = if (count > 0) (sumSquares / count) - (mean * mean) else 0.0

238

}

239

240

implicit object StatisticsAccumulatorParam extends AccumulatorParam[Statistics] {

241

def zero(initialValue: Statistics): Statistics = Statistics(0, 0.0, 0.0)

242

243

def addInPlace(s1: Statistics, s2: Statistics): Statistics = {

244

Statistics(

245

s1.count + s2.count,

246

s1.sum + s2.sum,

247

s1.sumSquares + s2.sumSquares

248

)

249

}

250

}

251

252

// Use custom accumulator

253

val stats = sc.accumulator(Statistics(0, 0.0, 0.0), "Data Statistics")

254

255

val data = sc.parallelize((1 to 1000).map(_.toDouble))

256

data.foreach { value =>

257

stats += Statistics(1, value, value * value)

258

}

259

260

println(s"Mean: ${stats.value.mean}")

261

println(s"Variance: ${stats.value.variance}")

262

263

// Collection accumulator (Spark 2.0+ style simulation)

264

import scala.collection.mutable

265

266

class ListAccumulator[T] extends AccumulatorParam[mutable.ListBuffer[T]] {

267

def zero(initialValue: mutable.ListBuffer[T]) = mutable.ListBuffer[T]()

268

269

def addInPlace(buf1: mutable.ListBuffer[T], buf2: mutable.ListBuffer[T]) = {

270

buf1 ++= buf2

271

}

272

}

273

274

implicit def listAccumulatorParam[T] = new ListAccumulator[T]

275

276

val errorMessages = sc.accumulator(mutable.ListBuffer[String](), "Error Messages")

277

278

logData.foreach { line =>

279

if (line.contains("FATAL")) {

280

errorMessages += mutable.ListBuffer(line)

281

}

282

}

283

284

println(s"Fatal errors: ${errorMessages.value.toList}")

285

```

286

287

### Built-in AccumulatorParam Implementations

288

289

Spark provides built-in accumulator parameters for common types:

290

291

```scala { .api }

292

// Built-in accumulator parameters

293

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {

294

def zero(initialValue: Int) = 0

295

def addInPlace(t1: Int, t2: Int) = t1 + t2

296

}

297

298

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {

299

def zero(initialValue: Long) = 0L

300

def addInPlace(t1: Long, t2: Long) = t1 + t2

301

}

302

303

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {

304

def zero(initialValue: Double) = 0.0

305

def addInPlace(t1: Double, t2: Double) = t1 + t2

306

}

307

308

// For collections: NOT shipped with Spark; example of a user-defined param you must declare yourself

309

implicit object ListAccumulatorParam extends AccumulatorParam[List[String]] {

310

def zero(initialValue: List[String]) = List()

311

def addInPlace(list1: List[String], list2: List[String]) = list1 ++ list2

312

}

313

```

314

315

## Java API Support

316

317

### Java Broadcast Variables

318

319

```java { .api }

320

/**

321

* Java-friendly broadcast variable access

322

*/

323

public class JavaSparkContext implements Closeable {

324

/** Broadcast a read-only variable to the cluster */

325

public <T> Broadcast<T> broadcast(T value)

326

}

327

328

// Usage in Java

329

import org.apache.spark.broadcast.Broadcast;

330

import java.util.Map;

331

import java.util.HashMap;

332

333

Map<String, Integer> lookupMap = new HashMap<>();

334

lookupMap.put("apple", 1);

335

lookupMap.put("banana", 2);

336

337

Broadcast<Map<String, Integer>> broadcastLookup = sc.broadcast(lookupMap);

338

339

JavaRDD<String> data = sc.parallelize(Arrays.asList("apple", "banana", "cherry"));

340

JavaRDD<Integer> mapped = data.map(item ->

341

broadcastLookup.value().getOrDefault(item, 0)

342

);

343

```

344

345

### Java Accumulators

346

347

```java { .api }

348

/**

349

* Java accumulator creation methods

350

*/

351

public class JavaSparkContext implements Closeable {

352

/** Create an integer accumulator */

353

public Accumulator<Integer> accumulator(Integer initialValue)

354

355

/** Create a named integer accumulator */

356

public Accumulator<Integer> accumulator(Integer initialValue, String name)

357

358

/** Create an accumulator with custom AccumulatorParam */

359

public <T> Accumulator<T> accumulator(T initialValue, AccumulatorParam<T> param)

360

361

/** Create a named accumulator with custom AccumulatorParam */

362

public <T> Accumulator<T> accumulator(T initialValue, String name, AccumulatorParam<T> param)

363

}

364

365

// Usage in Java

366

Accumulator<Integer> errorCount = sc.accumulator(0, "Errors");

367

Accumulator<Integer> lineCount = sc.accumulator(0);

368

369

JavaRDD<String> lines = sc.textFile("data.txt");

370

lines.foreach(line -> {

371

lineCount.add(1);

372

if (line.contains("ERROR")) {

373

errorCount.add(1);

374

}

375

});

376

377

System.out.println("Total lines: " + lineCount.value());

378

System.out.println("Error lines: " + errorCount.value());

379

```

380

381

## Performance and Best Practices

382

383

### Broadcast Variable Guidelines

384

385

**When to Use Broadcast Variables:**

386

- Lookup tables or reference data needed by many tasks

387

- Configuration objects used across transformations

388

- Machine learning models for scoring

389

- Data that would otherwise be serialized with every task

390

391

**Size Considerations:**

392

```scala

393

// Good: Small to medium lookup tables (< 1GB)

394

val smallLookup = Map(/* thousands of entries */)

395

val broadcast = sc.broadcast(smallLookup)

396

397

// Questionable: Very large data (> 1GB)

398

val hugeLookup = Map(/* millions of entries */)

399

val broadcast = sc.broadcast(hugeLookup) // May cause memory issues

400

```

401

402

**Lifecycle Management:**

403

```scala

404

// Create broadcast once, use many times

405

val lookup = sc.broadcast(referenceData)

406

407

// Use in multiple operations

408

val result1 = data1.map(x => lookup.value.get(x.id))

409

val result2 = data2.filter(x => lookup.value.contains(x.key))

410

411

// Cleanup when done

412

lookup.unpersist() // Remove from executors

413

lookup.destroy() // Complete cleanup (only when completely finished)

414

```

415

416

### Accumulator Guidelines

417

418

**Accumulator Guarantees:**

419

- For updates performed inside actions, Spark applies each task's update exactly once, even if the task is restarted

420

- Transformations may update accumulators multiple times due to retries

421

422

```scala

423

// Safe: Accumulator in action

424

data.foreach(x => counter += 1) // Each element counted exactly once

425

426

// Unsafe: Accumulator in transformation

427

val mapped = data.map { x =>

428

counter += 1 // May be called multiple times due to retries

429

transform(x)

430

}

431

mapped.collect() // Counter may be > data.count()

432

```

433

434

**Recommended Patterns:**

435

```scala

436

// Pattern 1: Use accumulators only in actions

437

data.foreach { x =>

438

if (isError(x)) errorCount += 1

439

process(x)

440

}

441

442

// Pattern 2: Separate transformation and accumulation

443

val processedData = data.map(transform) // No accumulator here

444

processedData.foreach { x =>

445

validCount += 1 // Accumulator in action

446

}

447

448

// Pattern 3: cache() a reused transformation that updates an accumulator, so later actions do not recompute it and re-apply the updates

449

val transformed = data.map { x =>

450

operationCount += 1

451

expensiveTransform(x)

452

}.cache() // Cache to avoid recomputation

453

454

transformed.collect() // Updates applied on this first computation; evicted partitions or task retries can still re-apply them

455

```

456

457

### Memory Management

458

459

**Broadcast Variables:**

460

```scala

461

// Reduce broadcast memory/network footprint via serializer and compression settings

462

val conf = new SparkConf()

463

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // More efficient

464

.set("spark.broadcast.compress", "true") // Compress broadcasts

465

.set("spark.io.compression.codec", "snappy") // Fast compression

466

467

// Cleanup pattern

468

def withBroadcast[T: scala.reflect.ClassTag, R](data: T)(operation: Broadcast[T] => R): R = {

469

val broadcast = sc.broadcast(data)

470

try {

471

operation(broadcast)

472

} finally {

473

broadcast.unpersist()

474

}

475

}

476

```

477

478

**Large Broadcast Variables:**

479

```scala

480

// For very large broadcast data, consider alternatives

481

// Option 1: Use external storage (Redis, etc.)

482

// Option 2: Partition the broadcast data

483

// Option 3: Use RDD join instead of broadcast

484

485

// Instead of huge broadcast:

486

val hugeBroadcast = sc.broadcast(hugeMap) // Memory issues

487

488

// Consider RDD join:

489

val lookupRDD = sc.parallelize(hugeMap.toSeq)

490

val joinedRDD = dataRDD.join(lookupRDD) // Distributed join

491

```

492

493

## Common Anti-patterns

494

495

### Broadcast Variable Misuse

496

```scala

497

// Bad: Broadcasting large mutable collections

498

val mutableMap = scala.collection.mutable.Map[String, Int]()

499

val broadcast = sc.broadcast(mutableMap) // Don't broadcast mutable data

500

501

// Bad: Broadcasting data used only once

502

val onceUsed = sc.broadcast(smallData)

503

data.map(x => onceUsed.value.get(x.id)).collect() // Just pass directly

504

505

// Good: Broadcasting immutable reference data used multiple times

506

val immutableLookup = Map[String, Int](/* data */)

507

val broadcast = sc.broadcast(immutableLookup)

508

```

509

510

### Accumulator Misuse

511

```scala

512

// Bad: Using accumulators for side effects in transformations

513

val result = data.map { x =>

514

counter += 1 // Bad: side effect in transformation

515

x * 2

516

}

517

518

// Bad: Reading accumulator value from tasks

519

data.map { x =>

520

val currentCount = counter.value // Bad: tasks cannot read the value (throws UnsupportedOperationException); only the driver can

521

x + currentCount

522

}

523

524

// Good: Using accumulators for monitoring in actions

525

data.map(transform).foreach { x =>

526

counter += 1 // Good: side effect in action

527

save(x)

528

}

529

```

530

531

### Resource Leaks

532

```scala

533

// Bad: Not cleaning up broadcast variables

534

def processData(): Unit = {

535

val broadcast = sc.broadcast(referenceData)

536

// ... use broadcast ...

537

// No cleanup - memory leak

538

}

539

540

// Good: Proper cleanup

541

def processData(): Unit = {

542

val broadcast = sc.broadcast(referenceData)

543

try {

544

// ... use broadcast ...

545

} finally {

546

broadcast.unpersist()

547

}

548

}

549

```