# Core Engine

Apache Spark's core engine provides the fundamental distributed computing capabilities through `SparkContext` and Resilient Distributed Datasets (RDDs). It is the foundation layer on which all other Spark components are built.

## Capabilities

### SparkContext

The main entry point for Spark functionality and the connection to a Spark cluster. Only one SparkContext should be active per JVM; stop the active one before creating another.

```scala { .api }
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 */
class SparkContext(config: SparkConf) extends Logging {

  /**
   * Creates a SparkContext that loads settings from system properties
   */
  def this() = this(new SparkConf())

  /**
   * Alternative constructor for setting common Spark properties directly
   */
  def this(master: String, appName: String, conf: SparkConf)

  /**
   * Alternative constructor with full configuration
   */
  def this(
    master: String,
    appName: String,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()
  )

  // Core RDD creation methods
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

  // Broadcast and accumulator operations
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // Lifecycle management
  def stop(): Unit
  def stop(exitCode: Int): Unit

  // Configuration and metadata
  def getConf: SparkConf
  def master: String
  def appName: String
  def applicationId: String
  def deployMode: String
  def version: String
  def startTime: Long
  def defaultParallelism: Int
  def defaultMinPartitions: Int
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Basic SparkContext creation
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[*]") // Use all available cores
val sc = new SparkContext(conf)

// Alternative constructor. Use it instead of the one above, not alongside it:
// only one SparkContext may be active per JVM.
// val sc2 = new SparkContext("local[4]", "MyApp")

// Create RDDs
val numbers = sc.parallelize(1 to 1000)
val textRDD = sc.textFile("path/to/file.txt")

// Always stop the context when done
sc.stop()
```
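
Because only one SparkContext may be active per JVM, one common pattern is to obtain the context with `SparkContext.getOrCreate` and stop it in a `finally` block. The following is a minimal sketch under stated assumptions: the object name `ContextLifecycle`, the app name, and the `local[2]` master are illustrative and not part of the original examples.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ContextLifecycle {
  // Runs a small job and always stops the context afterwards
  def run(conf: SparkConf): Int = {
    // Returns the active context if one exists; otherwise creates one from conf
    val sc = SparkContext.getOrCreate(conf)
    try {
      sc.parallelize(1 to 100).reduce(_ + _)
    } finally {
      sc.stop() // runs even if the job throws
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ContextLifecycle") // illustrative name
      .setMaster("local[2]")          // assumption: local run with two threads
    println(run(conf))
  }
}
```

Stopping in `finally` keeps a failed job from leaving an active context behind, which would otherwise block the next `new SparkContext(...)` call in the same JVM.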

### RDD (Resilient Distributed Dataset)

The fundamental data abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel.

```scala { .api }
/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel.
 */
abstract class RDD[T: ClassTag] extends Serializable with Logging {

  // Transformations (lazy operations that return new RDDs)
  def map[U: ClassTag](f: T => U): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

  // Set operations
  def distinct(): RDD[T]
  def distinct(numPartitions: Int): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]

  // Sampling
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

  // Partitioning
  def repartition(numPartitions: Int): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def partitionBy(partitioner: Partitioner): RDD[T] // Key-value RDDs only (provided by PairRDDFunctions)
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[T] // Key-value RDDs only (provided by OrderedRDDFunctions)

  // Persistence and caching
  def persist(): RDD[T]
  def persist(newLevel: StorageLevel): RDD[T]
  def cache(): RDD[T] // Equivalent to persist(MEMORY_ONLY)
  def unpersist(blocking: Boolean = false): RDD[T]

  // Actions (operations that return values or write data)
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Statistical operations
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  def countApproxDistinct(relativeSD: Double = 0.05): Long

  // Save operations
  def saveAsTextFile(path: String): Unit
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  def saveAsObjectFile(path: String): Unit

  // Metadata and debugging
  def id: Int
  def name: String
  def setName(name: String): RDD[T]
  def sparkContext: SparkContext
  def partitions: Array[Partition]
  def partitioner: Option[Partitioner]
  def dependencies: Seq[Dependency[_]]
  def getStorageLevel: StorageLevel
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
  def toDebugString: String
}
```

**Usage Examples:**

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(/* config */)

// Create RDD
val numbers = sc.parallelize(1 to 1000000)

// Transformations (lazy)
val evenNumbers = numbers.filter(_ % 2 == 0)
val doubled = evenNumbers.map(_ * 2)
val pairs = doubled.map(x => (x, x * x))

// Persistence for reuse (takes effect when the first action runs)
val cachedRDD = doubled.cache() // Cache in memory
val persistedRDD = pairs.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Actions (trigger computation)
val total = doubled.reduce(_ + _)
val first10 = doubled.take(10)
val count = doubled.count()

// Collect small results (use carefully - brings all data to driver)
val allEvens = evenNumbers.collect()

// Process partitions
doubled.foreachPartition { partition =>
  // Runs on the executors, so println output goes to executor logs
  partition.foreach(println)
}

// Advanced operations
val sample = numbers.sample(withReplacement = false, fraction = 0.1)
val distinct = numbers.distinct()

// Repartitioning
val repartitioned = numbers.repartition(100) // Shuffle to 100 partitions
val coalesced = numbers.coalesce(10) // Reduce to 10 partitions (no shuffle)
```
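
The `aggregate` action listed in the API above is not covered by the examples, so here is a hedged sketch of how it can compute a mean in one pass by carrying a `(sum, count)` pair. The object name `AggregateMean` and the `local[2]` master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AggregateMean {
  // Computes the mean in a single pass by carrying (sum, count) through aggregate
  def mean(values: RDD[Int]): Double = {
    val (sum, count) = values.aggregate((0L, 0L))(
      (acc, v) => (acc._1 + v, acc._2 + 1), // fold one element into a partition-local accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2)  // merge accumulators from different partitions
    )
    if (count == 0) Double.NaN else sum.toDouble / count
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("AggregateMean") // illustrative name
      .setMaster("local[2]")       // assumption: local run
    val sc = SparkContext.getOrCreate(conf)
    try {
      println(mean(sc.parallelize(1 to 10)))
    } finally {
      sc.stop()
    }
  }
}
```

Unlike `reduce`, `aggregate` lets the result type differ from the element type, which is why the accumulator here can be a pair of `Long` values over an `RDD[Int]`.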

### Key-Value RDD Operations (PairRDDFunctions)

Additional operations available on RDDs of key-value pairs.

```scala { .api }
// These methods are available on RDD[(K, V)] through implicit conversions
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)]) {

  // Grouping operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  // Reduction operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

  // Join operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

  // Sorting and partitioning
  // (sortByKey and repartitionAndSortWithinPartitions come from OrderedRDDFunctions
  // and require an implicit Ordering[K])
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  // Actions
  def countByKey(): Map[K, Long]
  def collectAsMap(): Map[K, V]
  def lookup(key: K): Seq[V]

  // Save operations (saveAsSequenceFile comes from SequenceFileRDDFunctions)
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
```

**Usage Examples:**

```scala
// Create paired RDD
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))

// Group by key
val grouped = pairs.groupByKey()
// Result: ("a", [1, 3]), ("b", [2, 4]), ("c", [5])

// Reduce by key
val summed = pairs.reduceByKey(_ + _)
// Result: ("a", 4), ("b", 6), ("c", 5)

// Join operations
val other = sc.parallelize(Seq(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other)
// Result (in no guaranteed order): ("a", (1, "apple")), ("a", (3, "apple")), ("b", (2, "banana")), ("b", (4, "banana"))

// Advanced aggregation
val avgByKey = pairs.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1), // Combine value with accumulator
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Combine accumulators
).mapValues(acc => acc._1.toDouble / acc._2) // Calculate average

// Sorting
val sortedPairs = pairs.sortByKey()

// Actions
val countsByKey = pairs.countByKey()
val asMap = pairs.collectAsMap() // Note: only one value per key is kept
val lookupA = pairs.lookup("a") // Returns Seq(1, 3)
```
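
The inner `join` above silently drops key `"c"` because it has no match on the right side. As a hedged sketch of the outer-join variant from the API listing, the following keeps every left-side record and substitutes a fallback for missing matches. The object name `OuterJoinExample`, the method name `labelAll`, and the `"unknown"` fallback are illustrative assumptions.

```scala
import org.apache.spark.rdd.RDD

object OuterJoinExample {
  // Keeps every left-side pair; keys with no label on the right fall back to "unknown"
  def labelAll(
      pairs: RDD[(String, Int)],
      labels: RDD[(String, String)]): RDD[(String, Int, String)] =
    pairs.leftOuterJoin(labels).map {
      // The right-hand value arrives as an Option because a match may be absent
      case (key, (value, labelOpt)) => (key, value, labelOpt.getOrElse("unknown"))
    }
}
```

With the `pairs` and `other` RDDs from the example above, `OuterJoinExample.labelAll(pairs, other)` would contain five records, including `("c", 5, "unknown")`.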

### Broadcast Variables

Read-only variables cached on each machine rather than shipping a copy with each task.

```scala { .api }
/**
 * A broadcast variable created with SparkContext.broadcast().
 * Access its value through value.
 */
class Broadcast[T] extends Serializable {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def id: Long
}
```

**Usage Examples:**

```scala
// Create broadcast variable
val broadcastData = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Use in transformations
val rdd = sc.parallelize(Seq("key1", "key2", "key3"))
val result = rdd.map { key =>
  val lookup = broadcastData.value // Access broadcast value
  lookup.getOrElse(key, "default")
}

// Run an action before cleaning up; the map above is lazy
val resolved = result.collect()

// Clean up: remove the cached copies from the executors
broadcastData.unpersist()
```

### Accumulators

Shared variables that support associative and commutative operations for aggregation across tasks.

```scala { .api }
// Base class for all accumulators
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}

// Built-in accumulator types
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long]
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double]
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
```

**Usage Examples:**

```scala
// Create accumulators
val longAcc = sc.longAccumulator("My Long Accumulator")
val doubleAcc = sc.doubleAccumulator("My Double Accumulator")
val collectionAcc = sc.collectionAccumulator[String]("My Collection Accumulator")

// Update inside an action (foreach); updates made in actions are applied once per task
val rdd = sc.parallelize(1 to 100)
rdd.foreach { value =>
  longAcc.add(value)
  if (value % 2 == 0) doubleAcc.add(value.toDouble)
  if (value % 10 == 0) collectionAcc.add(s"Multiple of 10: $value")
}

// Access results (only on driver)
println(s"Sum: ${longAcc.value}")
println(s"Sum of evens: ${doubleAcc.value}")
println(s"Multiples of 10: ${collectionAcc.value}")
```
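
The `AccumulatorV2` methods listed above can also be implemented directly when the built-in types do not fit. The following is a sketch of a hypothetical `MaxAccumulator` that tracks the largest value seen across tasks; the class, the `MaxAccumulatorExample` helper, and the accumulator name `"max"` are illustrative and not part of Spark. It relies on `SparkContext.register` to attach the accumulator before use.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator: keeps the maximum Long added by any task.
// max is associative and commutative, so merge order does not matter.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var current: Long = Long.MinValue // Long.MinValue doubles as the zero value

  override def isZero: Boolean = current == Long.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc.current = current
    acc
  }
  override def reset(): Unit = current = Long.MinValue
  override def add(v: Long): Unit = current = math.max(current, v)
  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    current = math.max(current, other.value)
  override def value: Long = current
}

object MaxAccumulatorExample {
  def maxOf(sc: SparkContext, values: Seq[Long]): Long = {
    val maxAcc = new MaxAccumulator
    sc.register(maxAcc, "max") // register before using it in a job
    sc.parallelize(values).foreach(v => maxAcc.add(v))
    maxAcc.value // read on the driver only
  }
}
```

Each task works on its own copy of the accumulator, and Spark merges the copies back on the driver, which is why `merge` has to combine two partial results rather than a result and a single element.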

## Configuration

Key `SparkConf` settings for the core engine:

```scala { .api }
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("local[*]") // or yarn, spark://host:port, etc.
  // Executor configuration
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "4")
  // Driver configuration (in client mode the driver JVM has already started,
  // so set driver memory through spark-submit or spark-defaults.conf instead)
  .set("spark.driver.memory", "1g")
  .set("spark.driver.cores", "2")
  // Serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Shuffle configuration
  .set("spark.sql.shuffle.partitions", "200") // Applies to Spark SQL shuffles only
  .set("spark.default.parallelism", "100") // Default partition count for RDD shuffles
  // Storage and memory
  .set("spark.memory.fraction", "0.6") // Replaces the legacy spark.storage.memoryFraction
  .set("spark.storage.memoryMapThreshold", "2m")
```
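
To check which settings actually ended up on a `SparkConf`, the values can be read back before a context is created. A minimal sketch, assuming `new SparkConf(false)` so that `spark.*` system properties are ignored and the output stays predictable; the object name `ConfInspection` is illustrative.

```scala
import org.apache.spark.SparkConf

object ConfInspection {
  def main(args: Array[String]): Unit = {
    // loadDefaults = false: do not pick up spark.* JVM system properties
    val conf = new SparkConf(false)
      .setAppName("ConfInspection") // illustrative name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    println(conf.get("spark.serializer"))                // value that was set explicitly
    println(conf.getOption("spark.executor.memory"))     // None: never set on this conf
    println(conf.getInt("spark.default.parallelism", 8)) // falls back to the supplied default
    println(conf.toDebugString)                          // one key=value pair per line
  }
}
```

On a running application, `sc.getConf` returns a copy of the effective configuration, and the same accessors apply to it.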

## Performance Best Practices

### RDD Operations
1. **Chain transformations and minimize actions**: Transformations are lazy; each action launches a job and recomputes any lineage that is not cached
2. **Cache frequently used RDDs**: Use `cache()` or `persist()` for RDDs that more than one action reads
3. **Choose appropriate storage levels**: Balance memory usage against recomputation cost
4. **Avoid `collect()` on large datasets**: It brings every element to the driver and can cause out-of-memory errors there
5. **Use `mapPartitions()` for expensive setup**: Amortize initialization costs across all elements of a partition
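
The `mapPartitions()` advice in item 5 can be sketched as follows. The example builds one date formatter per partition rather than one per element; the formatter merely stands in for any costly resource such as a parser or a connection, and the object and method names are illustrative assumptions.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import org.apache.spark.rdd.RDD

object PartitionSetup {
  // Parses yyyy-MM-dd dates (interpreted as UTC) into epoch milliseconds
  def toEpochMillis(lines: RDD[String]): RDD[Long] =
    lines.mapPartitions { iter =>
      // Setup runs once per partition, on the executor that processes it
      val fmt = new SimpleDateFormat("yyyy-MM-dd")
      fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
      // The returned iterator is lazy, so elements are still streamed one at a time
      iter.map(line => fmt.parse(line).getTime)
    }
}
```

With plain `map`, the formatter would either be created for every element or have to be serialized into the closure; creating it inside the partition function avoids both.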

### Partitioning
1. **Control partitioning explicitly**: Use `repartition()` for even distribution (full shuffle) and `coalesce()` to reduce the partition count without a shuffle
2. **Partition by keys for joins**: Apply `partitionBy()` with the same partitioner to both sides, and persist the results, so the join does not shuffle them again
3. **Consider partition size**: Aim for 128 MB to 1 GB per partition
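
The join advice in item 2 can be sketched like this: both inputs are partitioned with the same `HashPartitioner` and persisted, so the join can reuse that layout. The object name, method name, and the choice of `HashPartitioner` are assumptions made for illustration.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

object CoPartitionedJoin {
  def joinPrePartitioned(
      left: RDD[(String, Int)],
      right: RDD[(String, String)],
      numPartitions: Int): RDD[(String, (Int, String))] = {
    val partitioner = new HashPartitioner(numPartitions)
    // Each partitionBy shuffles once; persist keeps the partitioned layout for reuse
    val leftPartitioned = left.partitionBy(partitioner).persist()
    val rightPartitioned = right.partitionBy(partitioner).persist()
    // Both sides already share the partitioner, so the join itself adds no shuffle
    leftPartitioned.join(rightPartitioned)
  }
}
```

The payoff comes when the partitioned RDDs are joined or aggregated by key more than once; for a single join, the up-front `partitionBy` shuffles cost about as much as the shuffle they replace.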

### Memory Management
1. **Configure storage levels appropriately**: Use serialized storage levels (for example `MEMORY_ONLY_SER`) under memory pressure
2. **Tune garbage collection**: Consider G1GC for large heaps
3. **Monitor the Storage tab in the Spark UI**: Watch for evicted RDDs and memory pressure

## Debugging and Monitoring

### RDD Lineage and Dependencies
```scala
// View RDD lineage
println(rdd.toDebugString)

// Check dependencies
rdd.dependencies.foreach(println)

// Check the current storage level
println(rdd.getStorageLevel)
```

### Spark UI Access
- **Jobs Tab**: View job progress and timing
- **Stages Tab**: See task-level metrics and skew
- **Storage Tab**: Monitor cached RDDs and memory usage
- **Executors Tab**: Check executor health and resource usage
- **Environment Tab**: Review configuration settings

The core engine forms the foundation of all Spark functionality, providing distributed computing primitives that higher-level APIs build upon.