0
# SparkContext and Configuration
1
2
Core entry point for Spark applications providing configuration management, resource coordination, and the foundation for all Spark operations.
3
4
## Capabilities
5
6
### SparkContext
7
8
Main entry point for Spark functionality representing the connection to a Spark cluster. Used to create RDDs, accumulators, and broadcast variables.
9
10
```scala { .api }
11
/**
12
* Main entry point for Spark functionality
13
* @param config SparkConf object describing application configuration
14
*/
15
class SparkContext(config: SparkConf) extends ExecutorAllocationClient
16
17
// Core RDD creation methods
18
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
19
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
20
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
21
def emptyRDD[T: ClassTag]: RDD[T]
22
def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
23
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
24
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]
25
26
// File-based RDD creation
27
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
28
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
29
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
30
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
31
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
32
33
// Hadoop integration
34
def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]],
35
keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
36
def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]],
37
keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
38
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, fClass: Class[F],
39
kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
40
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,
41
fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]
42
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
43
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
44
def hadoopConfiguration: Configuration
45
46
// Shared variables
47
def broadcast[T: ClassTag](value: T): Broadcast[T]
48
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
49
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
50
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
51
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
52
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T](initialValue: R): Accumulable[R, T]
53
54
// Resource management
55
def stop(): Unit
56
def setCheckpointDir(directory: String): Unit
57
def getCheckpointDir: Option[String]
58
def requestExecutors(numAdditionalExecutors: Int): Boolean
59
def killExecutors(executorIds: Seq[String]): Boolean
60
def killExecutor(executorId: String): Boolean
61
62
// Job execution and management
63
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit
64
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U]
65
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U]
66
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
67
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
68
def submitJob[T, U, R](rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int],
69
resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R]
70
def runApproximateJob[T, U, R](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
71
evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R]
72
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
73
def setJobDescription(value: String): Unit
74
def clearJobGroup(): Unit
75
def cancelJobGroup(groupId: String): Unit
76
def cancelAllJobs(): Unit
77
78
// Files, JARs, and thread-local job properties
79
def addFile(path: String): Unit
80
def addFile(path: String, recursive: Boolean): Unit
81
def addJar(path: String): Unit
82
def setLocalProperty(key: String, value: String): Unit
83
def getLocalProperty(key: String): String
84
85
// Status and monitoring
86
def statusTracker: SparkStatusTracker
87
def getExecutorMemoryStatus: Map[String, (Long, Long)]
88
def getExecutorStorageStatus: Array[StorageStatus]
89
def getRDDStorageInfo: Array[RDDInfo]
90
def getPersistentRDDs: Map[Int, RDD[_]]
91
def getAllPools: Seq[Schedulable]
92
def getPoolForName(pool: String): Option[Schedulable]
93
def getSchedulingMode: SchedulingMode.SchedulingMode
94
def metricsSystem: MetricsSystem
95
def addSparkListener(listener: SparkListener): Unit
96
97
// Configuration and context properties
98
def getConf: SparkConf
99
def setLogLevel(logLevel: String): Unit
100
def master: String
101
def appName: String
102
def applicationId: String
103
def applicationAttemptId: Option[String]
104
def version: String
105
def startTime: Long
106
def isLocal: Boolean
107
def isStopped: Boolean
108
def defaultParallelism: Int
109
def defaultMinPartitions: Int
110
111
// Call site management
112
def setCallSite(shortCallSite: String): Unit
113
def clearCallSite(): Unit
114
```
115
116
### SparkContext Companion Object
117
118
Static methods for SparkContext management and utility functions.
119
120
```scala { .api }
121
object SparkContext {
122
/**
123
* Get existing SparkContext or create a new one with provided configuration
124
*/
125
def getOrCreate(config: SparkConf): SparkContext
126
def getOrCreate(): SparkContext
127
128
/**
129
* Find JAR containing the given class
130
*/
131
def jarOfClass(cls: Class[_]): Option[String]
132
def jarOfObject(obj: AnyRef): Option[String]
133
}
134
```
135
136
**Usage Examples:**
137
138
```scala
139
import org.apache.spark.{SparkContext, SparkConf}
140
141
// Basic SparkContext creation
val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
val sc = new SparkContext(conf)

// Alternative constructor taking the master URL and application name directly.
// Only one SparkContext may be active per JVM, so use it instead of the
// SparkConf-based constructor above, not alongside it:
//   val sc = new SparkContext("local[*]", "MyApp")

// Create RDDs from collections
val data = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
val rdd2 = sc.parallelize(data, numSlices = 4) // specify partitions

// Create RDDs from files
// (`hdfs:///...` resolves against the default NameNode; in `hdfs://host/...`
// the first segment after `//` is parsed as the NameNode host)
val textRDD = sc.textFile("hdfs:///path/to/file.txt")
val wholeFiles = sc.wholeTextFiles("hdfs:///path/to/directory")

// Resource management
sc.setCheckpointDir("hdfs:///path/to/checkpoint")
sc.addFile("path/to/config.properties")
sc.addJar("path/to/dependency.jar")

// Job management
sc.setJobGroup("data-processing", "Processing user data", interruptOnCancel = true)
// ... run some jobs
sc.clearJobGroup()

// Clean shutdown
sc.stop()
169
```
170
171
### SparkConf
172
173
Configuration object for Spark applications containing key-value settings that control Spark's behavior.
174
175
```scala { .api }
176
/**
177
* Configuration for a Spark application
178
*/
179
class SparkConf(loadDefaults: Boolean = true)
180
181
// Configuration setting methods
182
def set(key: String, value: String): SparkConf
183
def setMaster(master: String): SparkConf
184
def setAppName(name: String): SparkConf
185
def setJars(jars: Seq[String]): SparkConf
186
def setJars(jars: Array[String]): SparkConf
187
def setExecutorEnv(variable: String, value: String): SparkConf
188
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
189
def setExecutorEnv(variables: Array[(String, String)]): SparkConf
190
191
// Configuration retrieval methods
192
def get(key: String): String
193
def get(key: String, defaultValue: String): String
194
def getOption(key: String): Option[String]
195
def getInt(key: String, defaultValue: Int): Int
196
def getLong(key: String, defaultValue: Long): Long
197
def getDouble(key: String, defaultValue: Double): Double
198
def getBoolean(key: String, defaultValue: Boolean): Boolean
199
def getSizeAsBytes(key: String, defaultValue: String): Long
200
def getSizeAsKb(key: String, defaultValue: String): Long
201
def getSizeAsMb(key: String, defaultValue: String): Long
202
def getSizeAsGb(key: String, defaultValue: String): Long
203
def getTimeAsMs(key: String, defaultValue: String): Long
204
def getTimeAsSeconds(key: String, defaultValue: String): Long
205
206
// Configuration introspection
207
def contains(key: String): Boolean
208
def getAll: Array[(String, String)]
209
def remove(key: String): SparkConf
210
def clone(): SparkConf
211
212
// Validation (private[spark]: run by SparkContext at construction, not callable from user code)
213
def validateSettings(): SparkConf
214
```
215
216
**Usage Examples:**
217
218
```scala
219
import org.apache.spark.SparkConf
220
221
// Basic configuration
val conf = new SparkConf()
  .setAppName("Data Processing App")
  .setMaster("spark://master:7077")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")

// Advanced configuration
val advancedConf = new SparkConf()
  .setAppName("Advanced App")
  .setMaster("yarn")
  .setJars(Array("path/to/app.jar", "path/to/dependency.jar"))
  .setExecutorEnv("PYTHON_PATH", "/opt/python")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Reading configuration values
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
val executorCores = conf.getInt("spark.executor.cores", 2)
// Read from advancedConf: it is the only configuration above that sets this key
val isAdaptiveEnabled = advancedConf.getBoolean("spark.sql.adaptive.enabled", false)

// Configuration introspection
// (validateSettings() is private[spark]; SparkContext runs it when it is constructed)
val hasMaster = conf.contains("spark.master")
val allSettings = conf.getAll
println(s"All settings: ${allSettings.mkString(", ")}")
248
```
249
250
### SparkFiles
251
252
Utility object for resolving paths to files added through SparkContext.addFile().
253
254
```scala { .api }
255
/**
256
* Resolves paths to files added through SparkContext.addFile()
257
*/
258
object SparkFiles {
259
/**
260
* Get the absolute path of a file added through SparkContext.addFile()
261
* @param filename name of the file
262
* @return absolute path to the file
263
*/
264
def get(filename: String): String
265
266
/**
267
* Get the root directory that contains files added through SparkContext.addFile()
268
* @return path to the root directory
269
*/
270
def getRootDirectory(): String
271
}
272
```
273
274
### SparkStatusTracker
275
276
Low-level status reporting APIs for monitoring job and stage progress.
277
278
```scala { .api }
279
/**
280
* Low-level status reporting APIs for monitoring job and stage progress
281
*/
282
class SparkStatusTracker(sc: SparkContext) {
283
/**
284
* Return a list of all known jobs in a particular job group
285
*/
286
def getJobIdsForGroup(jobGroup: String): Array[Int]
287
288
/**
289
* Returns an array containing the IDs of all active jobs
290
*/
291
def getActiveJobIds(): Array[Int]
292
293
/**
294
* Returns an array containing the IDs of all active stages
295
*/
296
def getActiveStageIds(): Array[Int]
297
298
/**
299
* Returns stage information, or None if the stage info is not available
300
*/
301
def getStageInfo(stageId: Int): Option[SparkStageInfo]
302
303
/**
304
* Returns information of all executors known to this SparkContext
305
*/
306
def getExecutorInfos(): Array[SparkExecutorInfo]
307
}
308
```
309
310
## Error Handling
311
312
Common exceptions that may be thrown:
313
314
- **SparkException**: General Spark-related errors
315
- **IllegalArgumentException**: Invalid configuration parameters
316
- **IllegalStateException**: Attempting operations on stopped SparkContext
317
- **UnsupportedOperationException**: Unsupported operations in certain deployment modes
318
319
**Example Error Handling:**
320
321
```scala
322
import org.apache.spark.{SparkContext, SparkConf, SparkException}
323
324
// Hold the context outside `try` so that `finally` can still reach it.
var sc: Option[SparkContext] = None

try {
  val conf = new SparkConf().setAppName("MyApp")
  val context = new SparkContext(conf)
  sc = Some(context)

  // Your Spark operations here, using `context`

} catch {
  case e: SparkException =>
    println(s"Spark error: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
} finally {
  // Still empty if the constructor itself threw, so there is nothing to stop.
  sc.filter(ctx => !ctx.isStopped).foreach(_.stop())
}
342
```