0
# Core Engine APIs
1
2
The Spark Core engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs) and the SparkContext. This is the foundation upon which all other Spark components are built.
3
4
## SparkContext
5
6
The SparkContext is the main entry point for Spark functionality and represents the connection to a Spark cluster.
7
8
```scala { .api }
9
class SparkContext(config: SparkConf) {
10
// Core data creation
11
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
12
def makeRDD[T: ClassTag](seq: Seq[T]): RDD[T]
13
// NOTE: SparkContext has no single-argument range(end); that overload belongs to SparkSession. Use range(0, end) here.
14
def range(start: Long, end: Long): RDD[Long]
15
def range(start: Long, end: Long, step: Long): RDD[Long]
16
def range(start: Long, end: Long, step: Long, numSlices: Int): RDD[Long]
17
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
18
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
19
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
20
21
// Hadoop integration
22
def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]],
23
keyClass: Class[K], valueClass: Class[V],
24
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
25
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
26
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
27
28
// Shared variables
29
def broadcast[T: ClassTag](value: T): Broadcast[T]
30
def longAccumulator: LongAccumulator
31
def longAccumulator(name: String): LongAccumulator
32
def doubleAccumulator: DoubleAccumulator
33
def doubleAccumulator(name: String): DoubleAccumulator
34
def collectionAccumulator[T]: CollectionAccumulator[T]
35
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
36
37
// Application management
38
def stop(): Unit
39
def setLogLevel(logLevel: String): Unit
40
def setCheckpointDir(directory: String): Unit
41
def addFile(path: String): Unit
42
def addJar(path: String): Unit
43
44
// Properties
45
def master: String
46
def appName: String
47
def version: String
48
def defaultParallelism: Int
49
def defaultMinPartitions: Int
50
def startTime: Long
51
}
52
```
53
54
### Usage Examples
55
56
```scala
57
import org.apache.spark.{SparkContext, SparkConf}
58
59
// Create SparkContext
60
val conf = new SparkConf()
61
.setAppName("MyApp")
62
.setMaster("local[*]")
63
val sc = new SparkContext(conf)
64
65
// Create RDD from collection
66
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
67
68
// Create RDD from range
69
val numbers = sc.range(1, 1000000, 2) // start=1, end=1000000, step=2
70
71
// Read from files
72
val textRDD = sc.textFile("hdfs://path/to/file.txt")
73
val wholeFiles = sc.wholeTextFiles("hdfs://path/to/directory")
74
val binaryFiles = sc.binaryFiles("hdfs://path/to/binary/files")
75
76
// Create broadcast variable
77
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
78
79
// Create accumulator
80
val counter = sc.longAccumulator("Counter")
81
82
sc.stop()
83
```
84
85
## SparkConf
86
87
Configuration object for Spark applications.
88
89
```scala { .api }
90
class SparkConf(loadDefaults: Boolean = true) {
91
// Configuration methods
92
def set(key: String, value: String): SparkConf
93
def setMaster(master: String): SparkConf
94
def setAppName(name: String): SparkConf
95
def setJars(jars: Seq[String]): SparkConf
96
def setExecutorEnv(variable: String, value: String): SparkConf
97
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
98
def setSparkHome(home: String): SparkConf
99
100
// Retrieval methods
101
def get(key: String): String
102
def get(key: String, defaultValue: String): String
103
def getOption(key: String): Option[String]
104
def getAll: Array[(String, String)]
105
def contains(key: String): Boolean
106
def remove(key: String): SparkConf
107
108
// Utility methods
109
def clone(): SparkConf
110
def toDebugString: String
111
}
112
```
113
114
### Usage Examples
115
116
```scala
117
import org.apache.spark.SparkConf
118
119
val conf = new SparkConf()
120
.setAppName("MyApplication")
121
.setMaster("yarn")
122
.set("spark.executor.memory", "2g")
123
.set("spark.executor.cores", "2")
124
.set("spark.sql.adaptive.enabled", "true")
125
.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")
126
127
// Get configuration values
128
val appName = conf.get("spark.app.name")
129
val executorMemory = conf.getOption("spark.executor.memory")
130
```
131
132
## RDD (Resilient Distributed Dataset)
133
134
The fundamental data structure in Spark representing an immutable, partitioned collection of elements.
135
136
```scala { .api }
137
abstract class RDD[T: ClassTag](
138
@transient private var _sc: SparkContext,
139
@transient private var deps: Seq[Dependency[_]]) {
140
141
// Transformations (lazy operations)
142
def map[U: ClassTag](f: T => U): RDD[U]
143
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
144
def filter(f: T => Boolean): RDD[T]
145
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
146
preservePartitioning: Boolean = false): RDD[U]
147
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],
148
preservePartitioning: Boolean = false): RDD[U]
149
150
// Set operations
151
def distinct(numPartitions: Int = partitions.length): RDD[T]
152
def union(other: RDD[T]): RDD[T]
153
def intersection(other: RDD[T]): RDD[T]
154
def subtract(other: RDD[T]): RDD[T]
155
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
156
157
// Sampling and partitioning
158
def sample(withReplacement: Boolean, fraction: Double,
159
seed: Long = Utils.random.nextLong): RDD[T]
160
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
161
def repartition(numPartitions: Int): RDD[T]
162
def sortBy[K](f: T => K, ascending: Boolean = true,
163
numPartitions: Int = this.partitions.length)
164
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
165
166
// Utility transformations
167
def pipe(command: String): RDD[String]
168
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
169
def zipWithIndex(): RDD[(T, Long)]
170
def zipWithUniqueId(): RDD[(T, Long)]
171
def glom(): RDD[Array[T]]
172
def keyBy[K](f: T => K): RDD[(K, T)]
173
174
// Actions (trigger computation)
175
def collect(): Array[T]
176
def count(): Long
177
def first(): T
178
def take(num: Int): Array[T]
179
def takeSample(withReplacement: Boolean, num: Int,
180
seed: Long = Utils.random.nextLong): Array[T]
181
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
182
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
183
184
// Aggregation operations
185
def reduce(f: (T, T) => T): T
186
def fold(zeroValue: T)(op: (T, T) => T): T
187
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
188
def treeReduce(f: (T, T) => T, depth: Int = 2): T
189
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U,
190
depth: Int = 2): U
191
192
// Statistical operations
193
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
194
def max()(implicit ord: Ordering[T]): T
195
def min()(implicit ord: Ordering[T]): T
196
197
// Side effects
198
def foreach(f: T => Unit): Unit
199
def foreachPartition(f: Iterator[T] => Unit): Unit
200
201
// Persistence
202
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type
203
def cache(): this.type
204
def unpersist(blocking: Boolean = false): this.type
205
def checkpoint(): Unit
206
def localCheckpoint(): this.type
207
208
// Storage and output
209
def saveAsTextFile(path: String): Unit
210
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit // only on RDD[(K, V)] with Writable-convertible keys/values, via SequenceFileRDDFunctions
211
def saveAsObjectFile(path: String): Unit
212
213
// Metadata
214
def getNumPartitions: Int
215
def partitions: Array[Partition]
216
def getStorageLevel: StorageLevel
217
def setName(name: String): this.type
218
def name: String
219
def id: Int
220
def context: SparkContext
221
def sparkContext: SparkContext
222
}
223
```
224
225
### Pair RDD Operations
226
227
For RDDs containing key-value pairs, additional operations are available:
228
229
```scala { .api }
230
// Available on RDD[(K, V)] through implicit conversions
231
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
232
// Aggregation by key
233
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
234
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
235
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
236
def groupByKey(): RDD[(K, Iterable[V])]
237
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
238
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
239
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
240
mergeCombiners: (C, C) => C): RDD[(K, C)]
241
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
242
243
// Joins
244
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
245
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
246
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
247
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
248
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
249
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
250
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
251
252
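// Sorting and partitioning (sortByKey and repartitionAndSortWithinPartitions are provided by OrderedRDDFunctions and require an implicit Ordering[K])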
253
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
254
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
255
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
256
257
// Output operations
258
def countByKey(): Map[K, Long]
259
def collectAsMap(): Map[K, V]
260
def lookup(key: K): Seq[V]
261
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String,
262
keyClass: Class[_], valueClass: Class[_],
263
outputFormatClass: Class[F]): Unit
264
}
265
```
266
267
### Usage Examples
268
269
```scala
270
import org.apache.spark.{SparkContext, SparkConf}
271
272
val sc = new SparkContext(new SparkConf().setAppName("RDD Example").setMaster("local[*]"))
273
274
// Basic transformations
275
val numbers = sc.parallelize(1 to 10)
276
val squared = numbers.map(x => x * x)
277
val evens = numbers.filter(_ % 2 == 0)
278
val words = sc.textFile("file.txt").flatMap(_.split(" "))
279
280
// Pair RDD operations
281
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
282
val grouped = pairs.groupByKey()
283
val reduced = pairs.reduceByKey(_ + _)
284
val joined = pairs.join(sc.parallelize(Seq(("a", "apple"), ("b", "banana"))))
285
286
// Actions
287
val result = squared.collect()
288
val count = words.count()
289
val wordCounts = words.map((_, 1)).reduceByKey(_ + _).collectAsMap()
290
291
// Persistence
292
val cachedRDD = numbers.cache()
293
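val persistedRDD = squared.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK) // an RDD's storage level can be assigned only once, so persist a different RDD than the one cached above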
294
295
sc.stop()
296
```
297
298
## Broadcast Variables
299
300
Efficiently distribute large read-only data to all nodes.
301
302
```scala { .api }
303
abstract class Broadcast[T](val id: Long) {
304
def value: T
305
def unpersist(): Unit
306
def unpersist(blocking: Boolean): Unit
307
def destroy(): Unit
308
def toString: String
309
}
310
```
311
312
### Usage Examples
313
314
```scala
315
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
316
317
val rdd = sc.parallelize(Seq("key1", "key2", "key3"))
318
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "unknown")).collect() // run the action before the broadcast is destroyed; transformations are lazy
319
320
// Clean up when done
321
broadcastMap.destroy()
322
```
323
324
## Accumulators
325
326
Variables that can be added to from executors and read from the driver.
327
328
```scala { .api }
329
abstract class AccumulatorV2[IN, OUT] {
330
def isZero: Boolean
331
def copy(): AccumulatorV2[IN, OUT]
332
def reset(): Unit
333
def add(v: IN): Unit
334
def merge(other: AccumulatorV2[IN, OUT]): Unit
335
def value: OUT
336
}
337
338
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
339
def add(v: Long): Unit
340
def add(v: java.lang.Long): Unit
341
def count: Long
342
def sum: Long
343
def avg: Double
344
}
345
346
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
347
def add(v: Double): Unit
348
def add(v: java.lang.Double): Unit
349
def count: Long
350
def sum: Double
351
def avg: Double
352
}
353
354
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
355
def add(v: T): Unit
356
def value: java.util.List[T]
357
}
358
```
359
360
### Usage Examples
361
362
```scala
363
val counter = sc.longAccumulator("Counter")
364
val doubleAcc = sc.doubleAccumulator("Double Accumulator")
365
val listAcc = sc.collectionAccumulator[String]("List Accumulator")
366
367
val rdd = sc.parallelize(1 to 100)
368
rdd.foreach { x =>
369
counter.add(1)
370
doubleAcc.add(x.toDouble)
371
if (x % 10 == 0) listAcc.add(s"Multiple of 10: $x")
372
}
373
374
println(s"Count: ${counter.value}")
375
println(s"Sum: ${doubleAcc.sum}")
376
println(s"Collected items: ${listAcc.value}")
377
```