or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-engine.mdgraph-processing.mdindex.mdmachine-learning.mdsql-dataframes.mdstream-processing.md

core-engine.mddocs/

0

# Core Engine

1

2

The Spark Core Engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs), transformations, actions, and distributed variables. This is the foundation upon which all other Spark components are built.

3

4

## Package Information

5

6

Core engine functionality is available through:

7

8

```scala

import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{AccumulatorV2, CollectionAccumulator, DoubleAccumulator, LongAccumulator, StatCounter}

// PairRDDFunctions and DoubleRDDFunctions need no import: their implicit
// conversions are defined in the RDD companion object and apply automatically.

```

14

15

## Basic Usage

16

17

```scala

import org.apache.spark.{SparkConf, SparkContext}

// Initialize Spark
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]") // or cluster URL

val sc = new SparkContext(conf)

try {
  // Create RDD from collection
  val numbers = sc.parallelize(1 to 1000000)

  // Transform and compute. Widen to Long before doubling: the total
  // (500001000000) does not fit in an Int and would silently overflow.
  val result = numbers
    .filter(_ % 2 == 0)
    .map(_.toLong * 2)
    .reduce(_ + _)

  println(s"Result: $result") // Result: 500001000000
} finally {
  // Release cluster resources even if the job fails
  sc.stop()
}

```

39

40

## Capabilities

41

42

### Spark Context

43

44

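The main entry point for Spark functionality. Represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables. Only one `SparkContext` may be active per JVM; call `stop()` on the active one before creating another.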

45

46

```scala { .api }

class SparkContext(config: SparkConf) {
  // RDD creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  // K and V need implicit Writable converters (provided for common types such as String, Int, Long)
  def sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions)(
      implicit km: ClassTag[K], vm: ClassTag[V],
      kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)]

  // Distributed variables
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  // The unnamed factories are parameterless: write sc.longAccumulator, not sc.longAccumulator()
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  // Registers a custom AccumulatorV2 so task updates are merged back on the driver
  def register(acc: AccumulatorV2[_, _]): Unit
  def register(acc: AccumulatorV2[_, _], name: String): Unit

  // File distribution
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit

  // Job and resource management
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def addJobTag(tag: String): Unit // job tags: Spark 3.5+
  def removeJobTag(tag: String): Unit
  def getJobTags(): Set[String]
  def clearJobTags(): Unit
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  // Block manager -> (max memory available for caching, remaining memory available for caching)
  def getExecutorMemoryStatus: Map[String, (Long, Long)]

  // Control
  def stop(): Unit
  def getConf: SparkConf
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def version: String
}

```

84

85

Usage example:

86

87

```scala

// No master is set here: it is normally supplied by spark-submit (--master)
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)

// Create RDD from file. In an hdfs:// URI the part after "//" is the NameNode
// host[:port]; use hdfs:///path/... to fall back to the cluster's default file system.
val textRDD = sc.textFile("hdfs://namenode:8020/path/to/file.txt")

// Create RDD from collection
val numbersRDD = sc.parallelize(Array(1, 2, 3, 4, 5))

// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Create accumulator
val counter = sc.longAccumulator("MyCounter")

```

103

104

### Spark Configuration

105

106

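Configuration for a Spark application, expressed as key-value pairs (for example `spark.executor.memory`). `new SparkConf()` also loads any `spark.*` Java system properties; values set directly on the object take priority over them. Once a `SparkConf` is passed to Spark it is cloned, so later changes to it do not affect the running application.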

107

108

```scala { .api }

class SparkConf(loadDefaults: Boolean = true) {
  // Setters: each returns this SparkConf, so calls can be chained
  def set(key: String, value: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def remove(key: String): SparkConf

  // Getters
  def get(key: String): String // throws NoSuchElementException when the key is not set
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean

  // Copying
  def clone(): SparkConf
}

```

127

128

Usage example:

129

130

```scala

// Executor sizing and SQL tuning, expressed as plain key/value pairs
val tuning = Seq(
  "spark.executor.memory" -> "2g",
  "spark.executor.cores" -> "4",
  "spark.sql.adaptive.enabled" -> "true"
)

val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("yarn")
  .setAll(tuning)

```

138

139

### Resilient Distributed Datasets (RDDs)

140

141

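The fundamental data structure in Spark. An RDD is an immutable, partitioned collection of objects that can be processed in parallel. Transformations are lazy and return a new RDD; actions trigger the actual computation and either return a result to the driver or write it to storage.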

142

143

```scala { .api }

abstract class RDD[T: ClassTag] extends Serializable {
  // Transformations (lazy: each returns a new RDD; nothing runs until an action is called)
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(): RDD[T]
  def distinct(numPartitions: Int): RDD[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def zipPartitions[U: ClassTag, V: ClassTag](rdd2: RDD[U])(f: (Iterator[T], Iterator[U]) => Iterator[V]): RDD[V]
  def zipPartitions[U: ClassTag, V: ClassTag, W: ClassTag](rdd2: RDD[U], rdd3: RDD[V])(f: (Iterator[T], Iterator[U], Iterator[V]) => Iterator[W]): RDD[W]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def glom(): RDD[Array[T]]
  def keyBy[K](f: T => K): RDD[(K, T)]

  // Advanced operations
  def barrier(): RDDBarrier[T]                      // barrier execution mode (Spark 2.4+)
  def withResources(rp: ResourceProfile): this.type // stage-level scheduling (Spark 3.1+)
  def getResourceProfile(): ResourceProfile

  // Partitioning. partitionBy is only available on key-value RDDs (see PairRDDFunctions).
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def repartition(numPartitions: Int): RDD[T]

  // Persistence. A storage level can be assigned only once; unpersist() before choosing another.
  def cache(): this.type // same as persist(StorageLevel.MEMORY_ONLY)
  def persist(): this.type
  def persist(newLevel: StorageLevel): this.type
  def unpersist(blocking: Boolean = false): this.type

  // Actions (trigger computation)
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Output
  def saveAsTextFile(path: String): Unit
  def saveAsObjectFile(path: String): Unit

  // Information
  def partitions: Array[Partition]
  def getNumPartitions: Int
  def isEmpty(): Boolean
  def name: String
  def setName(name: String): this.type
}

```

203

204

### Pair RDD Functions

205

206

Additional operations available on any `RDD[(K, V)]` of key-value pairs. They are added by an implicit conversion defined in the `RDD` companion object, so no extra import is needed.

207

208

```scala { .api }

class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  // Transformations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  // Partitioning: redistribute records so that each key lands in the partition chosen by `partitioner`
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  // Joins
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

  // Actions
  def countByKey(): Map[K, Long]
  def collectAsMap(): Map[K, V]
  def lookup(key: K): Seq[V]

  // Output (Hadoop output formats)
  def saveAsHadoopFile[F <: org.apache.hadoop.mapred.OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
  def saveAsNewAPIHadoopFile[F <: org.apache.hadoop.mapreduce.OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
  // Also callable on pair RDDs, but defined elsewhere:
  //   saveAsTextFile(path: String): Unit     -- a member of RDD itself
  //   saveAsSequenceFile(path: String): Unit -- SequenceFileRDDFunctions (K and V convertible to Writable)
}

```

240

241

Usage example:

242

243

```scala

// Sample key-value data: two values for each of the keys "a" and "b"
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Group by key; collect() yields (element order may vary):
//   Array(("a", Iterable(1, 3)), ("b", Iterable(2, 4)))
val grouped = pairs.groupByKey()

// Reduce by key; collect() yields Array(("a", 4), ("b", 6))
val sums = pairs.reduceByKey { (left, right) => left + right }

// Join with another RDD: every left value is paired with the matching right value
//   Array(("a", (1, "apple")), ("a", (3, "apple")), ("b", (2, "banana")), ("b", (4, "banana")))
val fruits = sc.parallelize(Seq(("a", "apple"), ("b", "banana")))
val joined = pairs.join(fruits)

```

259

260

### Numeric RDD Functions

261

262

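Additional operations available on `RDD[Double]`. RDDs of other numeric element types (for example `RDD[Int]`) get the same methods through an implicit conversion.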

263

264

```scala { .api }

// Available on RDD[Double]; RDDs of other Numeric element types are converted implicitly
class DoubleRDDFunctions(self: RDD[Double]) {
  def sum(): Double
  def mean(): Double
  def variance(): Double       // population variance (divides by N)
  def sampleVariance(): Double // sample variance (divides by N - 1)
  def stdev(): Double          // population standard deviation
  def sampleStdev(): Double    // sample standard deviation
  def stats(): StatCounter     // count, mean, variance, min and max in a single pass
  // Counts per bucket. `buckets` must be sorted without duplicates; every bucket is
  // [low, high) except the last, which is [low, high]. evenBuckets = true enables
  // O(1) bucket lookup when the boundaries are evenly spaced.
  def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]
  // bucketCount evenly spaced buckets between min and max; returns (boundaries, counts)
  def histogram(bucketCount: Int): (Array[Double], Array[Long])
}

class StatCounter extends Serializable {
  def count: Long
  def mean: Double
  def sum: Double
  def min: Double
  def max: Double
  def variance: Double // population variance
  def sampleVariance: Double
  def stdev: Double    // population standard deviation
  def sampleStdev: Double
}

```

289

290

### Distributed Variables

291

292

Variables that can be shared across cluster nodes efficiently.

293

294

#### Broadcast Variables

295

296

Read-only variables cached on each machine rather than shipped with every task. Tasks read the shared value through `.value`.

297

298

```scala { .api }

abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
  // The broadcast value; read it inside task closures instead of capturing the object directly
  def value: T
  // Deletes cached copies on the executors; the value is re-sent lazily if used again
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  // Removes all data and metadata; the broadcast cannot be used afterwards
  def destroy(): Unit
}

```

308

309

Usage example:

310

311

```scala

// Create broadcast variable
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Use in RDD operations: read the shared map through .value inside the closure
val rdd = sc.parallelize(Array("key1", "key2", "key3"))
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "default"))

// map is lazy: run an action before cleaning up, otherwise the job only starts after unpersist
val values = result.collect() // Array("value1", "value2", "default")

// Clean up: unpersist() drops the executor copies (re-sent if used again);
// use destroy() instead to release the broadcast permanently
broadcastMap.unpersist()

```

322

323

#### Accumulators

324

325

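Variables that tasks can only "add" to in parallel operations and whose value can be read only on the driver. For updates made inside actions (such as `foreach`), Spark applies each task's update exactly once. Updates made inside transformations (such as `map`) may be applied more than once if a task or stage is re-executed.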

326

327

```scala { .api }

// Base class of all accumulators: tasks add IN values, the driver reads an OUT result
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def name: Option[String]
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}

// Sums Long values and tracks how many were added
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
}

// Sums Double values and tracks how many were added
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
}

// Collects every added element into a list
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]
}

```

359

360

Usage example:

361

362

```scala

// Create accumulators
val counter = sc.longAccumulator("My Counter")
val milestones = sc.collectionAccumulator[String]("Milestone Messages")

// Use in RDD operations. Updates made inside an action such as foreach are
// applied exactly once per task, even if a task is retried.
val data = sc.parallelize(1 to 1000)
data.foreach { x =>
  counter.add(1)
  if (x % 100 == 0) {
    milestones.add(s"Processed $x items")
  }
}

// Read results on the driver only
println(s"Processed ${counter.value} items") // Processed 1000 items
println(s"Milestones: ${milestones.value}")  // 10 messages; order depends on task completion

```

379

380

### Storage Levels

381

382

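Control how persisted RDD partitions are stored: in memory, on disk, or off-heap; serialized or as deserialized objects; and with how many replicas. An RDD's storage level can be assigned only once, so call `unpersist()` before assigning a different level.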

383

384

```scala { .api }

// Flags describing where and how persisted partitions are kept
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int
) extends Externalizable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

// Predefined levels. "_SER" keeps partitions as serialized bytes (more compact, more CPU
// to read); "_2" replicates each partition on two cluster nodes.
object StorageLevel {
  val NONE: StorageLevel

  // Disk only
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel

  // Memory only: partitions that do not fit are recomputed when needed
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel

  // Memory, spilling partitions that do not fit to disk
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel

  // Off-heap memory
  val OFF_HEAP: StorageLevel
}

```

414

415

Usage example:

416

417

```scala

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("large-file.txt")

// An RDD's storage level can be assigned only once: calling persist() again with a
// different level throws UnsupportedOperationException. Call unpersist() first to change it.

// Cache in memory only
rdd.persist(StorageLevel.MEMORY_ONLY)

// Release the current level before choosing another one
rdd.unpersist()

// Cache in memory and disk with replication
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

rdd.unpersist()

// Use serialized storage to save memory
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

```

429

430

### Partitioning

431

432

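Control how the records of a key-value RDD are distributed across partitions: a partitioner maps each key to a partition ID. Apply one to a pair RDD with `partitionBy`.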

433

434

```scala { .api }

// Maps each key of a key-value RDD to a partition ID in [0, numPartitions)
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Chooses the partition from key.hashCode (a null key goes to partition 0). Keys need a
// content-based hashCode: Java arrays hash by identity, so they partition incorrectly.
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
  override def equals(other: Any): Boolean // equal to another HashPartitioner with the same numPartitions
  override def hashCode: Int
}

// Splits sortable keys into roughly equal ranges, determined by sampling `rdd`
// (used by sortByKey). The actual partition count may be lower than `partitions`
// when too few records are sampled.
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    ascending: Boolean = true
) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

```

456

457

### Task Context

458

459

Contextual information and utilities available to running tasks.

460

461

```scala { .api }

// Information about the running task; obtain it inside task code via TaskContext.get()
abstract class TaskContext extends Serializable {
  // Identity of the running task
  def stageId(): Int
  def stageAttemptNumber(): Int
  def partitionId(): Int
  def attemptNumber(): Int  // 0 for the first attempt, increasing on each retry
  def taskAttemptId(): Long // unique among task attempts of the same SparkContext

  // Lifecycle
  def isCompleted(): Boolean
  def isInterrupted(): Boolean
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
  def addTaskFailureListener(listener: TaskFailureListener): TaskContext

  // Properties and metrics
  def getLocalProperty(key: String): String // set on the driver; null when missing
  def taskMetrics(): TaskMetrics
  def getMetricsSources(sourceName: String): Seq[Source]
}

object TaskContext {
  def get(): TaskContext    // context of the current task; null outside a task (e.g. on the driver)
  def getPartitionId(): Int // partition of the current task, or 0 when there is no active task
}

```

482

483

Usage example:

484

485

```scala

import org.apache.spark.TaskContext

val rdd = sc.parallelize(1 to 100, 4)
val result = rdd.mapPartitionsWithIndex { (partitionIndex, iterator) =>
  // Non-null here because this closure runs inside a task on an executor
  val context = TaskContext.get()
  // context.partitionId() equals partitionIndex. println runs on the executor, so in
  // cluster mode this line appears in the executor logs, not on the driver console.
  println(s"Processing partition ${context.partitionId()} on stage ${context.stageId()}")
  iterator.map(_ * 2)
}

// Transformations are lazy: nothing above runs until an action is invoked
val doubled = result.collect() // Array(2, 4, ..., 200)

```