# Context Management

Core functionality for creating and managing Spark applications, including cluster connections, resource allocation, and application lifecycle management.
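Before the per-class reference below, here is a minimal, self-contained application sketch assembled from the same APIs. The application name, master URL, and input path are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // Illustrative configuration; in a real deployment the master URL is usually
    // supplied by spark-submit rather than hard-coded.
    val conf = new SparkConf().setAppName("WordCountApp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical input path
      val counts = sc.textFile("data/input.txt")
        .flatMap(_.split("\\s+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.take(10).foreach(println)
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
```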
## Capabilities

### SparkContext

The main entry point for Spark functionality. It represents the connection to a Spark cluster and coordinates all Spark operations.

```scala { .api }
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 * Only one SparkContext may be active per JVM.
 *
 * @param config a Spark Config object describing the application configuration
 */
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  /** Create an RDD from a local Scala collection */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

  /** Read a text file from HDFS, local file system, or any Hadoop-supported file system URI */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

  /** Read a directory of text files from HDFS, local file system, or any Hadoop-supported file system URI */
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

  /** Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS */
  def sequenceFile[K, V](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

  /** Load an RDD saved as a SequenceFile containing serialized objects */
  def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

  /** Get an RDD for a Hadoop file using the new MapReduce API */
  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
      path: String,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
      conf: Configuration = hadoopConfiguration): RDD[(K, V)]

  /** Broadcast a read-only variable to all worker nodes */
  def broadcast[T: ClassTag](value: T): Broadcast[T]

  /** Create an accumulator variable of a given type */
  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

  /** Create an accumulator variable with a name for display in the Spark UI */
  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]

  /** Add a file to be downloaded with this Spark job on every node */
  def addFile(path: String): Unit

  /** Add a JAR dependency for all tasks to be executed on this SparkContext */
  def addJar(path: String): Unit

  /** Control our logLevel. This overrides any user-defined log settings */
  def setLogLevel(logLevel: String): Unit

  /** Assign a group ID to all the jobs started by this thread until the group ID is set to a different value */
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

  /** Clear the current thread's job group ID and its description */
  def clearJobGroup(): Unit

  /** Set a local property that affects jobs submitted from this thread */
  def setLocalProperty(key: String, value: String): Unit

  /** Get a local property set in this thread, or null if it is missing */
  def getLocalProperty(key: String): String

  /** Set the directory under which RDDs are going to be checkpointed */
  def setCheckpointDir(directory: String): Unit

  /** Return the directory where checkpoints are stored */
  def getCheckpointDir: Option[String]

  /** Shut down the SparkContext */
  def stop(): Unit

  /** Default level of parallelism to use when not given by user */
  def defaultParallelism: Int

  /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int

  /** The version of Spark on which this application is running */
  def version: String

  /** Return a copy of this SparkContext's configuration */
  def getConf: SparkConf

  /** The application name */
  def appName: String

  /** A unique identifier for the Spark application */
  def applicationId: String

  /** Return the URL of the SparkUI instance started by this SparkContext */
  def uiWebUrl: Option[String]

  /** Return a low-level status reporting API for monitoring job and stage progress */
  def statusTracker: SparkStatusTracker
}
```
**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}

// Basic context creation
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)

// Create RDDs from various sources
val numberRDD = sc.parallelize(1 to 1000, 4) // 4 partitions
val textRDD = sc.textFile("hdfs://path/to/file.txt")
val wholeFilesRDD = sc.wholeTextFiles("hdfs://path/to/directory")

// Configure job properties for this thread
sc.setJobGroup("data-processing", "Processing user data")
sc.setLocalProperty("spark.scheduler.pool", "production")

// Add dependencies
sc.addFile("s3://bucket/config.properties")
sc.addJar("hdfs://path/to/dependency.jar")

// Always stop the context when the application is done
sc.stop()
```
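The `broadcast` and `accumulator` helpers listed in the API above are not covered by this example, so here is a small sketch of how they are typically combined. The lookup table, sample codes, and accumulator name are illustrative.

```scala
// Share a small read-only lookup table with every task via a broadcast variable,
// and count unknown records with a named accumulator (visible in the Spark UI).
val countryNames = Map("US" -> "United States", "DE" -> "Germany") // illustrative data
val countries = sc.broadcast(countryNames)
val unknownCodes = sc.accumulator(0, "Unknown country codes")

val resolved = sc.parallelize(Seq("US", "DE", "XX")).map { code =>
  countries.value.get(code) match {
    case Some(name) => name
    case None =>
      unknownCodes += 1 // note: updates made in transformations may be re-applied on task retries
      "unknown"
  }
}
resolved.collect().foreach(println)
println(s"Unknown codes seen: ${unknownCodes.value}")
```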
### SparkConf

Configuration object for a Spark application, used to set various Spark parameters as key-value pairs.

```scala { .api }
/**
 * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
 * Most of the time, you would create a SparkConf object with new SparkConf(), which will load
 * values from any spark.* Java system properties set in your application as well.
 *
 * @param loadDefaults whether to also load values from Java system properties
 */
class SparkConf(loadDefaults: Boolean = true) extends Cloneable with Logging {

  /** Set a configuration variable */
  def set(key: String, value: String): SparkConf

  /** Set multiple parameters together */
  def setAll(settings: Traversable[(String, String)]): SparkConf

  /** Set the master URL to connect to */
  def setMaster(master: String): SparkConf

  /** Set the application name */
  def setAppName(name: String): SparkConf

  /** Set JAR files to distribute to the cluster */
  def setJars(jars: Seq[String]): SparkConf

  /** Set Spark's home directory on worker nodes */
  def setSparkHome(home: String): SparkConf

  /** Set an environment variable to be used when launching executors */
  def setExecutorEnv(variable: String, value: String): SparkConf

  /** Set multiple environment variables for executors */
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

  /** Get a parameter; throws a NoSuchElementException if it's not set */
  def get(key: String): String

  /** Get a parameter, falling back to a default if not set */
  def get(key: String, defaultValue: String): String

  /** Get a parameter as an Option */
  def getOption(key: String): Option[String]

  /** Get all parameters as a list of (key, value) pairs */
  def getAll: Array[(String, String)]

  /** Remove a parameter from the configuration */
  def remove(key: String): SparkConf

  /** Does the configuration contain a given parameter? */
  def contains(key: String): Boolean

  /** Clone this SparkConf */
  override def clone: SparkConf
}
```
**Usage Examples:**

```scala
import org.apache.spark.SparkConf

// Basic configuration
val conf = new SparkConf()
  .setAppName("Data Processing Job")
  .setMaster("spark://cluster:7077")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")

// Advanced configuration
val advancedConf = new SparkConf()
  .setAppName("ML Pipeline")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")

// Configuration with JAR dependencies
val confWithJars = new SparkConf()
  .setAppName("Spark with Dependencies")
  .setJars(List("s3://bucket/lib1.jar", "hdfs://path/lib2.jar"))
  .setSparkHome("/opt/spark")

// Reading configuration values
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g") // with default

// Check if a configuration entry exists
if (conf.contains("spark.sql.adaptive.enabled")) {
  println("Adaptive query execution is configured")
}

// Clone configuration for modifications
val devConf = conf.clone().setMaster("local[*]")
```
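Because `loadDefaults` defaults to `true`, a freshly constructed `SparkConf` also picks up any `spark.*` JVM system properties. The sketch below illustrates that behaviour; the property value is illustrative, and in practice such properties usually come from `spark-submit` rather than application code.

```scala
import org.apache.spark.SparkConf

// Illustrative: spark.* system properties are normally set by spark-submit or -D flags.
System.setProperty("spark.executor.memory", "2g")

val conf = new SparkConf()                           // loadDefaults = true picks up spark.* properties
println(conf.getOption("spark.executor.memory"))     // Some(2g)

val isolated = new SparkConf(loadDefaults = false)   // ignores system properties
println(isolated.contains("spark.executor.memory"))  // false
```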
### SparkStatusTracker

Interface for monitoring the status of Spark jobs and stages.

```scala { .api }
/**
 * Low-level status reporting APIs for monitoring job and stage progress.
 */
class SparkStatusTracker private[spark] (sc: SparkContext) {

  /** Return a list of all known job IDs in a particular job group */
  def getJobIdsForGroup(jobGroup: String): Array[Int]

  /** Get information about a specific job */
  def getJobInfo(jobId: Int): Option[SparkJobInfo]

  /** Get information about a specific stage */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]

  /** Get the IDs of all active stages */
  def getActiveStageIds(): Array[Int]

  /** Get the IDs of all active jobs */
  def getActiveJobIds(): Array[Int]

  /** Get executor information for all executors */
  def getExecutorInfos(): Array[SparkExecutorInfo]
}

/** Information about a Spark job */
class SparkJobInfo(
    val jobId: Int,
    val stageIds: Array[Int],
    val status: JobExecutionStatus)

/** Information about a Spark stage */
class SparkStageInfo(
    val stageId: Int,
    val currentAttemptId: Int,
    val name: String,
    val numTasks: Int,
    val numActiveTasks: Int,
    val numCompleteTasks: Int,
    val numFailedTasks: Int)
```
**Usage Examples:**

```scala
val sc = new SparkContext(conf)
val statusTracker = sc.statusTracker

// Monitor active jobs
statusTracker.getActiveJobIds().foreach { jobId =>
  statusTracker.getJobInfo(jobId).foreach { job =>
    println(s"Job ${job.jobId}: ${job.status}")
  }
}

// Monitor active stages
statusTracker.getActiveStageIds().foreach { stageId =>
  statusTracker.getStageInfo(stageId).foreach { stage =>
    val progress = stage.numCompleteTasks.toDouble / stage.numTasks
    println(s"Stage ${stage.stageId}: ${(progress * 100).toInt}% complete")
  }
}

// Monitor executors
statusTracker.getExecutorInfos().foreach { executor =>
  println(s"Executor ${executor.host()}:${executor.port()}: ${executor.numRunningTasks()} running tasks")
}
```
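`getJobIdsForGroup` pairs naturally with `SparkContext.setJobGroup`. The sketch below polls a named job group while the work runs on another thread; the group name, workload, and polling interval are illustrative.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Submit the work from another thread so this thread is free to poll progress.
val work = Future {
  // setJobGroup is per-thread, so it must be called on the thread that submits the jobs.
  sc.setJobGroup("nightly-etl", "Nightly aggregation", interruptOnCancel = true)
  sc.parallelize(1 to 1000000, 100).map(_ * 2).count()
}

while (!work.isCompleted) {
  for {
    jobId <- sc.statusTracker.getJobIdsForGroup("nightly-etl")
    info  <- sc.statusTracker.getJobInfo(jobId)
  } println(s"Job ${info.jobId}: ${info.status}")
  Thread.sleep(1000)
}
```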
## Configuration Properties

Key configuration properties for SparkConf:

### Application Properties
- `spark.app.name` - Application name displayed in UI and logs
- `spark.master` - Cluster manager to connect to (local, yarn, spark://host:port)
- `spark.submit.deployMode` - Deploy mode: client or cluster

### Runtime Environment
- `spark.driver.memory` - Amount of memory for driver process (e.g., "1g", "2048m")
- `spark.driver.cores` - Number of cores for driver in cluster mode
- `spark.executor.memory` - Amount of memory per executor process
- `spark.executor.cores` - Number of cores per executor
- `spark.executor.instances` - Number of executor instances (static allocation)
### Dynamic Allocation
- `spark.dynamicAllocation.enabled` - Enable dynamic allocation of executors
- `spark.dynamicAllocation.minExecutors` - Minimum number of executors
- `spark.dynamicAllocation.maxExecutors` - Maximum number of executors
- `spark.dynamicAllocation.initialExecutors` - Initial number of executors
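A short sketch of a configuration assembled from the properties above; the values are illustrative. Note that dynamic allocation normally also requires the external shuffle service (or shuffle tracking on newer Spark versions).

```scala
import org.apache.spark.SparkConf

val elasticConf = new SparkConf()
  .setAppName("Elastic Job")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  // Usually required so shuffle data survives executor removal.
  .set("spark.shuffle.service.enabled", "true")
```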
### Memory and Network
- `spark.driver.maxResultSize` - Maximum size of results that can be collected to the driver
- `spark.executor.heartbeatInterval` - Interval between executor heartbeats to the driver
- `spark.network.timeout` - Default timeout for all network interactions
### Spark UI
- `spark.ui.enabled` - Whether to run the web UI (default: true)
- `spark.ui.port` - Port for the web UI (default: 4040)
- `spark.eventLog.enabled` - Whether to log events for later replay (e.g., by the History Server)
- `spark.eventLog.dir` - Directory for event logs
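A brief sketch combining the UI and event-log properties above. The event-log directory is illustrative and must exist before the application starts.

```scala
import org.apache.spark.SparkConf

val instrumentedConf = new SparkConf()
  .setAppName("Instrumented Job")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events") // illustrative path
  .set("spark.ui.port", "4041")                      // move off the default 4040 if it is taken
```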
## Error Handling

Common exceptions thrown by SparkContext operations:

- `SparkException` - General Spark-related errors
- `IllegalStateException` - Thrown when using a stopped SparkContext
- `IllegalArgumentException` - Invalid configuration parameters
- `FileNotFoundException` - When input files don't exist
- `OutOfMemoryError` - Insufficient memory for operations
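A defensive sketch of how these errors typically surface and how to ensure the context is always released; the input path and recovery behaviour are illustrative, not prescriptive.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

val sc = new SparkContext(new SparkConf().setAppName("Defensive App").setMaster("local[*]"))
try {
  val total = sc.textFile("data/input.txt").count() // fails at action time if the path is missing
  println(s"Records: $total")
} catch {
  case e: SparkException =>
    // Job-level failures: repeated task failures, serialization problems, etc.
    System.err.println(s"Spark job failed: ${e.getMessage}")
  case e: IllegalStateException =>
    // Typically raised when an operation is attempted on a stopped SparkContext.
    System.err.println(s"Context no longer usable: ${e.getMessage}")
} finally {
  sc.stop() // always release cluster resources
}
```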