# Core Engine

The Spark Core Engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs), transformations, actions, and distributed variables (broadcast variables and accumulators). It is the foundation on which all other Spark components are built.

## Package Information

Core engine functionality is available through the following imports:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{CollectionAccumulator, DoubleAccumulator, LongAccumulator}
// PairRDDFunctions and DoubleRDDFunctions are applied through implicit conversions
// defined on the RDD companion object, so they need no import of their own.
```

## Basic Usage

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Initialize Spark
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]") // or cluster URL

val sc = new SparkContext(conf)

// Create RDD from collection
val numbers = sc.parallelize(1 to 1000000)

// Transform and compute.
// Widen to Long before multiplying: the sum of the doubled even numbers up to
// 1,000,000 is 500,001,000,000, which does not fit in an Int and would silently wrap.
val result = numbers
  .filter(_ % 2 == 0)
  .map(_.toLong * 2)
  .reduce(_ + _)

println(s"Result: $result") // Result: 500001000000
sc.stop()
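```

## Capabilities

### Spark Context

The main entry point for Spark functionality. A `SparkContext` represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster. Only one `SparkContext` may be active per JVM; call `stop()` on the active context before creating a new one.

```scala { .api }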
class SparkContext(config: SparkConf) {
  // RDD creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  // One record per file: (file path, file content)
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  // K and V must be readable from Hadoop Writable types
  def sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions)(implicit km: ClassTag[K], vm: ClassTag[V]): RDD[(K, V)]

  // Distributed variables
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // File distribution
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit

  // Job and resource management
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def addJobTag(tag: String): Unit
  def removeJobTag(tag: String): Unit
  def getJobTags(): Set[String]
  def clearJobTags(): Unit
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  // Block manager -> (max memory available for caching, remaining memory available for caching)
  def getExecutorMemoryStatus: Map[String, (Long, Long)]

  // Control
  def stop(): Unit
  // Returns a copy: the configuration cannot be changed at runtime
  def getConf: SparkConf
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def version: String
}
```

Usage example:

```scala
// No master URL is set here: supply it with `spark-submit --master ...`,
// or call .setMaster(...) when running outside spark-submit.
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)

// Create RDD from file
val textRDD = sc.textFile("hdfs://path/to/file.txt")

// Create RDD from collection
val numbersRDD = sc.parallelize(Array(1, 2, 3, 4, 5))

// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Create accumulator
val counter = sc.longAccumulator("MyCounter")
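```

### Spark Configuration

Configuration for a Spark application, expressed as key-value pairs. `new SparkConf()` also loads any `spark.*` Java system properties; pass `loadDefaults = false` to skip them. Finish configuring the `SparkConf` before passing it to `SparkContext`: Spark clones it at that point, and later modifications are not picked up.

```scala { .api }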
class SparkConf(loadDefaults: Boolean = true) {
  // Setters return this SparkConf, so calls can be chained
  def set(key: String, value: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

  // get(key) throws NoSuchElementException when the key is not set;
  // use getOption or the defaultValue overload when the key may be absent
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean

  def remove(key: String): SparkConf
  // Independent copy of this configuration
  def clone(): SparkConf
}
```

Usage example:

```scala
// All values are strings; Spark parses them when they are read (e.g. "2g" = 2 GiB)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("yarn")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")
  .set("spark.sql.adaptive.enabled", "true")
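```

### Resilient Distributed Datasets (RDDs)

The fundamental data structure in Spark. An RDD is an immutable, partitioned collection of elements that can be processed in parallel. Transformations are lazy: they only describe a new RDD. Actions trigger the actual computation and either return a result to the driver or write it to storage.

```scala { .api }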
abstract class RDD[T: ClassTag] extends Serializable {
  // Transformations (lazy: each returns a new RDD; nothing is computed until an action runs)
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(): RDD[T]
  def distinct(numPartitions: Int): RDD[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  // Both RDDs must have the same number of partitions and the same
  // number of elements in each partition
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def zipPartitions[U: ClassTag, V: ClassTag](rdd2: RDD[U])(f: (Iterator[T], Iterator[U]) => Iterator[V]): RDD[V]
  def zipPartitions[U: ClassTag, V: ClassTag, W: ClassTag](rdd2: RDD[U], rdd3: RDD[V])(f: (Iterator[T], Iterator[U], Iterator[V]) => Iterator[W]): RDD[W]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def glom(): RDD[Array[T]]
  def keyBy[K](f: T => K): RDD[(K, T)]

  // Advanced operations (experimental APIs)
  def barrier(): RDDBarrier[T]
  def withResources(rp: ResourceProfile): this.type

  // Partitioning
  // Without a shuffle, coalesce can only reduce the number of partitions;
  // repartition always shuffles
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def repartition(numPartitions: Int): RDD[T]
  // partitionBy(partitioner) is only available on key-value RDDs (RDD[(K, V)]),
  // through the implicit conversion to PairRDDFunctions

  // Persistence
  // cache() and persist() both use StorageLevel.MEMORY_ONLY
  def cache(): RDD[T]
  def persist(): RDD[T]
  // A storage level can be assigned only once; call unpersist() before changing it
  def persist(newLevel: StorageLevel): RDD[T]
  def unpersist(blocking: Boolean = false): RDD[T]

  // Actions (trigger computation)
  // collect() loads the entire RDD into driver memory; use it only for small results
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  // takeOrdered returns the smallest `num` elements; top returns the largest
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Output
  def saveAsTextFile(path: String): Unit
  def saveAsObjectFile(path: String): Unit

  // Information
  def partitions: Array[Partition]
  def getNumPartitions: Int
  def isEmpty(): Boolean
  def name: String
  def setName(name: String): RDD[T]
}
```

### Pair RDD Functions

Additional operations available on RDDs of key-value pairs (`RDD[(K, V)]`). They are provided through an implicit conversion, so no extra import is required.

```scala { .api }
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  // Transformations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  // groupByKey shuffles every value. To aggregate per key (sum, average, ...), prefer
  // reduceByKey or aggregateByKey, which combine values within each partition first.
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  // Partitioning
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  // Joins
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

  // Actions (countByKey and collectAsMap load their entire result into driver memory)
  def countByKey(): Map[K, Long]
  // Not a multimap: if a key has several values, only one of them is kept
  def collectAsMap(): Map[K, V]
  def lookup(key: K): Seq[V]

  // Output. saveAsTextFile is defined on RDD itself; saveAsSequenceFile is provided
  // by SequenceFileRDDFunctions when K and V are convertible to Hadoop Writables.
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
}
```

Usage example:

```scala
val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// The results below are what collect() returns for each RDD. The order of keys,
// and of values within a group, is not guaranteed.

// Group by key
val grouped = pairs.groupByKey()
// grouped.collect(): Array(("a", Iterable(1, 3)), ("b", Iterable(2, 4)))

// Reduce by key
val sums = pairs.reduceByKey(_ + _)
// sums.collect(): Array(("a", 4), ("b", 6))

// Join with another RDD
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other)
// joined.collect(): Array(("a", (1, "apple")), ("a", (3, "apple")),
//                         ("b", (2, "banana")), ("b", (4, "banana")))
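```

### Numeric RDD Functions

Additional operations available on RDDs of numeric values. An `RDD[Double]`, or an RDD of any type that has a `Numeric` instance (such as `RDD[Int]`), is implicitly converted to `DoubleRDDFunctions`.

```scala { .api }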
class DoubleRDDFunctions(self: RDD[Double]) {
  def sum(): Double
  def mean(): Double
  // variance/stdev are population statistics (divide by n);
  // sampleVariance/sampleStdev are sample statistics (divide by n - 1)
  def variance(): Double
  def sampleVariance(): Double
  def stdev(): Double
  def sampleStdev(): Double
  // Count, mean, variance, min and max computed in a single pass
  def stats(): StatCounter
  // buckets: sorted, distinct boundaries, e.g. [1, 10, 20, 50] -> [1, 10), [10, 20), [20, 50].
  // evenBuckets = true declares the boundaries evenly spaced, enabling O(1) bucket lookup.
  def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]
  // bucketCount evenly spaced buckets between min and max; returns (boundaries, counts)
  def histogram(bucketCount: Int): (Array[Double], Array[Long])
}

class StatCounter extends Serializable {
  def count: Long
  def mean: Double
  def sum: Double
  def min: Double
  def max: Double
  def variance: Double
  def sampleVariance: Double
  def stdev: Double
  def sampleStdev: Double
}
```

### Distributed Variables

Variables that can be shared efficiently across the nodes of a cluster. Spark provides two kinds: broadcast variables and accumulators.

#### Broadcast Variables

Read-only variables that are cached on each machine, rather than shipping a copy of them with every task.

```scala { .api }
abstract class Broadcast[T] extends Serializable {
  // Read the broadcast data inside tasks through `value`
  def value: T
  // Deletes the cached copies on the executors (asynchronously unless blocking = true).
  // The variable stays usable: if it is accessed again, it is re-sent to the executors.
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  // Deletes all data and metadata for this variable, including on the driver.
  // A destroyed broadcast variable cannot be used again.
  def destroy(): Unit
  def id: Long
}
```

Usage example:

```scala
// Create broadcast variable
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Use in RDD operations: read the data through broadcastMap.value inside the closure.
// map is lazy, so run an action (collect) before cleaning up.
val rdd = sc.parallelize(Array("key1", "key2", "key3"))
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "default")).collect()
// result: Array("value1", "value2", "default")

// Clean up once the jobs that use the variable have run
broadcastMap.unpersist()
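```

#### Accumulators

Variables that parallel operations can only "add" to, and whose value only the driver can read. For updates made inside actions, Spark applies each task's update exactly once, even if the task is restarted. Updates made inside transformations may be applied more than once if tasks or stages are re-executed.

```scala { .api }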
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  // true while the accumulator holds its zero value (e.g. 0 for a counter)
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  // Resets the accumulator to its zero value
  def reset(): Unit
  def add(v: IN): Unit
  // Folds another accumulator of the same type into this one
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  // Readable only on the driver
  def value: OUT
  def name: Option[String]
}

// Sums Long values and also tracks how many values were added
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long // number of values added
  def sum: Long
  def avg: Double // average of the values added
}

// Sums Double values and also tracks how many values were added
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long // number of values added
  def sum: Double
  def avg: Double // average of the values added
}

// Collects the added elements into a list
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]
}
```

Usage example:

```scala
// Create accumulators
val counter = sc.longAccumulator("My Counter")
val progress = sc.collectionAccumulator[String]("Progress Messages")

// Update them inside an action (foreach), where Spark applies each
// task's updates exactly once, even if the task is retried
val data = sc.parallelize(1 to 1000)
data.foreach { x =>
  counter.add(1)
  if (x % 100 == 0) {
    progress.add(s"Processed $x items")
  }
}

// Read the results on the driver
println(s"Processed ${counter.value} items") // Processed 1000 items
println(s"Progress: ${progress.value}")      // 10 messages, in no guaranteed order
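```

### Storage Levels

Control how RDDs are stored: in memory, on disk, or both; as deserialized objects or in serialized form; off-heap; and on how many nodes. An RDD's storage level can be assigned only once, so call `unpersist()` before assigning a different one.

```scala { .api }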
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean // false: partitions are stored in serialized form
  def replication: Int      // number of nodes holding a copy of each partition
}

// Naming convention: _SER levels store partitions serialized (more compact, but more
// CPU-intensive to read); _2 / _3 suffixes replicate each partition on two / three nodes.
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
```

Usage example:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("large-file.txt")

// An RDD's storage level can be assigned only once: calling persist() again with a
// different level throws UnsupportedOperationException. unpersist() resets the level,
// so call it before switching.

// Cache in memory only
rdd.persist(StorageLevel.MEMORY_ONLY)

// Cache in memory and disk with replication
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

// Use serialized storage to save memory
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
```

### Partitioning

Control how the records of a key-value RDD are distributed across partitions, and therefore across cluster nodes, based on each record's key.

```scala { .api }
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  // Maps a key to a partition ID in [0, numPartitions)
  def getPartition(key: Any): Int
}

// Assigns each key to key.hashCode modulo `partitions` (made non-negative); null keys go to
// partition 0. Java arrays hash by identity, not content, so do not use arrays as keys.
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
  override def equals(other: Any): Boolean
  override def hashCode: Int
}

// Splits sortable keys into roughly equal ranges, determined by sampling `rdd`.
// May create fewer than `partitions` partitions when too few records are sampled.
class RangePartitioner[K : Ordering : ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]],
  ascending: Boolean = true,
  samplePointsPerPartitionHint: Int = 20
) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
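```

### Task Context

Contextual information and utilities available to a running task. Obtain it by calling `TaskContext.get()` from code that runs inside a task; outside a task (for example, on the driver), it returns `null`.

```scala { .api }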
abstract class TaskContext extends Serializable {
  def isCompleted(): Boolean
  def isInterrupted(): Boolean
  // Completion listeners run when the task ends for any reason (success, failure, or
  // cancellation); failure listeners run only when the task fails
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
  def addTaskCompletionListener[U](f: TaskContext => U): TaskContext
  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
  def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext
  def stageId(): Int
  def stageAttemptNumber(): Int
  def partitionId(): Int
  // 0 for the first attempt of this task; increases with each retry
  def attemptNumber(): Int
  // Unique per task attempt within a SparkContext
  def taskAttemptId(): Long
  // Local property set on the driver, or null if it is missing
  def getLocalProperty(key: String): String
  def taskMetrics(): TaskMetrics
  def getMetricsSources(sourceName: String): Seq[Source]
}

object TaskContext {
  // Context of the currently running task; null when called outside a task
  def get(): TaskContext
  // Partition ID of the active task, or 0 if there is no active TaskContext
  def getPartitionId(): Int
}
```

Usage example:

```scala
import org.apache.spark.TaskContext

val rdd = sc.parallelize(1 to 100, 4)
val result = rdd.mapPartitions { iterator =>
  // TaskContext.get() is only non-null inside a running task
  val context = TaskContext.get()
  // Runs on the executors: output goes to executor stdout (the driver console only in local mode)
  println(s"Processing partition ${context.partitionId()} on stage ${context.stageId()}")
  iterator.map(_ * 2)
}

// Transformations are lazy: an action is needed for the tasks (and the println) to run
result.collect()
```