0
# Core Data Processing
1
2
This section covers core distributed data processing with Resilient Distributed Datasets (RDDs) and the underlying Spark execution engine. RDDs are fault-tolerant, parallel data structures that support transformations and actions for large-scale data processing.
3
4
## Capabilities
5
6
### SparkContext
7
8
Main entry point for Spark functionality, representing the connection to a Spark cluster. Used to create RDDs, broadcast variables, and accumulators.
9
10
```scala { .api }
11
/**
12
* Main entry point for Spark functionality
13
* @param config Spark configuration object
14
*/
15
class SparkContext(config: SparkConf) {
16
/** Create RDD from a collection */
17
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
18
/** Create RDD from text file(s) */
19
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
20
/** Create RDD from whole text files */
21
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
22
/** Create RDD from binary files */
23
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
24
/** Create RDD from Hadoop file */
25
def hadoopRDD[K, V](
26
conf: JobConf,
27
inputFormatClass: Class[_ <: InputFormat[K, V]],
28
keyClass: Class[K],
29
valueClass: Class[V],
30
minPartitions: Int = defaultMinPartitions
31
): RDD[(K, V)]
32
/** Create broadcast variable */
33
def broadcast[T: ClassTag](value: T): Broadcast[T]
34
/** Create accumulator (legacy API; deprecated since Spark 2.0 in favor of AccumulatorV2, e.g. longAccumulator) */
35
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
36
/** Set job group for tracking */
37
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
38
/** Stop SparkContext */
39
def stop(): Unit
40
}
41
```
42
43
**Usage Examples:**
44
45
```scala
46
import org.apache.spark.{SparkContext, SparkConf}
47
48
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
49
val sc = new SparkContext(conf)
50
51
// Create RDD from collection
52
val numbersRDD = sc.parallelize(1 to 1000, 4)
53
54
// Create RDD from text file
55
val textRDD = sc.textFile("hdfs://path/to/file.txt")
56
57
// Create broadcast variable
58
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
59
60
sc.stop()
61
```
62
63
**Python API:**
64
65
```python { .api }
66
class SparkContext:
67
"""
68
Main entry point for Spark functionality in Python
69
"""
70
def __init__(self, conf: SparkConf = None, master: str = None, appName: str = None)
71
def parallelize(self, c: Iterable, numSlices: int = None) -> RDD
72
def textFile(self, name: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
73
def wholeTextFiles(self, path: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
74
def binaryFiles(self, path: str, minPartitions: int = None) -> RDD
75
def broadcast(self, value: Any) -> Broadcast
76
def accumulator(self, value: Any, accum_param: AccumulatorParam = None) -> Accumulator
77
def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = False) -> None
78
def stop(self) -> None
79
```
80
81
**Python Usage Examples:**
82
83
```python
84
from pyspark import SparkContext, SparkConf
85
86
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
87
sc = SparkContext(conf=conf)
88
89
# Create RDD from collection
90
numbers_rdd = sc.parallelize(range(1, 1001), 4)
91
92
# Create RDD from text file
93
text_rdd = sc.textFile("hdfs://path/to/file.txt")
94
95
# Create broadcast variable
96
broadcast_var = sc.broadcast({"key1": "value1", "key2": "value2"})
97
98
sc.stop()
99
```
100
101
### RDD[T]
102
103
Resilient Distributed Dataset - the fundamental data structure of Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
104
105
```scala { .api }
106
/**
107
* Resilient Distributed Dataset - core abstraction for distributed collections
108
*/
109
abstract class RDD[T: ClassTag] {
110
/** Transform each element using a function */
111
def map[U: ClassTag](f: T => U): RDD[U]
112
/** Transform each element to zero or more elements */
113
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
114
/** Keep only elements matching predicate */
115
def filter(f: T => Boolean): RDD[T]
116
/** Remove duplicates */
117
def distinct(numPartitions: Int = partitions.length): RDD[T]
118
/** Random sample of elements */
119
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
120
/** Union with another RDD */
121
def union(other: RDD[T]): RDD[T]
122
/** Intersection with another RDD */
123
def intersection(other: RDD[T]): RDD[T]
124
/** Cartesian product with another RDD */
125
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
126
/** Group elements by key function */
127
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
128
/** Sort elements */
129
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
130
131
/** Collect all elements to driver */
132
def collect(): Array[T]
133
/** Take first n elements */
134
def take(num: Int): Array[T]
135
/** Get first element */
136
def first(): T
137
/** Count number of elements */
138
def count(): Long
139
/** Reduce elements using function */
140
def reduce(f: (T, T) => T): T
141
/** Fold elements with zero value */
142
def fold(zeroValue: T)(op: (T, T) => T): T
143
/** Aggregate with different result type */
144
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
145
146
/** Cache RDD in memory */
147
def cache(): RDD[T]
148
/** Persist with storage level */
149
def persist(newLevel: StorageLevel): RDD[T]
150
/** Remove from cache */
151
def unpersist(blocking: Boolean = false): RDD[T]
152
}
153
```
154
155
**Usage Examples:**
156
157
```scala
158
val data = sc.parallelize(1 to 100, 4)
159
160
// Transformations (lazy)
161
val doubled = data.map(_ * 2)
162
val filtered = doubled.filter(_ > 50)
163
val unique = filtered.distinct()
164
165
// Actions (trigger computation)
166
val result = unique.collect()
167
val count = unique.count()
168
val first10 = unique.take(10)
169
170
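// Persist for reuse (cache() is lazy: it only marks the RDD, so call it before the first action that should populate the cache)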
171
unique.cache()
172
173
// Complex operations
174
val grouped = data.groupBy(_ % 10) // Group by remainder
175
val aggregated = data.aggregate(0)(_ + _, _ + _) // Sum all elements
176
```
177
178
**Python RDD API:**
179
180
```python { .api }
181
class RDD:
182
"""
183
Resilient Distributed Dataset - Python implementation
184
"""
185
def map(self, f: Callable) -> RDD
186
def flatMap(self, f: Callable) -> RDD
187
def filter(self, f: Callable) -> RDD
188
def distinct(self, numPartitions: int = None) -> RDD
189
def sample(self, withReplacement: bool, fraction: float, seed: int = None) -> RDD
190
def union(self, other: RDD) -> RDD
191
def intersection(self, other: RDD) -> RDD
192
def cartesian(self, other: RDD) -> RDD
193
def groupBy(self, f: Callable, numPartitions: int = None) -> RDD
194
def sortBy(self, keyfunc: Callable, ascending: bool = True, numPartitions: int = None) -> RDD
195
196
# Actions
197
def collect(self) -> List
198
def take(self, num: int) -> List
199
def first(self) -> Any
200
def count(self) -> int
201
def reduce(self, f: Callable) -> Any
202
def fold(self, zeroValue: Any, op: Callable) -> Any
203
def aggregate(self, zeroValue: Any, seqOp: Callable, combOp: Callable) -> Any
204
def foreach(self, f: Callable) -> None
205
def foreachPartition(self, f: Callable) -> None
206
207
# Persistence
208
def cache(self) -> RDD
209
def persist(self, storageLevel: StorageLevel = None) -> RDD
210
def unpersist(self, blocking: bool = False) -> RDD
211
```
212
213
**Python Usage Examples:**
214
215
```python
216
data = sc.parallelize(range(1, 101), 4)
217
218
# Transformations (lazy)
219
doubled = data.map(lambda x: x * 2)
220
filtered = doubled.filter(lambda x: x > 50)
221
unique = filtered.distinct()
222
223
# Actions (trigger computation)
224
result = unique.collect()
225
count = unique.count()
226
first_10 = unique.take(10)
227
228
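# Persist for reuse (cache() is lazy: it only marks the RDD, so call it before the first action that should populate the cache)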
229
unique.cache()
230
231
# Complex operations
232
grouped = data.groupBy(lambda x: x % 10) # Group by remainder
233
aggregated = data.aggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) # Sum
234
```
235
236
### PairRDDFunctions
237
238
Additional operations available on RDDs of key-value pairs through implicit conversions.
239
240
```scala { .api }
241
/**
242
* Additional operations for RDDs of key-value pairs
243
*/
244
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
245
/** Get keys only */
246
def keys: RDD[K]
247
/** Get values only */
248
def values: RDD[V]
249
/** Transform values keeping keys */
250
def mapValues[U](f: V => U): RDD[(K, U)]
251
/** Group values by key (Spark also provides a no-argument groupByKey() overload that uses the default partitioner) */
252
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
253
/** Reduce values by key (Spark also provides reduceByKey(func), which uses the default partitioner) */
254
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
255
/** Aggregate values by key */
256
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
257
/** Sort by keys */
258
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
259
/** Inner join with another pair RDD (Spark also provides join(other), which uses the default partitioner) */
260
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
261
/** Left outer join */
262
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
263
/** Right outer join */
264
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
265
/** Full outer join */
266
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
267
/** Cogroup with another pair RDD */
268
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
269
/** Save as text file */
270
def saveAsTextFile(path: String): Unit
271
/** Save as Hadoop sequence file */
272
def saveAsSequenceFile(path: String): Unit
273
}
274
```
275
276
**Usage Examples:**
277
278
```scala
279
val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)))
280
281
// Basic pair operations
282
val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "cherry")
283
val values = pairs.values.collect() // Array(1, 2, 3, 1)
284
285
// Group and reduce
286
val grouped = pairs.groupByKey().collect()
287
val sums = pairs.reduceByKey(_ + _).collect() // e.g. Array(("apple", 4), ("banana", 2), ("cherry", 1)); element order is not guaranteed
288
289
// Joins
290
val other = sc.parallelize(Seq(("apple", "red"), ("banana", "yellow")))
291
val joined = pairs.join(other).collect()
292
```
293
294
**Python Pair RDD Operations:**
295
296
In Python, RDDs of key-value pairs (tuples) automatically have these operations available:
297
298
```python { .api }
299
# Pair RDD operations available on RDD of tuples
300
class RDD: # When containing (key, value) tuples
301
def keys(self) -> RDD
302
def values(self) -> RDD
303
def mapValues(self, f: Callable) -> RDD
304
def groupByKey(self, numPartitions: int = None) -> RDD
305
def reduceByKey(self, func: Callable, numPartitions: int = None) -> RDD
306
def aggregateByKey(self, zeroValue: Any, seqFunc: Callable, combFunc: Callable, numPartitions: int = None) -> RDD
307
def sortByKey(self, ascending: bool = True, numPartitions: int = None, keyfunc: Callable = None) -> RDD
308
def join(self, other: RDD, numPartitions: int = None) -> RDD
309
def leftOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
310
def rightOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
311
def fullOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
312
def cogroup(self, other: RDD, numPartitions: int = None) -> RDD
313
def saveAsTextFile(self, path: str) -> None
314
```
315
316
**Python Usage Examples:**
317
318
```python
319
pairs = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)])
320
321
# Basic pair operations
322
keys = pairs.keys().collect() # ['apple', 'banana', 'apple', 'cherry']
323
values = pairs.values().collect() # [1, 2, 3, 1]
324
325
# Group and reduce
326
grouped = pairs.groupByKey().collect()
327
sums = pairs.reduceByKey(lambda a, b: a + b).collect() # e.g. [('apple', 4), ('banana', 2), ('cherry', 1)]; element order is not guaranteed
328
329
# Joins
330
other = sc.parallelize([("apple", "red"), ("banana", "yellow")])
331
joined = pairs.join(other).collect()
332
```
333
334
### Configuration
335
336
Configuration management for Spark applications.
337
338
```scala { .api }
339
/**
340
* Configuration for Spark applications
341
* @param loadDefaults whether to also load spark.* values from Java system properties
342
*/
343
class SparkConf(loadDefaults: Boolean = true) {
344
/** Set configuration property */
345
def set(key: String, value: String): SparkConf
346
/** Set master URL */
347
def setMaster(master: String): SparkConf
348
/** Set application name */
349
def setAppName(name: String): SparkConf
350
/** Set JAR files */
351
def setJars(jars: Seq[String]): SparkConf
352
/** Set executor environment variable */
353
def setExecutorEnv(variable: String, value: String): SparkConf
354
/** Set Spark home directory */
355
def setSparkHome(home: String): SparkConf
356
/** Set all properties from iterable */
357
def setAll(settings: Iterable[(String, String)]): SparkConf
358
/** Get configuration value */
359
def get(key: String): String
360
/** Get configuration value with default */
361
def get(key: String, defaultValue: String): String
362
/** Get boolean configuration value */
363
def getBoolean(key: String, defaultValue: Boolean): Boolean
364
}
365
```
366
367
### Shared Variables
368
369
Variables that can be shared across cluster nodes efficiently.
370
371
```scala { .api }
372
/**
373
* Broadcast variable for efficient data sharing
374
*/
375
abstract class Broadcast[T: ClassTag] {
376
/** Access broadcast value */
377
def value: T
378
/** Remove from executors but keep in driver */
379
def unpersist(): Unit
380
/** Destroy completely */
381
def destroy(): Unit
382
}
383
384
/**
385
* Accumulator for collecting information from executors
386
*/
387
class Accumulator[T] {
388
/** Get current value (driver only) */
389
def value: T
390
/** Add to accumulator */
391
def +=(term: T): Unit
392
/** Add to accumulator (alternative syntax) */
393
def add(term: T): Unit
394
}
395
```
396
397
### Storage Levels
398
399
Storage levels for RDD persistence.
400
401
```scala { .api }
402
/**
403
* Storage levels for RDD caching and persistence
404
*/
405
object StorageLevel {
406
val NONE: StorageLevel
407
val DISK_ONLY: StorageLevel
408
val DISK_ONLY_2: StorageLevel
409
val MEMORY_ONLY: StorageLevel
410
val MEMORY_ONLY_2: StorageLevel
411
val MEMORY_ONLY_SER: StorageLevel
412
val MEMORY_ONLY_SER_2: StorageLevel
413
val MEMORY_AND_DISK: StorageLevel
414
val MEMORY_AND_DISK_2: StorageLevel
415
val MEMORY_AND_DISK_SER: StorageLevel
416
val MEMORY_AND_DISK_SER_2: StorageLevel
417
val OFF_HEAP: StorageLevel
418
}
419
```
420
421
**Usage Examples:**
422
423
```scala
424
import org.apache.spark.storage.StorageLevel
425
426
val rdd = sc.parallelize(1 to 1000000)
427
428
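// Alternative persistence levels - choose ONE per RDD: assigning a different level to an RDD that already has one throws UnsupportedOperationException unless unpersist() is called first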
429
rdd.persist(StorageLevel.MEMORY_ONLY)
430
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
431
rdd.persist(StorageLevel.DISK_ONLY_2) // 2x replication
432
433
// Shorthand for persist(StorageLevel.MEMORY_ONLY)
434
rdd.cache()
435
```
436
437
## Error Handling
438
439
Common exceptions in core Spark operations:
440
441
- `SparkException` - General Spark runtime exceptions
442
- Task execution failures - reported to the driver as a `SparkException` ("Job aborted due to stage failure") once task retries are exhausted
443
- `TaskKilledException` - Task cancellation
444
- `SparkOutOfMemoryError` - Memory-related errors