
# Core Engine

Apache Spark's core engine provides the fundamental distributed computing capabilities through SparkContext and Resilient Distributed Datasets (RDDs). This is the foundation layer that all other Spark components are built upon.

## Capabilities

### SparkContext

The main entry point for Spark functionality and the connection to a Spark cluster. Only one SparkContext should be active per JVM.

```scala { .api }
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 */
class SparkContext(config: SparkConf) extends Logging {

  /**
   * Creates a SparkContext that loads settings from system properties
   */
  def this() = this(new SparkConf())

  /**
   * Alternative constructor for setting common Spark properties directly
   */
  def this(master: String, appName: String, conf: SparkConf)

  /**
   * Alternative constructor with full configuration
   */
  def this(
    master: String,
    appName: String,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()
  )

  // Core RDD creation methods
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

  // Broadcast and accumulator operations
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]

  // Lifecycle management
  def stop(): Unit
  def stop(exitCode: Int): Unit

  // Configuration and metadata
  def getConf: SparkConf
  def master: String
  def appName: String
  def applicationId: String
  def deployMode: String
  def version: String
  def startTime: Long
  def defaultParallelism: Int
  def defaultMinPartitions: Int
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Basic SparkContext creation
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[*]") // Use all available cores
val sc = new SparkContext(conf)

// Alternative constructor. Use it instead of the one above, not in addition:
// only one SparkContext may be active per JVM.
// val sc = new SparkContext("local[4]", "MyApp")

// Create RDDs
val numbers = sc.parallelize(1 to 1000)
val textRDD = sc.textFile("path/to/file.txt")

// Always stop the context when done
sc.stop()
```
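Because only one SparkContext should be active per JVM, it can help to obtain the context through `SparkContext.getOrCreate` and to stop it in a `finally` block. The following is a minimal sketch under that assumption; the `LifecycleSketch` object and its `withContext` helper are hypothetical names introduced for illustration and are not part of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, not part of the Spark API.
object LifecycleSketch {
  // Runs `body` against the JVM's single SparkContext and always stops it afterwards.
  def withContext[A](conf: SparkConf)(body: SparkContext => A): A = {
    // getOrCreate returns the already-active context if there is one,
    // otherwise it creates a new one from `conf`.
    val sc = SparkContext.getOrCreate(conf)
    try body(sc)
    finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LifecycleSketch").setMaster("local[2]")
    val total = withContext(conf) { sc =>
      // A second getOrCreate call hands back the same instance instead of failing.
      assert(SparkContext.getOrCreate() eq sc)
      sc.parallelize(1 to 100).reduce(_ + _)
    }
    println(total) // 5050
  }
}
```

Note that `withContext` stops whatever context it obtained, so it suits applications that own the context for their whole lifetime rather than code that shares a context with other components.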

### RDD (Resilient Distributed Dataset)

The fundamental data abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel.

```scala { .api }
/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel.
 */
abstract class RDD[T: ClassTag] extends Serializable with Logging {

  // Transformations (lazy operations that return new RDDs)
  def map[U: ClassTag](f: T => U): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

  // Set operations
  def distinct(): RDD[T]
  def distinct(numPartitions: Int): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]

  // Sampling
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

  // Partitioning
  def repartition(numPartitions: Int): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def partitionBy(partitioner: Partitioner): RDD[T] // Pair RDDs only (provided by PairRDDFunctions)
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[T] // Pair RDDs only (provided by OrderedRDDFunctions)

  // Persistence and caching
  def persist(): RDD[T]
  def persist(newLevel: StorageLevel): RDD[T]
  def cache(): RDD[T] // Equivalent to persist(MEMORY_ONLY)
  def unpersist(blocking: Boolean = false): RDD[T]

  // Actions (operations that return values or write data)
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Statistical operations
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  def countApproxDistinct(relativeSD: Double = 0.05): Long

  // Save operations
  def saveAsTextFile(path: String): Unit
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  def saveAsObjectFile(path: String): Unit

  // Metadata and debugging
  def id: Int
  def name: String
  def setName(name: String): RDD[T]
  def sparkContext: SparkContext
  def partitions: Array[Partition]
  def partitioner: Option[Partitioner]
  def dependencies: Seq[Dependency[_]]
  def getStorageLevel: StorageLevel
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
  def toDebugString: String
}
```

**Usage Examples:**

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(/* config */)

// Create RDD
val numbers = sc.parallelize(1 to 1000000)

// Transformations (lazy)
val evenNumbers = numbers.filter(_ % 2 == 0)
val doubled = evenNumbers.map(_ * 2)
val pairs = doubled.map(x => (x, x * x))

// Persistence for reuse
val cachedRDD = doubled.cache() // Cache in memory
val persistedRDD = pairs.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Actions (trigger computation)
val total = doubled.reduce(_ + _)
val first10 = doubled.take(10)
val count = doubled.count()

// Collect small results (use carefully - brings all data to driver)
val allEvens = evenNumbers.collect()

// Process partitions
doubled.foreachPartition { partition =>
  // Process each partition
  partition.foreach(println)
}

// Advanced operations
val sample = numbers.sample(withReplacement = false, fraction = 0.1)
val distinct = numbers.distinct()

// Repartitioning
val repartitioned = numbers.repartition(100) // Shuffle to 100 partitions
val coalesced = numbers.coalesce(10) // Reduce to 10 partitions (no shuffle)
```
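The `aggregate` action listed in the API above takes a zero value plus two functions, which is easier to follow with a worked case. The sketch below computes a mean in a single pass by carrying a `(sum, count)` pair; the `AggregateSketch` object and its `mean` helper are hypothetical names used only for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of the Spark API.
object AggregateSketch {
  // Mean in one pass: the accumulator is a (sum, count) pair.
  def mean(rdd: RDD[Int]): Double = {
    val (sum, count) = rdd.aggregate((0L, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1), // seqOp: fold one element into a partition's accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp: merge the accumulators of two partitions
    )
    sum.toDouble / count // NaN for an empty RDD
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AggregateSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(mean(sc.parallelize(1 to 100))) // 50.5
    finally sc.stop()
  }
}
```

Unlike `reduce`, the result type of `aggregate` may differ from the element type, which is what allows the pair accumulator here.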

### Key-Value RDD Operations (PairRDDFunctions)

Additional operations available on RDDs of key-value pairs.

```scala { .api }
// These methods are available on RDD[(K, V)] through implicit conversions
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)]) {

  // Grouping operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  // Reduction operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

  // Join operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

  // Sorting and partitioning
  // (sortByKey and repartitionAndSortWithinPartitions come from OrderedRDDFunctions and need an Ordering[K])
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  // Actions
  def countByKey(): Map[K, Long]
  def collectAsMap(): Map[K, V]
  def lookup(key: K): Seq[V]

  // Save operations
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
```

**Usage Examples:**

```scala
// Create paired RDD
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))

// Group by key
val grouped = pairs.groupByKey()
// Result: ("a", [1, 3]), ("b", [2, 4]), ("c", [5])

// Reduce by key
val summed = pairs.reduceByKey(_ + _)
// Result: ("a", 4), ("b", 6), ("c", 5)

// Join operations
val other = sc.parallelize(Seq(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other)
// Result: ("a", (1, "apple")), ("a", (3, "apple")), ("b", (2, "banana")), ("b", (4, "banana"))

// Advanced aggregation
val avgByKey = pairs.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1), // Combine value with accumulator
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Combine accumulators
).mapValues(acc => acc._1.toDouble / acc._2) // Calculate average

// Sorting
val sortedPairs = pairs.sortByKey()

// Actions
val countsByKey = pairs.countByKey()
val asMap = pairs.collectAsMap() // Note: not a multimap, only one value per key is kept
val lookupA = pairs.lookup("a") // Returns Seq(1, 3)
```
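The outer joins in the API above wrap the possibly missing side in an `Option`, which the inner `join` example does not show. A minimal sketch of `leftOuterJoin` follows; the `OuterJoinSketch` object, its `labelled` method and the `"unknown"` fallback are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, not part of the Spark API.
object OuterJoinSketch {
  // Keeps every key from the left side; keys missing on the right get a fallback label.
  def labelled(sc: SparkContext): Map[String, (Int, String)] = {
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 5)))
    val names = sc.parallelize(Seq(("a", "apple"), ("b", "banana")))
    counts.leftOuterJoin(names) // RDD[(String, (Int, Option[String]))]
      .mapValues { case (n, nameOpt) => (n, nameOpt.getOrElse("unknown")) }
      .collectAsMap() // safe here because each key occurs once
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OuterJoinSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(labelled(sc)) // "c" has no match on the right, so it maps to (5, "unknown")
    finally sc.stop()
  }
}
```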

### Broadcast Variables

Read-only variables cached on each machine rather than shipping a copy with each task.

```scala { .api }
/**
 * A broadcast variable created with SparkContext.broadcast().
 * Access its value through value.
 */
class Broadcast[T] extends Serializable {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def id: Long
}
```

**Usage Examples:**

```scala
// Create broadcast variable
val broadcastData = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Use in transformations
val rdd = sc.parallelize(Seq("key1", "key2", "key3"))
val result = rdd.map { key =>
  val lookup = broadcastData.value // Access broadcast value
  lookup.getOrElse(key, "default")
}

// Clean up
broadcastData.unpersist()
```
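The API lists both `unpersist()` and `destroy()`, and the difference matters for cleanup: `unpersist` only drops the cached copies on executors, so the variable can still be used and is sent again on demand, while `destroy` releases it permanently. The sketch below illustrates this under that reading; `BroadcastLifecycleSketch` and `run` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, not part of the Spark API.
object BroadcastLifecycleSketch {
  def run(sc: SparkContext): (Long, Long) = {
    val allowed = sc.broadcast(Set("a", "b"))
    val words = sc.parallelize(Seq("a", "x", "b", "a"))

    // Read .value inside the closure so tasks use the executor-local copy.
    val first = words.filter(w => allowed.value.contains(w)).count()

    // Drops executor copies only; the next job re-sends the data when needed.
    allowed.unpersist(blocking = true)
    val second = words.filter(w => allowed.value.contains(w)).count()

    // Releases all state. Using `allowed` after this point throws an exception.
    allowed.destroy()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BroadcastLifecycleSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc)) // (3,3)
    finally sc.stop()
  }
}
```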

### Accumulators

Shared variables that support associative and commutative operations for aggregation across tasks.

```scala { .api }
// Built-in accumulator types
trait AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}

class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long]
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double]
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
```

**Usage Examples:**

```scala
// Create accumulators
val longAcc = sc.longAccumulator("My Long Accumulator")
val doubleAcc = sc.doubleAccumulator("My Double Accumulator")
val collectionAcc = sc.collectionAccumulator[String]("My Collection Accumulator")

// Update inside an action (foreach). Updates made inside transformations
// may be applied more than once if a task is re-executed.
val rdd = sc.parallelize(1 to 100)
rdd.foreach { value =>
  longAcc.add(value)
  if (value % 2 == 0) doubleAcc.add(value.toDouble)
  if (value % 10 == 0) collectionAcc.add(s"Multiple of 10: $value")
}

// Access results (only on driver)
println(s"Sum: ${longAcc.value}")
println(s"Sum of evens: ${doubleAcc.value}")
println(s"Multiples of 10: ${collectionAcc.value}")
```
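The `AccumulatorV2` contract above can also be implemented directly when the built-in types do not fit. The following is a sketch of a maximum-tracking accumulator; `MaxAccumulator` and `MaxAccumulatorSketch` are hypothetical classes written for this illustration, and using `Long.MinValue` as the "zero" state is an assumption of the sketch. Custom accumulators are registered with `SparkContext.register`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator that tracks the largest value seen across all tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var max = Long.MinValue // sketch assumption: MinValue means "nothing added yet"

  override def isZero: Boolean = max == Long.MinValue
  override def copy(): MaxAccumulator = {
    val c = new MaxAccumulator
    c.max = max
    c
  }
  override def reset(): Unit = max = Long.MinValue
  override def add(v: Long): Unit = max = math.max(max, v)
  // max is associative and commutative, so partial results can merge in any order.
  override def merge(other: AccumulatorV2[Long, Long]): Unit = max = math.max(max, other.value)
  override def value: Long = max
}

object MaxAccumulatorSketch {
  def run(sc: SparkContext): Long = {
    val acc = new MaxAccumulator
    sc.register(acc, "max") // must be registered before it is used in a job
    sc.parallelize(Seq(3L, 42L, 7L)).foreach(v => acc.add(v))
    acc.value // read on the driver only
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxAccumulatorSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc)) // 42
    finally sc.stop()
  }
}
```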

## Configuration

Key SparkConf settings for the core engine:

```scala { .api }
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("local[*]") // or yarn, spark://host:port, etc.

  // Executor configuration
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "4")

  // Driver configuration
  // (in client mode, set driver memory via spark-submit or spark-defaults.conf instead,
  // because the driver JVM has already started when this code runs)
  .set("spark.driver.memory", "1g")
  .set("spark.driver.cores", "2")

  // Serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // Shuffle configuration
  .set("spark.sql.shuffle.partitions", "200") // Spark SQL / DataFrame shuffles only
  .set("spark.default.parallelism", "100") // Default partition count for RDD operations

  // Memory and storage
  .set("spark.memory.fraction", "0.6") // Replaces the legacy spark.storage.memoryFraction
  .set("spark.storage.memoryMapThreshold", "2m")
```
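Settings can also be read back and defaulted programmatically, which is useful when some values come from `spark-submit` and others from code. The sketch below uses `setIfMissing`, `get`, `getInt` and `getOption` on `SparkConf`; the `ConfSketch` object and the chosen values are hypothetical. `loadDefaults = false` keeps the example independent of any `spark.*` system properties.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper object, not part of the Spark API.
object ConfSketch {
  def build(): SparkConf =
    new SparkConf(loadDefaults = false) // ignore spark.* system properties for this sketch
      .setAppName("ConfSketch")
      .set("spark.default.parallelism", "100")
      .setIfMissing("spark.default.parallelism", "8") // no effect: the key is already set
      .setIfMissing("spark.executor.memory", "2g")    // applied: the key was missing

  def main(args: Array[String]): Unit = {
    val conf = build()
    println(conf.getInt("spark.default.parallelism", 1)) // 100
    println(conf.get("spark.executor.memory"))           // 2g
    println(conf.getOption("spark.driver.cores"))        // None
    println(conf.toDebugString)                          // all explicitly set key=value pairs
  }
}
```

Once a SparkContext has been created from a `SparkConf`, later changes to that `SparkConf` object do not affect the running context, so all settings should be applied before the context is constructed.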

## Performance Best Practices

### RDD Operations
1. **Chain transformations and minimize actions**: Transformations are lazy; each action launches a job, and uncached lineage is recomputed for every action
2. **Cache frequently used RDDs**: Use `cache()` or `persist()` for RDDs accessed multiple times
3. **Choose appropriate storage levels**: Balance memory usage and recomputation cost
4. **Avoid `collect()` on large datasets**: It can cause out-of-memory errors on the driver
5. **Use `mapPartitions()` for expensive setup**: Amortize initialization costs across partition elements
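
The `mapPartitions()` advice can be sketched as follows: an object that is costly to create is built once per partition and reused for every element in it. The `MessageDigest` stands in for any expensive resource (a parser, a connection, a compiled pattern), and `MapPartitionsSketch` with its `sha256Hex` method is a hypothetical name for this illustration.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of the Spark API.
object MapPartitionsSketch {
  def sha256Hex(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitions { iter =>
      // Created once per partition instead of once per element.
      val digest = MessageDigest.getInstance("SHA-256")
      iter.map { s =>
        // digest(bytes) hashes the input and resets the instance for the next element.
        digest.digest(s.getBytes(StandardCharsets.UTF_8)).map("%02x".format(_)).mkString
      }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MapPartitionsSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try sha256Hex(sc.parallelize(Seq("spark", "rdd"))).collect().foreach(println)
    finally sc.stop()
  }
}
```

The returned iterator is lazy, so the per-partition resource must stay usable until the iterator is fully consumed; closing it right after `iter.map(...)` would be a bug.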

### Partitioning
1. **Control partitioning explicitly**: Use `repartition()` for even distribution (full shuffle) and `coalesce()` to reduce the partition count without a shuffle
2. **Partition by keys for joins**: Apply `partitionBy()` with the same partitioner to both sides before joining, and persist an RDD that is joined repeatedly, so the join itself needs no extra shuffle
3. **Consider partition size**: Aim for 128MB - 1GB per partition
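
A minimal sketch of the "partition by keys for joins" advice follows. Both sides are given the same `HashPartitioner`, so matching keys already live in matching partitions when the join runs. `PartitionedJoinSketch`, the sample data and the partition count of 8 are hypothetical choices for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Hypothetical helper object, not part of the Spark API.
object PartitionedJoinSketch {
  // Returns the joined rows and whether the join kept the shared partitioner.
  def run(sc: SparkContext): (Set[(Int, (String, String))], Boolean) = {
    val partitioner = new HashPartitioner(8)

    // Persist the side that is reused so its shuffle happens only once.
    val users = sc.parallelize(Seq((1, "ann"), (2, "bob")))
      .partitionBy(partitioner)
      .persist()
    val events = sc.parallelize(Seq((1, "click"), (1, "view"), (2, "click")))
      .partitionBy(partitioner)

    // Both inputs share the partitioner, so the join can combine
    // partition i of `users` with partition i of `events` directly.
    val joined = users.join(events)
    val rows = joined.collect().toSet
    val keptPartitioner = joined.partitioner.contains(partitioner)
    users.unpersist()
    (rows, keptPartitioner)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PartitionedJoinSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc))
    finally sc.stop()
  }
}
```

The benefit grows when `users` is joined against many different RDDs, because only the other side then needs to be shuffled into place.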

### Memory Management
1. **Configure storage levels appropriately**: Use serialized storage levels such as `MEMORY_ONLY_SER` when memory is tight
2. **Tune garbage collection**: Consider G1GC for large heaps
3. **Monitor the Storage tab in the Spark UI**: Watch for evicted RDDs and memory pressure
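
As a small illustration of the storage-level advice, the sketch below persists an RDD in serialized form, checks the level through `getStorageLevel`, and releases it again. `StorageLevelSketch` and the data size are hypothetical; whether serialized storage actually helps depends on the data and the serializer in use.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, not part of the Spark API.
object StorageLevelSketch {
  // Returns the storage level while persisted and after unpersisting.
  def run(sc: SparkContext): (StorageLevel, StorageLevel) = {
    val data = sc.parallelize(1 to 100000)
      .map(i => (i, i.toString))
      .persist(StorageLevel.MEMORY_ONLY_SER) // stored as serialized bytes: smaller, but costs CPU to read

    data.count() // the first action materializes the cached blocks
    val whilePersisted = data.getStorageLevel

    data.unpersist(blocking = true) // free the blocks once they are no longer needed
    val afterUnpersist = data.getStorageLevel
    (whilePersisted, afterUnpersist)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StorageLevelSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try println(run(sc))
    finally sc.stop()
  }
}
```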

## Debugging and Monitoring

### RDD Lineage and Dependencies
```scala
// View RDD lineage
println(rdd.toDebugString)

// Check dependencies
rdd.dependencies.foreach(println)

// Monitor storage
println(rdd.getStorageLevel)
```
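
To make the lineage calls above concrete, the sketch below inspects a small word-count pipeline: a `map` step has a narrow one-to-one dependency on its parent, while `reduceByKey` introduces a shuffle dependency, which is where a stage boundary appears in `toDebugString`. `LineageSketch` and its `run` method are hypothetical names.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

// Hypothetical helper object, not part of the Spark API.
object LineageSketch {
  // Returns (map step is narrow, reduceByKey step is a shuffle, lineage text).
  def run(sc: SparkContext): (Boolean, Boolean, String) = {
    val words = sc.parallelize(Seq("a", "b", "a"))
    val pairs = words.map(w => (w, 1))    // narrow: each partition depends on one parent partition
    val counts = pairs.reduceByKey(_ + _) // wide: values for a key must be brought together

    val narrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
    val wide = counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
    (narrow, wide, counts.toDebugString)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LineageSketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val (narrow, wide, lineage) = run(sc)
      println(s"narrow=$narrow wide=$wide")
      println(lineage) // indentation changes mark shuffle boundaries
    } finally sc.stop()
  }
}
```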

### Spark UI Access
- **Jobs Tab**: View job progress and timing
- **Stages Tab**: See task-level metrics and skew
- **Storage Tab**: Monitor cached RDDs and memory usage
- **Executors Tab**: Check executor health and resource usage
- **Environment Tab**: Review configuration settings

The core engine forms the foundation of all Spark functionality, providing distributed computing primitives that higher-level APIs build upon.