# SparkContext API

The SparkContext is the main entry point for all Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across the cluster. Every Spark application must create exactly one active SparkContext.

## SparkContext Class

```scala { .api }
class SparkContext(config: SparkConf) extends Logging {
  // Auxiliary constructors (the primary constructor takes a SparkConf)
  def this() = this(new SparkConf())
  def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }
  def this(master: String, appName: String, sparkHome: String, jars: Seq[String], environment: Map[String, String] = Map()) = { /* ... */ }
}
```

## Creating SparkContext

### Basic Construction

```scala { .api }
import org.apache.spark.{SparkContext, SparkConf}

// Using SparkConf (recommended)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
```

### Alternative Constructors

```scala { .api }
// Default constructor (loads configuration from system properties)
val sc = new SparkContext()

// With master and app name
val sc = new SparkContext("local[*]", "My App")

// Full constructor with all parameters
val sc = new SparkContext(
  master = "local[*]",
  appName = "My App",
  sparkHome = "/path/to/spark",
  jars = Seq("myapp.jar"),
  environment = Map("SPARK_ENV_VAR" -> "value")
)
```

### Developer API Constructor (for YARN)

```scala { .api }
// @DeveloperApi - for internal use, typically in YARN mode
import org.apache.spark.scheduler.SplitInfo

val sc = new SparkContext(
  config = conf,
  preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)
```

## RDD Creation Methods

### From Collections

**parallelize**: Distribute a local collection to form an RDD
```scala { .api }
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias
```

```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data) // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4) // Specify 4 partitions
```

**With location preferences**:
```scala { .api }
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
```

```scala
// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
  (1, Seq("host1", "host2")),
  (2, Seq("host3")),
  (3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)
```

### From Files

**textFile**: Read text files from HDFS or local filesystem
```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://path/to/file.txt", 8)
```

**wholeTextFiles**: Read a directory of text files as key-value pairs
```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Returns RDD[(filename, content)]
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
files.foreach { case (filename, content) =>
  println(s"File: $filename, Size: ${content.length}")
}
```

### Hadoop Files

**sequenceFile**: Read Hadoop SequenceFiles
```scala { .api }
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile",
  classOf[IntWritable], classOf[Text])
```

**hadoopFile**: Read files with an arbitrary Hadoop InputFormat
```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopRDD = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

**objectFile**: Load an RDD saved as a SequenceFile of serialized objects
```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Load an RDD that was previously saved with saveAsObjectFile
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")
```

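For orientation, a minimal round-trip sketch; the `/tmp/objects` path and the pair type are illustrative:

```scala
// Illustrative round trip: save with RDD.saveAsObjectFile, reload with objectFile
val original = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
original.saveAsObjectFile("/tmp/objects")

val restored = sc.objectFile[(Int, String)]("/tmp/objects")
restored.collect().foreach(println)
```
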
### RDD Manipulation

**union**: Build union of a list of RDDs
```scala { .api }
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))

val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))
```

**emptyRDD**: Create an empty RDD with no partitions
```scala { .api }
def emptyRDD[T: ClassTag]: RDD[T]
```

```scala
val empty: RDD[String] = sc.emptyRDD[String]
```

## Shared Variables

Spark provides two types of shared variables: broadcast variables and accumulators.

### Broadcast Variables

**broadcast**: Create a broadcast variable for read-only data
```scala { .api }
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

```scala
val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)

val data = sc.parallelize(Array("apple", "banana", "apple"))
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))
```

### Accumulators

**accumulator**: Create a simple accumulator
```scala { .api }
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
```

**accumulable**: Create an accumulable with different result/element types
```scala { .api }
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
```

```scala
import scala.collection.mutable

// Simple counter
val counter = sc.accumulator(0, "Negative Counter")

val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
  if (x < 0) counter += 1 // Count negative numbers
  x > 0
}
positive.count() // Trigger an action so the accumulator is updated
println(s"Negative numbers: ${counter.value}")

// Collection accumulator
val errorList = sc.accumulableCollection(mutable.Set[String]())
```

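`accumulable` needs an `AccumulableParam` in scope. Below is a minimal sketch of a custom param that collects flagged records into a `Set`; the `StringSetParam` object and the sample records are illustrative, not part of the Spark API:

```scala
import org.apache.spark.AccumulableParam

// Hypothetical param: result type Set[String], element type String
object StringSetParam extends AccumulableParam[Set[String], String] {
  def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  def zero(initial: Set[String]): Set[String] = Set.empty[String]
}

val badRecords = sc.accumulable(Set.empty[String])(StringSetParam)
sc.parallelize(Seq("ok", "bad:1", "ok", "bad:2")).foreach { record =>
  if (record.startsWith("bad")) badRecords += record
}
println(s"Bad records: ${badRecords.value}")
```
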
#### Built-in AccumulatorParam Types

```scala { .api }
// Available in SparkContext companion object
DoubleAccumulatorParam // For Double values
IntAccumulatorParam    // For Int values
LongAccumulatorParam   // For Long values
FloatAccumulatorParam  // For Float values
```
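Because these params are implicit, `accumulator` can usually be called without naming them explicitly. A small sketch; depending on the Spark version, the `import org.apache.spark.SparkContext._` may or may not be needed to bring the implicits into scope:

```scala
// Bring the built-in implicit AccumulatorParam objects into scope (version-dependent)
import org.apache.spark.SparkContext._

val intAcc = sc.accumulator(0)      // resolves IntAccumulatorParam
val doubleAcc = sc.accumulator(0.0) // resolves DoubleAccumulatorParam

sc.parallelize(1 to 10).foreach { x =>
  intAcc += 1
  doubleAcc += x / 2.0
}
println(s"count = ${intAcc.value}, sum of halves = ${doubleAcc.value}")
```
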
## Job Control and Execution

### Running Jobs

**runJob**: Run a function on RDD partitions
```scala { .api }
def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  allowLocal: Boolean = false,
  resultHandler: (Int, U) => Unit = null
): Array[U]

// Simplified versions
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
```

```scala
val data = sc.parallelize(1 to 100, 4)

// Run custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")

// With task context
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
  (context.partitionId, iter.size)
})
```

**submitJob**: Submit a job asynchronously (Experimental)
```scala { .api }
def submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R
): SimpleFutureAction[R]
```
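A hedged sketch of how `submitJob` might be used: count the elements of each partition without blocking the calling thread. The bookkeeping array and partition range are illustrative; `SimpleFutureAction` implements Scala's `Future`, so it can be awaited or cancelled:

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val data = sc.parallelize(1 to 1000, 4)
val counts = new Array[Int](data.partitions.length)

// Submit the per-partition counts asynchronously
val future = sc.submitJob(
  data,
  (iter: Iterator[Int]) => iter.size,
  0 until data.partitions.length,                    // run on all partitions
  (index: Int, count: Int) => counts(index) = count, // called on the driver per partition
  counts.sum                                         // resultFunc, evaluated when the job finishes
)

val total = Await.result(future, Duration.Inf)
println(s"Total elements: $total")
// future.cancel() would abort the job instead of waiting
```
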
### Job Groups and Cancellation

**setJobGroup**: Assign a group ID to all jobs started by this thread
```scala { .api }
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
```

**clearJobGroup**: Clear the job group for this thread
```scala { .api }
def clearJobGroup(): Unit
```

**cancelJobGroup**: Cancel all jobs for the given group
```scala { .api }
def cancelJobGroup(groupId: String): Unit
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```scala { .api }
def cancelAllJobs(): Unit
```

```scala
// Set job group
sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)

val data = sc.textFile("large-file.txt")
val processed = data.map(processLine)
processed.saveAsTextFile("output")

// Cancel specific job group from another thread
sc.cancelJobGroup("etl-jobs")
```

## Configuration and Properties

### Configuration Access

**getConf**: Get a copy of the SparkContext's configuration
```scala { .api }
def getConf: SparkConf
```

```scala
val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
```

### Local Properties

**setLocalProperty**: Set a local property that affects jobs submitted from this thread
```scala { .api }
def setLocalProperty(key: String, value: String): Unit
```

**getLocalProperty**: Get a local property set in this thread
```scala { .api }
def getLocalProperty(key: String): String
```

```scala
// Set properties that will be passed to tasks
sc.setLocalProperty("spark.sql.execution.id", "query-123")
sc.setLocalProperty("callSite.short", "MyApp.process")

val value = sc.getLocalProperty("spark.sql.execution.id")
```

### File and JAR Management

**addFile**: Add a file to be downloaded with this Spark job on every node
```scala { .api }
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
```

**addJar**: Add a JAR dependency for all tasks to be executed on this SparkContext
```scala { .api }
def addJar(path: String): Unit
```

```scala
// Add files that tasks can access via SparkFiles.get()
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")

// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")
```

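On the executors, files distributed with `addFile` are resolved through `SparkFiles.get`. A minimal sketch, assuming the `config.properties` file added above:

```scala
import org.apache.spark.SparkFiles
import scala.io.Source

sc.parallelize(1 to 4).foreach { _ =>
  // Resolve the local copy of the distributed file on this executor
  val path = SparkFiles.get("config.properties")
  val firstLine = Source.fromFile(path).getLines().take(1).mkString
  println(s"Read from $path: $firstLine")
}
```
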
### Checkpointing

**setCheckpointDir**: Set the directory for RDD checkpointing
```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
sc.setCheckpointDir("hdfs://namenode/checkpoints")

val data = sc.textFile("large-dataset.txt")
val processed = data.map(complexProcessing).filter(isValid)
processed.checkpoint() // Checkpoint this RDD
```
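`checkpoint()` only marks the RDD; the data is actually written the first time an action computes it, so it is common to trigger an action right away. A small sketch continuing the example above:

```scala
// Materialize the checkpoint by running an action
processed.count()
println(s"Checkpointed: ${processed.isCheckpointed}")
```
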
## Monitoring and Information

### Memory and Storage Status

**getExecutorMemoryStatus**: Get memory status of all executors
```scala { .api }
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

**getExecutorStorageStatus**: Get storage status from all executors
```scala { .api }
def getExecutorStorageStatus: Array[StorageStatus]
```

**getRDDStorageInfo**: Get information about cached/persisted RDDs
```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
```

**getPersistentRDDs**: Get all currently persisted RDDs
```scala { .api }
def getPersistentRDDs: Map[Int, RDD[_]]
```

```scala
// Check memory usage across executors
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMemory, remainingMemory)) =>
  println(s"Executor $executorId: ${remainingMemory}/${maxMemory} bytes available")
}

// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
  println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}
```
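`getPersistentRDDs` returns the cached RDDs keyed by their id, which makes bulk housekeeping straightforward. A small sketch; unpersisting everything is illustrative, not something to do blindly:

```scala
// List and optionally release every currently cached RDD
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name}") // name may be null if it was never set
  rdd.unpersist()
}
```
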
### Runtime Information

**version**: Get the version of Spark on which this application is running
```scala { .api }
def version: String
```

**defaultParallelism**: Default level of parallelism for operations
```scala { .api }
def defaultParallelism: Int
```

**defaultMinPartitions**: Default minimum number of partitions for Hadoop RDDs
```scala { .api }
def defaultMinPartitions: Int
```

```scala
println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")
```

### Hadoop Configuration

**hadoopConfiguration**: Access the Hadoop Configuration for reuse across operations
```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")

// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")
```

## SparkContext Lifecycle

### Stopping SparkContext

**stop**: Shut down the SparkContext
```scala { .api }
def stop(): Unit
```

```scala
val sc = new SparkContext(conf)

try {
  // Perform Spark operations
  val data = sc.textFile("input.txt")
  val result = data.map(_.toUpperCase).collect()
} finally {
  sc.stop() // Always stop the context
}
```

### Best Practices

1. **Single SparkContext**: Only one SparkContext should be active per JVM
2. **Proper Shutdown**: Always call `stop()` to release resources
3. **Configuration**: Use SparkConf for all configuration rather than constructor parameters
4. **Shared Variables**: Use broadcast variables for large read-only data
5. **Accumulators**: Use accumulators mainly for debugging and monitoring; updates made inside transformations may be applied more than once if tasks are retried

```scala
// Proper pattern for SparkContext usage
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

try {
  // All Spark operations here
} finally {
  sc.stop()
}
```

The SparkContext is the foundation of every Spark application, and understanding its API is essential for effective Spark programming.