or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdio-operations.mdkey-value-operations.mdpartitioning-shuffling.mdrdd-operations.mdshared-variables.mdspark-context.mdstorage-persistence.md

shared-variables.mddocs/

0

# Shared Variables

1

2

Broadcast variables and accumulators let Spark applications share read-only data efficiently across cluster nodes and aggregate values (such as counters) from distributed tasks back to the driver.

3

4

## Capabilities

5

6

### Broadcast Variables

7

8

Read-only variables that are cached once on each machine instead of being shipped with every task, which makes it efficient to share large datasets (such as lookup tables) with all nodes.

9

10

```scala { .api }

/**
 * A broadcast variable created with SparkContext.broadcast().
 *
 * The value is shipped to each executor once and cached there, instead of being
 * serialized into every task closure. Treat the value as read-only.
 */
abstract class Broadcast[T](val id: Long) extends Serializable {

  /** Get the broadcasted value (usable on the driver and inside tasks) */
  def value: T

  /**
   * Delete cached copies of this broadcast on the executors without waiting for the
   * deletion to finish (equivalent to `unpersist(blocking = false)`).
   * If the broadcast is used afterwards, it is re-sent to each executor.
   */
  def unpersist(): Unit

  /**
   * Delete cached copies of this broadcast on the executors.
   * If the broadcast is used afterwards, it is re-sent to each executor.
   *
   * @param blocking whether to block until the deletion has completed
   */
  def unpersist(blocking: Boolean): Unit

  /**
   * Destroy all data and metadata related to this broadcast variable.
   * Use with caution: once destroyed, the broadcast cannot be used again, and any
   * later job that reads it fails.
   */
  def destroy(): Unit

  /** Whether this broadcast has not been destroyed (`private[spark]` in the real API) */
  def isValid: Boolean

  override def toString: String = s"Broadcast($id)"
}

// SparkContext method for creating broadcast variables
def broadcast[T: ClassTag](value: T): Broadcast[T]

```

36

37

**Usage Examples:**

38

39

```scala

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Broadcast Example"))

// Create a large lookup table
val lookupTable = Map(
  "US" -> "United States",
  "UK" -> "United Kingdom",
  "DE" -> "Germany",
  "FR" -> "France"
  // ... thousands more entries
)

// Broadcast the lookup table to all nodes (shipped once per executor, not once per task)
val broadcastLookup = sc.broadcast(lookupTable)

// Use broadcast variable in transformations.
// "hdfs:///" (empty authority) uses the default namenode; "hdfs://country_codes.txt"
// would parse the file name as a host name.
val countryData = sc.textFile("hdfs:///country_codes.txt")
val countryNames = countryData.map { code =>
  val lookup = broadcastLookup.value // Access broadcast value (on the executor)
  lookup.getOrElse(code, "Unknown")
}

// Transformations are lazy: run the actions that need the broadcast before cleaning up
countryNames.take(10).foreach(println)

// Clean up when done (executor copies are re-sent if the broadcast is used again)
broadcastLookup.unpersist()
// broadcastLookup.destroy() // Only if completely done with variable

// Example with configuration
val config = Map(
  "apiUrl" -> "https://api.example.com",
  "timeout" -> "30000",
  "retries" -> "3"
)
val broadcastConfig = sc.broadcast(config)

// Read the broadcast value once per partition rather than once per record
val processedData = inputRDD.mapPartitions { partition =>
  val conf = broadcastConfig.value
  // Use configuration for processing each partition
  partition.map(processRecord(_, conf))
}

```

81

82

### Accumulators

83

84

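Shared variables that tasks can only "add" to, through an associative and commutative operation, which makes them efficient distributed counters and collectors. Tasks cannot read an accumulator's merged value; only the driver can. Note: the `Accumulator`/`Accumulable` API shown here was deprecated in Spark 2.0 in favour of `AccumulatorV2` (`sc.longAccumulator`, `sc.doubleAccumulator`, `sc.collectionAccumulator`) and removed in Spark 3.0.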

85

86

```scala { .api }

/**
 * A shared variable that tasks can only add to, through an associative and commutative
 * "add" operation. Tasks add to it; only the driver can read the merged value.
 *
 * Deprecated since Spark 2.0 in favour of AccumulatorV2; removed in Spark 3.0.
 *
 * An Accumulator is an Accumulable whose element type and result type are the same,
 * so it inherits add, +=, merge, ++=, value, setValue and localValue from Accumulable[T, T].
 */
class Accumulator[T](initialValue: T, param: AccumulatorParam[T], name: Option[String] = None)
  extends Accumulable[T, T](initialValue, param, name)

/**
 * A more general version of Accumulator where the accumulated result type R differs
 * from the type T of the elements being added (e.g. R = Set[String], T = String).
 */
class Accumulable[R, T](initialValue: R, param: AccumulableParam[R, T], name: Option[String] = None) extends Serializable {

  /** Add a term to this accumulable (uses `param.addAccumulator`) */
  def add(term: T): Unit

  /** The += operator; same as `add` */
  def +=(term: T): Unit = add(term)

  /** Merge an already-accumulated value of type R into this one (uses `param.addInPlace`) */
  def merge(term: R): Unit

  /** The ++= operator; same as `merge` */
  def ++=(term: R): Unit = merge(term)

  /**
   * The merged value of all task updates. Only allowed on the driver: reading it
   * inside a task throws UnsupportedOperationException.
   */
  def value: R

  /** Set the value; only allowed on the driver */
  def setValue(newValue: R): Unit

  /**
   * The current value as seen from within a task. This is NOT the global value:
   * it only reflects the current task's updates. Use `value` on the driver after
   * an action completes to get the global value.
   */
  def localValue: R

  /** Renders the current value (not the name) */
  override def toString: String = s"$localValue"
}

// SparkContext methods for creating accumulators
// (deprecated since Spark 2.0: use sc.longAccumulator, sc.doubleAccumulator or
// sc.collectionAccumulator instead)
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]

```

146

147

**Usage Examples:**

148

149

```scala

import org.apache.spark.{AccumulableParam, SparkContext, SparkConf}
import scala.util.control.NonFatal

val sc = new SparkContext(new SparkConf().setAppName("Accumulator Example"))

// Basic numeric accumulators (implicit Int/Double params come from AccumulatorParam)
val errorCount = sc.accumulator(0, "Error Count")
val sumAccum = sc.accumulator(0.0, "Sum Accumulator")

val data = sc.parallelize(1 to 1000)

// Use accumulators in transformations.
// Caveat: updates inside a transformation are only applied once an action runs, and may
// be applied more than once if a task or stage is re-executed. Only updates made inside
// actions (e.g. foreach) are guaranteed to be applied exactly once.
val processedData = data.map { value =>
  try {
    if (value % 100 == 0) {
      throw new RuntimeException(s"Error processing $value")
    }
    sumAccum += value.toDouble
    Some(value * 2)
  } catch {
    case NonFatal(_) =>
      errorCount += 1
      None // Failed record
  }
}

// Trigger action to execute transformations (dropping failed records)
val results = processedData.flatMap(_.toList).collect()

// Read accumulator values (only on driver)
println(s"Processed ${results.length} items")
println(s"Encountered ${errorCount.value} errors")
println(s"Sum of successful values: ${sumAccum.value}")

// Collection accumulator example.
// No implicit AccumulableParam[Set[String], String] is in scope by default, so define one.
implicit object StringSetAccumulableParam extends AccumulableParam[Set[String], String] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addAccumulator(words: Set[String], word: String): Set[String] = words + word
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

val uniqueWords = sc.accumulable(Set.empty[String])

val text = sc.textFile("hdfs:///input.txt")
// foreach is an action, so each task's updates are applied exactly once.
// Splitting on runs of whitespace avoids counting "" as a word.
text.flatMap(_.split("\\s+")).filter(_.nonEmpty).foreach { word =>
  uniqueWords += word
}

println(s"Unique words found: ${uniqueWords.value.size}")

```

193

194

### AccumulatorParam and AccumulableParam

195

196

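Interfaces defining how accumulator values are combined: how a single element is added to an accumulated value, how two partial results are merged, and what the zero (identity) value is.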

197

198

```scala { .api }

/**
 * Defines how to accumulate values of type T into a result of type R.
 */
trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add a single T value to an accumulated R. May modify `r` in place and return it
   * (to avoid allocating objects).
   */
  def addAccumulator(r: R, t: T): R

  /**
   * Merge two accumulated values (e.g. partial results from different tasks).
   * May modify and return `r1` for efficiency.
   */
  def addInPlace(r1: R, r2: R): R

  /**
   * Return the "zero" (identity) value for the accumulator, given its initial value
   * (e.g. a vector of N zeroes when R is an N-dimensional vector).
   */
  def zero(initialValue: R): R
}

/**
 * Special case of AccumulableParam where the element type and the result type are both T.
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  /** Adding one element is the same as merging two values */
  def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)
}

// Built-in implicit accumulator parameters
object AccumulatorParam {
  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0.0f
  }
}

```

247

248

**Custom Accumulator Examples:**

249

250

```scala

import org.apache.spark.{AccumulableParam, AccumulatorParam}

// Custom accumulator for collecting statistics
final case class Stats(count: Long, sum: Double, min: Double, max: Double)

object Stats {
  /** No observations yet; min/max start at the opposite extremes */
  val Empty: Stats = Stats(0, 0.0, Double.MaxValue, Double.MinValue)
}

implicit object StatsAccumulatorParam extends AccumulatorParam[Stats] {
  def zero(initialValue: Stats): Stats = Stats.Empty

  // An empty side contributes nothing, so return the other side unchanged
  def addInPlace(s1: Stats, s2: Stats): Stats =
    if (s1.count == 0) s2
    else if (s2.count == 0) s1
    else Stats(
      s1.count + s2.count,
      s1.sum + s2.sum,
      math.min(s1.min, s2.min),
      math.max(s1.max, s2.max)
    )
}

// Custom accumulable for collecting unique items
implicit object SetAccumulableParam extends AccumulableParam[Set[String], String] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addAccumulator(set: Set[String], item: String): Set[String] = set + item
  def addInPlace(set1: Set[String], set2: Set[String]): Set[String] = set1 ++ set2
}

// Usage
val stats = sc.accumulator(Stats.Empty)
val uniqueItems = sc.accumulable(Set.empty[String])

val data = sc.parallelize(Array(1.5, 2.7, 3.1, 4.9, 2.7))
// foreach is an action, so each task's updates are applied exactly once
data.foreach { value =>
  stats += Stats(1, value, value, value)
  uniqueItems += value.toString
}

val finalStats = stats.value
println(s"Count: ${finalStats.count}, Avg: ${finalStats.sum / finalStats.count}")
println(s"Unique values: ${uniqueItems.value}")

```

290

291

## Advanced Patterns

292

293

### Combining Broadcast Variables and Accumulators

294

295

```scala

import scala.util.control.NonFatal

// Configuration and monitoring pattern
val config = Map("threshold" -> 100, "maxRetries" -> 3)
val broadcastConfig = sc.broadcast(config)

val successCount = sc.accumulator(0, "Successful Operations")
val failureCount = sc.accumulator(0, "Failed Operations")
val retryCount = sc.accumulator(0, "Retry Count")

// Caveat: these accumulators are updated inside a transformation, so a re-executed task
// or stage may add its updates again; treat the counts as approximate.
val results = inputData.mapPartitions { partition =>
  val conf = broadcastConfig.value
  val threshold = conf("threshold")
  // "maxRetries" bounds the total number of attempts per record (first try included).
  // At least one attempt is made, so no record is silently dropped without being counted.
  val maxAttempts = math.max(1, conf("maxRetries"))

  partition.flatMap { record =>
    if (record.value <= threshold) {
      failureCount += 1 // Don't retry for threshold failures
      None
    } else {
      var attempts = 0
      var result: Option[String] = None
      while (result.isEmpty && attempts < maxAttempts) {
        attempts += 1
        try {
          result = Some(processRecord(record))
          successCount += 1
        } catch {
          case NonFatal(_) =>
            // Only count a retry when another attempt will actually follow
            if (attempts < maxAttempts) retryCount += 1
            else failureCount += 1
        }
      }
      result
    }
  }
}

// Trigger execution and report metrics
val finalResults = results.collect()
println(s"Processed: ${finalResults.length}")
println(s"Successful: ${successCount.value}")
println(s"Failed: ${failureCount.value}")
println(s"Retries: ${retryCount.value}")

```

345

346

### Performance Monitoring with Accumulators

347

348

```scala

import java.util.concurrent.TimeUnit
import org.apache.spark.AccumulableParam

// partitionId -> (recordCount, processingTimeMs)
type PartitionStats = Map[Int, (Long, Long)]

// No implicit param exists for this Map type, so define one. Merging with ++ means a
// re-executed partition overwrites its own entry instead of being counted twice.
implicit object PartitionStatsParam extends AccumulableParam[PartitionStats, PartitionStats] {
  def zero(initialValue: PartitionStats): PartitionStats = Map.empty
  def addAccumulator(acc: PartitionStats, entry: PartitionStats): PartitionStats = acc ++ entry
  def addInPlace(m1: PartitionStats, m2: PartitionStats): PartitionStats = m1 ++ m2
}

// Performance monitoring accumulators
val processingTimeAccum = sc.accumulator(0L, "Total Processing Time")
val recordsProcessedAccum = sc.accumulator(0L, "Records Processed")
val partitionStatsAccum = sc.accumulable(Map.empty[Int, (Long, Long)])

val monitoredData = inputRDD.mapPartitionsWithIndex { (partitionId, partition) =>
  // nanoTime is monotonic; currentTimeMillis can jump if the wall clock is adjusted
  val startNanos = System.nanoTime()

  // Materialize the partition so the measured time covers the actual processing
  // (note: the whole partition's output is held in memory)
  val processedPartition = partition.map(record => processRecord(record)).toList
  val recordCount = processedPartition.length.toLong

  val processingTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)
  processingTimeAccum += processingTime
  recordsProcessedAccum += recordCount // One update per partition instead of per record
  partitionStatsAccum += Map(partitionId -> (recordCount, processingTime))

  processedPartition.iterator
}

// Trigger execution
val results = monitoredData.collect()

// Analyze performance
val totalTime = processingTimeAccum.value
val totalRecords = recordsProcessedAccum.value
// Guard against empty input, where the division would yield NaN
val avgTimePerRecord = if (totalRecords == 0L) 0.0 else totalTime.toDouble / totalRecords

println(s"Total processing time: ${totalTime}ms")
println(s"Average time per record: ${avgTimePerRecord}ms")
println(s"Partition stats: ${partitionStatsAccum.value}")

```

385

386

### Error Collection with Accumulators

387

388

```scala

import org.apache.spark.AccumulableParam
import scala.util.control.NonFatal

final case class ProcessingError(partitionId: Int, recordId: String, error: String, timestamp: Long)

// Custom accumulator for collecting errors.
// Every collected error is shipped to and kept on the driver, so this suits a modest
// number of errors only.
implicit object ErrorAccumulableParam extends AccumulableParam[List[ProcessingError], ProcessingError] {
  def zero(initialValue: List[ProcessingError]): List[ProcessingError] = List.empty
  def addAccumulator(list: List[ProcessingError], error: ProcessingError): List[ProcessingError] = error :: list
  def addInPlace(list1: List[ProcessingError], list2: List[ProcessingError]): List[ProcessingError] = list1 ++ list2
}

val errorCollector = sc.accumulable(List.empty[ProcessingError])

// Caveat: updates inside a transformation may be applied again if a task is re-executed,
// so the same error can be reported more than once.
val processedData = inputRDD.mapPartitionsWithIndex { (partitionId, partition) =>
  partition.flatMap { record =>
    try {
      Some(processRecord(record))
    } catch {
      case NonFatal(e) =>
        errorCollector += ProcessingError(
          partitionId = partitionId,
          recordId = record.id,
          // getMessage may be null (e.g. a bare NullPointerException)
          error = Option(e.getMessage).getOrElse(e.getClass.getName),
          timestamp = System.currentTimeMillis()
        )
        None
    }
  }
}

// Execute and collect errors
val results = processedData.collect()
val errors = errorCollector.value

println(s"Successfully processed: ${results.length}")
println(s"Errors encountered: ${errors.length}")
errors.foreach(error => println(s"Error in partition ${error.partitionId}: ${error.error}"))

```

426

427

## Best Practices

428

429

### Broadcast Variable Optimization

430

431

```scala

// 1. Broadcast large read-only data structures
val largeLookupTable = loadLookupTable() // Assume this is large
val broadcastTable = sc.broadcast(largeLookupTable)

// 2. Reuse broadcast variables across multiple operations.
// `.value` is read inside the closures, i.e. on the executors at execution time.
val enrichedData1 = data1.map(enrichWithLookup(_, broadcastTable.value))
val enrichedData2 = data2.map(enrichWithLookup(_, broadcastTable.value))

// Transformations are lazy: run every action that needs the broadcast BEFORE destroying it,
// otherwise those jobs fail because the broadcast no longer exists
val enrichedCount1 = enrichedData1.count()
val enrichedCount2 = enrichedData2.count()

// 3. Clean up when done.
// unpersist() only drops the executor copies (they are re-sent on next use);
// destroy() removes all data and metadata, after which any use fails.
broadcastTable.destroy() // Complete cleanup (makes a separate unpersist() unnecessary)

```

444

445

### Accumulator Best Practices

446

447

```scala

// 1. Prefer updating accumulators inside actions
val counter = sc.accumulator(0)

// RELIABLE: updates inside an action (foreach) are applied exactly once per task,
// even if a task is restarted
data.foreach { value =>
  if (someCondition(value)) counter += 1
}

// WORKS, WITH A CAVEAT: updates inside a transformation only happen once an action runs,
// and may be applied more than once if a task or stage is re-executed
val transformCounter = sc.accumulator(0)
val results = data.map { value =>
  if (someCondition(value)) transformCounter += 1
  processValue(value)
}.collect() // Action triggers execution

// 2. Be aware of lazy evaluation
val lazyCounter = sc.accumulator(0)
val lazyRDD = data.map { value =>
  lazyCounter += 1 // Runs again every time this RDD is recomputed
  value * 2
}

lazyRDD.cache() // Cache to avoid recomputation
val result1 = lazyRDD.count()
val result2 = lazyRDD.sum() // No new increments, unless a cached partition was evicted and recomputed

// 3. Use meaningful names for debugging (named accumulators appear in the Spark web UI)
val errorCounter = sc.accumulator(0, "Processing Errors")
val warningCounter = sc.accumulator(0, "Processing Warnings")

```

471

472

### Memory Management

473

474

```scala

// Monitor memory usage with accumulators.
// Rough estimate only: all tasks in an executor share one JVM heap, and garbage
// collection can run during processing, so per-partition deltas are approximate.
val memoryUsageAccum = sc.accumulator(0L, "Memory Usage")

val processedData = largeDataRDD.mapPartitions { partition =>
  val runtime = Runtime.getRuntime
  def usedHeap(): Long = runtime.totalMemory() - runtime.freeMemory()

  val initialMemory = usedHeap()

  // Materialize the partition so its output is actually on the heap when measured
  val results = partition.map(processLargeRecord).toList

  // A GC during processing can make the delta negative; don't let that shrink the total
  memoryUsageAccum += math.max(0L, usedHeap() - initialMemory)

  results.iterator
}

processedData.count() // Trigger execution
println(s"Total memory used: ${memoryUsageAccum.value / (1024 * 1024)} MB")

```