
# Broadcasting and Accumulators

Shared variables in Spark provide two mechanisms for sharing state across a distributed computation. Broadcast variables distribute read-only data to every executor once, instead of shipping it with each task. Accumulators aggregate values from tasks back to the driver.

## Broadcast Variables

Broadcast variables keep a read-only value cached on each machine rather than shipping a copy with every task.

### Broadcast Interface

```scala { .api }
abstract class Broadcast[T] {
  def id: Long
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit
  def toString: String
}
```
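The difference between `unpersist` and `destroy` is easiest to see in a short lifecycle. The sketch below assumes a local master (`local[2]`) and an illustrative app name and data set. `unpersist` drops the executor-side copies but leaves the variable usable (it is re-sent on next use), while `destroy` releases all state, so later access fails.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Assumption: local mode with two threads, for illustration only.
val conf = new SparkConf().setAppName("Broadcast Lifecycle").setMaster("local[2]")
val sc = new SparkContext(conf)

val stopWords = sc.broadcast(Set("a", "an", "the"))
val words = sc.parallelize(Seq("the", "quick", "fox"))

val kept = words.filter(w => !stopWords.value.contains(w)).collect()

// Drop executor-side copies; the driver keeps the data, so reuse still works.
stopWords.unpersist(blocking = true)
val keptAgain = words.filter(w => !stopWords.value.contains(w)).collect()

// Release everything; the variable cannot be used afterwards.
stopWords.destroy()
val failedAfterDestroy =
  try { stopWords.value; false }
  catch { case _: SparkException => true }

sc.stop()
```

In long-running applications, `unpersist` suits data that may be needed again later, and `destroy` suits data that is finished with for good.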

### Creating Broadcast Variables

```scala { .api }
class SparkContext {
  def broadcast[T: ClassTag](value: T): Broadcast[T]
}

class JavaSparkContext {
  def broadcast[T](value: T): Broadcast[T]
}
```

## Accumulators

Accumulators provide a mechanism for aggregating information across tasks in a fault-tolerant way.

### AccumulatorV2 Base Interface

```scala { .api }
abstract class AccumulatorV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}
```
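These methods form a small contract: each task works on its own zeroed copy, calls `add` on it, and the driver folds the per-task copies back together with `merge`. The sketch below walks through that cycle by hand on a `LongAccumulator`, without a cluster. The two "task" copies are stand-ins for what Spark creates internally; in a real job you would not call `copy`, `reset`, or `merge` yourself.

```scala
import org.apache.spark.util.LongAccumulator

// Driver-side accumulator (not registered; this only exercises the contract).
val driverAcc = new LongAccumulator

// Stand-ins for the zeroed per-task copies Spark would create.
val task1 = driverAcc.copy()
task1.reset()
val task2 = driverAcc.copy()
task2.reset()

// Each "task" adds to its own copy only.
Seq(1L, 2L, 3L).foreach(v => task1.add(v))
Seq(10L, 20L).foreach(v => task2.add(v))

// The driver merges the partial results.
driverAcc.merge(task1)
driverAcc.merge(task2)

println(s"sum=${driverAcc.sum}, count=${driverAcc.count}")
// prints: sum=36, count=5
```

Because results are combined with `merge` in no guaranteed order, a custom accumulator behaves predictably only if its `merge` is commutative and associative.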

### Built-in Accumulator Types

#### LongAccumulator

```scala { .api }
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
  def value: java.lang.Long
  def isZero: Boolean
}
```

#### DoubleAccumulator

```scala { .api }
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
  def value: java.lang.Double
  def isZero: Boolean
}
```
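Besides `value`, the numeric accumulators track how many elements were added, which makes `count` and `avg` available on the driver. A minimal sketch, assuming a local master and made-up latency figures:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode, for illustration only.
val sc = new SparkContext(
  new SparkConf().setAppName("Accumulator Stats").setMaster("local[2]"))

val latencies = sc.doubleAccumulator("Latency (ms)")
sc.parallelize(Seq(12.5, 7.5, 10.0)).foreach(ms => latencies.add(ms))

println(s"sum=${latencies.sum}, count=${latencies.count}, avg=${latencies.avg}")
// prints: sum=30.0, count=3, avg=10.0

sc.stop()
```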

#### CollectionAccumulator

```scala { .api }
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit
  def value: java.util.List[T]
  def isZero: Boolean
}
```

### Creating Accumulators

#### Scala API

```scala { .api }
class SparkContext {
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  def register(acc: AccumulatorV2[_, _]): Unit
  def register(acc: AccumulatorV2[_, _], name: String): Unit
}
```

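#### Java API

```java { .api }
// JavaSparkContext has no accumulator factory methods of its own;
// call them on the underlying SparkContext obtained from sc().
public class JavaSparkContext {
    public SparkContext sc()
}

public class SparkContext {
    public LongAccumulator longAccumulator()
    public LongAccumulator longAccumulator(String name)
    public DoubleAccumulator doubleAccumulator()
    public DoubleAccumulator doubleAccumulator(String name)
    public <T> CollectionAccumulator<T> collectionAccumulator()
    public <T> CollectionAccumulator<T> collectionAccumulator(String name)
}
```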

## Usage Examples

### Basic Broadcast Variable Usage

```scala
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("Broadcast Example")
val sc = new SparkContext(conf)

// Create a lookup table
val lookupTable = Map(
  "apple" -> 0.5,
  "banana" -> 0.3,
  "cherry" -> 1.0,
  "date" -> 0.8
)

// Broadcast the lookup table
val broadcastTable = sc.broadcast(lookupTable)

// Use in transformations
val products = sc.parallelize(Seq("apple", "banana", "cherry", "unknown"))
val prices = products.map { product =>
  val table = broadcastTable.value // Access broadcast value
  val price = table.getOrElse(product, 0.0)
  (product, price)
}

val result = prices.collect()
// Result: Array((apple,0.5), (banana,0.3), (cherry,1.0), (unknown,0.0))

// Clean up broadcast variable
broadcastTable.unpersist()
```

### Accumulator Examples

#### Simple Counter

```scala
// Reuses the SparkContext `sc` created above (only one may be active per JVM)

// Create accumulator
val counter = sc.longAccumulator("My Counter")

val data = sc.parallelize(1 to 100)
val processed = data.map { x =>
  if (x % 10 == 0) {
    counter.add(1) // Count multiples of 10
  }
  x * 2
}

// Trigger action to execute transformations
processed.collect()

println(s"Multiples of 10 found: ${counter.value}")
// Output: Multiples of 10 found: 10
```

#### Error Counting

```scala
val errorCounter = sc.longAccumulator("Errors")
val successCounter = sc.longAccumulator("Successes")

val logs = sc.textFile("application.log")
val processed = logs.map { line =>
  try {
    val result = processLogLine(line)
    successCounter.add(1)
    result
  } catch {
    case _: Exception =>
      errorCounter.add(1)
      "ERROR"
  }
}

processed.collect()

println(s"Successful: ${successCounter.value}, Errors: ${errorCounter.value}")
```

#### Collection Accumulator

```scala
val errorMessages = sc.collectionAccumulator[String]("Error Messages")

val data = sc.parallelize(1 to 100)
val results = data.map { x =>
  if (x % 13 == 0) {
    errorMessages.add(s"Unlucky number: $x")
  }
  x
}

results.collect()

// Get all collected error messages (a java.util.List)
val errors = errorMessages.value
println(s"Collected ${errors.size()} error messages:")
// An explicit lambda avoids ambiguity between the overloads of println
errors.forEach(msg => println(msg))
```

### Advanced Broadcast Usage

#### Large Lookup Tables

```scala
// Simulate a large lookup table (could be loaded from database)
val largeLookup = (1 to 100000).map(i => (s"key$i", s"value$i")).toMap
val broadcastLookup = sc.broadcast(largeLookup)

// Process large dataset
val keys = sc.parallelize(1 to 1000000).map(i => s"key${i % 100000}")
val enriched = keys.mapPartitions { partition =>
  val lookup = broadcastLookup.value // Access once per partition
  partition.map(key => (key, lookup.getOrElse(key, "unknown")))
}

val sample = enriched.take(10)
```

#### Machine Learning Features

```scala
// Broadcast feature weights for prediction
case class ModelWeights(weights: Array[Double], intercept: Double)

val trainedWeights = ModelWeights(
  weights = Array(0.5, -0.3, 0.8, 1.2),
  intercept = 0.1
)

val broadcastWeights = sc.broadcast(trainedWeights)

// Apply model to features
val features = sc.parallelize(Seq(
  Array(1.0, 2.0, 3.0, 4.0),
  Array(0.5, 1.5, 2.5, 3.5),
  Array(2.0, 1.0, 4.0, 2.0)
))

val predictions = features.map { featureVector =>
  val model = broadcastWeights.value
  val score = featureVector.zip(model.weights)
    .map { case (feature, weight) => feature * weight }
    .sum + model.intercept

  if (score > 0) 1.0 else 0.0
}

predictions.collect()
```

### Custom Accumulator

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Custom accumulator for collecting unique values
class SetAccumulator[T] extends AccumulatorV2[T, mutable.Set[T]] {
  private val _set = mutable.Set.empty[T]

  def isZero: Boolean = _set.isEmpty

  def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }

  def reset(): Unit = _set.clear()

  def add(v: T): Unit = _set += v

  def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => _set ++= o._set
    case _ => throw new UnsupportedOperationException("Cannot merge different accumulator types")
  }

  def value: mutable.Set[T] = _set.clone()
}

// Usage
val uniqueWords = new SetAccumulator[String]
sc.register(uniqueWords, "Unique Words")

val text = sc.textFile("document.txt")
val words = text.flatMap(_.split("\\s+")).map(_.toLowerCase)

words.foreach(word => uniqueWords.add(word))

println(s"Found ${uniqueWords.value.size} unique words")
uniqueWords.value.take(10).foreach(println)
```

### Java API Examples

#### Java Broadcast Variables

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import java.util.*;

public class JavaBroadcastExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Java Broadcast");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create lookup map
        Map<String, Double> priceMap = new HashMap<>();
        priceMap.put("apple", 0.5);
        priceMap.put("banana", 0.3);
        priceMap.put("cherry", 1.0);

        // Broadcast the map
        Broadcast<Map<String, Double>> broadcastPrices = sc.broadcast(priceMap);

        // Use in transformation
        List<String> products = Arrays.asList("apple", "banana", "unknown");
        JavaRDD<String> productRDD = sc.parallelize(products);

        JavaRDD<String> pricesRDD = productRDD.map(product -> {
            Map<String, Double> prices = broadcastPrices.value();
            Double price = prices.getOrDefault(product, 0.0);
            return product + ": $" + price;
        });

        List<String> results = pricesRDD.collect();
        results.forEach(System.out::println);

        sc.close();
    }
}
```

#### Java Accumulators

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
import java.util.List;

// `sc` is a JavaSparkContext; accumulators are created on the
// underlying SparkContext returned by sc.sc().

// Long accumulator
LongAccumulator errorCount = sc.sc().longAccumulator("Error Count");
LongAccumulator processedCount = sc.sc().longAccumulator("Processed Count");

JavaRDD<String> logs = sc.textFile("logs.txt");
JavaRDD<String> processed = logs.map(line -> {
    try {
        // Simulate processing
        if (line.contains("ERROR")) {
            errorCount.add(1);
            return "ERROR: " + line;
        } else {
            processedCount.add(1);
            return "OK: " + line;
        }
    } catch (Exception e) {
        errorCount.add(1);
        return "EXCEPTION: " + line;
    }
});

processed.collect(); // Trigger computation

System.out.println("Errors: " + errorCount.value());
System.out.println("Processed: " + processedCount.value());

// Collection accumulator
CollectionAccumulator<String> errorMessages = sc.sc().collectionAccumulator("Error Messages");

JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 0, 5));
numbers.foreach(num -> {
    if (num == 0) {
        errorMessages.add("Found a zero value");
    }
});

List<String> errors = errorMessages.value();
errors.forEach(System.out::println);
```

## Performance Considerations

### Broadcast Variables

#### When to Use Broadcast Variables

- **Large lookup tables** that fit in memory
- **Configuration data** needed across all tasks
- **Model parameters** for machine learning
- **Reference data** that doesn't change during job execution

#### Best Practices

- **Size limitations**: Keep broadcast variables under a few hundred MB
- **Serialization**: Use an efficient serializer such as Kryo
- **Memory management**: Unpersist when no longer needed
- **Network efficiency**: Broadcast once, use many times

```scala
// Good: Broadcast large lookup table once
val largeLookup = loadLargeLookupTable() // 100MB lookup table
val broadcastLookup = sc.broadcast(largeLookup)

val results = bigDataset.mapPartitions { partition =>
  val lookup = broadcastLookup.value // Access once per partition
  partition.map(record => enrichWithLookup(record, lookup))
}

// Bad: The closure captures largeLookup directly, so it is serialized with every task
val slowResults = bigDataset.map(record => enrichWithLookup(record, largeLookup))
```

### Accumulators

#### When to Use Accumulators

- **Counting events** across distributed computation
- **Collecting diagnostics** and metrics
- **Debugging** distributed applications
- **Simple aggregations** that don't require grouping

#### Important Limitations

- **Lazy evaluation**: Updates made inside a transformation happen only when an action executes that transformation
- **Fault tolerance**: Spark guarantees that each task's updates are applied exactly once only for updates made inside actions; updates made inside transformations may be applied more than once if tasks or stages are re-executed
- **Side effects**: Don't rely on accumulators for program logic

```scala
// Good: Use accumulator for counting
val errorCounter = sc.longAccumulator("Errors")
val data = sc.parallelize(records)
val cleaned = data.map { record =>
  try {
    cleanRecord(record)
  } catch {
    case _: Exception =>
      errorCounter.add(1) // Fine for diagnostics; may over-count if the task is re-executed
      defaultRecord
  }
}
cleaned.count() // Accumulator updated when the action runs

// Bad: Rely on accumulator for logic
val itemCounter = sc.longAccumulator("Items")
val itemData = sc.parallelize(items)
val processed = itemData.map { item =>
  itemCounter.add(1)
  // DON'T DO THIS: a value read inside a task reflects only that task's local updates
  if (itemCounter.value > 1000) {
    processSpecially(item)
  } else {
    processNormally(item)
  }
}
```
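Over-counting is not limited to task retries: any recomputation of a transformation repeats its accumulator updates. The sketch below (assuming a local master and an illustrative app name) increments a counter inside `map` and then runs two actions on the uncached RDD. Because the `map` is executed again for the second action, the counter ends at twice the number of matching elements. Counting inside `foreach`, which is an action, does not have this problem.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode, for illustration only.
val conf = new SparkConf().setAppName("Accumulator Recompute").setMaster("local[2]")
val sc = new SparkContext(conf)

val inMap = sc.longAccumulator("Counted in map")
val doubled = sc.parallelize(1 to 100).map { x =>
  if (x % 10 == 0) inMap.add(1)
  x * 2
}

doubled.count()
val afterFirstAction = inMap.sum   // 10 when no task is re-executed
doubled.count()                    // the map runs again for this action
val afterSecondAction = inMap.sum  // 20: every update was applied a second time

// Counting inside an action instead of a transformation.
val inForeach = sc.longAccumulator("Counted in foreach")
sc.parallelize(1 to 100).foreach(x => if (x % 10 == 0) inForeach.add(1))

sc.stop()
```

Calling `doubled.cache()` before the first action is another way to avoid the second execution of `map`, provided the cached partitions are not evicted.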

## Configuration

### Broadcast Variables

- `spark.broadcast.blockSize` - Size of each block for broadcast variables (default: 4m)
- `spark.broadcast.compress` - Compress broadcast variables (default: true)
- `spark.serializer` - Serializer for broadcast variables (KryoSerializer recommended)

### Memory Management

- `spark.memory.fraction` - Fraction of the heap (after a fixed reserve) shared by execution and storage (default: 0.6)
- `spark.memory.storageFraction` - Fraction of that shared region reserved for storage and protected from eviction by execution (default: 0.5)
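These properties can be set on the `SparkConf` before the context is created. A minimal sketch follows; the app name and the values are illustrative, not recommendations.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Shared Variables Config")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.broadcast.blockSize", "8m")
  .set("spark.broadcast.compress", "true")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.storageFraction", "0.5")
```

The same keys can also be passed at launch time with `--conf key=value` on `spark-submit`.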

## Error Handling and Debugging

### Broadcast Variable Issues

```scala
import org.apache.spark.broadcast.Broadcast

// Handle missing broadcast data gracefully
val maybeBroadcast: Option[Broadcast[Map[String, String]]] =
  if (lookupData.nonEmpty) Some(sc.broadcast(lookupData)) else None

val processed = data.map { record =>
  maybeBroadcast match {
    case Some(broadcast) => enrichRecord(record, broadcast.value)
    case None => record // No enrichment if no broadcast data
  }
}
```

### Accumulator Debugging

```scala
// Use accumulators to track different types of records
val validRecords = sc.longAccumulator("Valid Records")
val invalidRecords = sc.longAccumulator("Invalid Records")
val emptyRecords = sc.longAccumulator("Empty Records")

val processed = data.map { record =>
  record match {
    case r if r.isEmpty =>
      emptyRecords.add(1)
      None
    case r if isValid(r) =>
      validRecords.add(1)
      Some(processRecord(r))
    case _ =>
      invalidRecords.add(1)
      None
  }
}.filter(_.isDefined).map(_.get)

processed.count() // Trigger computation

// Check processing results
println(s"Valid: ${validRecords.value}")
println(s"Invalid: ${invalidRecords.value}")
println(s"Empty: ${emptyRecords.value}")
```

## Important Notes

### Broadcast Variables

- **Immutable**: Broadcast variables cannot be modified after creation
- **Lazy**: Variables are sent to executors when first used
- **Cached**: Each executor caches the broadcast variable locally
- **Cleanup**: Call `unpersist()` or `destroy()` to free memory
- **Serialization**: Variables must be serializable

### Accumulators

- **Write-only for tasks**: Tasks should only add to accumulators; a value read inside a task reflects only that task's local updates
- **Driver access**: Only the driver sees the merged value
- **Fault tolerance**: Values may be updated multiple times due to task retries
- **Actions only**: Exactly-once updates are guaranteed only within actions, not transformations
- **Registration**: Named accumulators appear in the Spark UI for monitoring

### General Guidelines

- **Use broadcast for large read-only data** shared across tasks
- **Use accumulators for simple counting and collecting**
- **Monitor memory usage** for broadcast variables
- **Don't use accumulators for critical program logic**
- **Clean up resources** when no longer needed
- **Test with different cluster sizes** to ensure proper behavior