0
# Application Context
1
2
Application setup and cluster connection management through SparkContext and SparkConf for coordinating distributed Spark applications.
3
4
## Capabilities
5
6
### SparkContext
7
8
Main entry point for Spark functionality, representing the connection to a Spark cluster. Only one SparkContext should be active per JVM.
9
10
```scala { .api }
11
/**
12
* Main entry point for Spark functionality
13
* @param config Spark configuration object
14
*/
15
class SparkContext(config: SparkConf) {
16
/** Create RDD from Scala collection */
17
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
18
19
/** Read text files from HDFS, local filesystem, or any Hadoop-supported URI */
20
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
21
22
/** Create RDD from Hadoop InputFormat */
23
def hadoopRDD[K, V](
24
conf: JobConf,
25
inputFormatClass: Class[_ <: InputFormat[K, V]],
26
keyClass: Class[K],
27
valueClass: Class[V],
28
minPartitions: Int = defaultMinPartitions
29
): RDD[(K, V)]
30
31
/** Create RDD from new Hadoop API InputFormat */
32
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
33
conf: Configuration,
34
fClass: Class[F],
35
kClass: Class[K],
36
vClass: Class[V]
37
): RDD[(K, V)]
38
39
/** Create broadcast variable */
40
def broadcast[T: ClassTag](value: T): Broadcast[T]
41
42
/** Create named accumulators for aggregating values across tasks */
43
def longAccumulator(name: String): LongAccumulator
44
def doubleAccumulator(name: String): DoubleAccumulator
45
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
46
47
/** Add file to be downloaded with this Spark job */
48
def addFile(path: String, recursive: Boolean = false): Unit
49
50
/** Add JAR file to be distributed to cluster */
51
def addJar(path: String): Unit
52
53
/** Set checkpoint directory for RDD checkpointing */
54
def setCheckpointDir(directory: String): Unit
55
56
/** Stop SparkContext and release resources */
57
def stop(): Unit
58
59
/** Get current status tracker */
60
def statusTracker: SparkStatusTracker
61
62
/** Default parallelism level */
63
def defaultParallelism: Int
64
65
/** Spark version */
66
def version: String
67
}
68
```
69
70
**Usage Examples:**
71
72
```scala
73
import org.apache.spark.{SparkContext, SparkConf}
74
75
// Basic setup
76
val conf = new SparkConf()
77
.setAppName("MyApp")
78
.setMaster("local[2]")
79
val sc = new SparkContext(conf)
80
81
// Create RDD from collection
82
val numbers = sc.parallelize(1 to 100)
83
84
// Read text file
85
val lines = sc.textFile("hdfs:///data/input.txt")
86
87
// Add dependencies
88
sc.addFile("config.properties")
89
sc.addJar("lib/custom.jar")
90
91
// Cleanup
92
sc.stop()
93
```
94
95
### SparkConf
96
97
Configuration for Spark applications with key-value pairs for runtime settings.
98
99
```scala { .api }
100
/**
101
* Configuration for Spark applications
102
* @param loadDefaults whether to load default configurations
103
*/
104
class SparkConf(loadDefaults: Boolean = true) {
105
/** Set configuration property */
106
def set(key: String, value: String): SparkConf
107
108
/** Set application name */
109
def setAppName(name: String): SparkConf
110
111
/** Set master URL */
112
def setMaster(master: String): SparkConf
113
114
/** Set Spark home directory */
115
def setSparkHome(home: String): SparkConf
116
117
/** Set JAR files to distribute to cluster */
118
def setJars(jars: Seq[String]): SparkConf
119
/** Set environment variable to be used when launching executors */
def setExecutorEnv(variable: String, value: String): SparkConf
123
/** Get configuration value */
124
def get(key: String): String
125
def get(key: String, defaultValue: String): String
126
127
/** Get all configuration as key-value pairs */
128
def getAll: Array[(String, String)]
129
130
/** Check if configuration contains key */
131
def contains(key: String): Boolean
132
133
/** Remove configuration property */
134
def remove(key: String): SparkConf
135
136
/** Clone configuration */
137
def clone(): SparkConf
138
}
139
```
140
141
**Usage Examples:**
142
143
```scala
144
val conf = new SparkConf()
145
.setAppName("Data Processing")
146
.setMaster("yarn")
147
.set("spark.executor.memory", "2g")
148
.set("spark.executor.cores", "4")
149
.set("spark.sql.adaptive.enabled", "true")
150
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
151
152
// Advanced configuration
153
conf.set("spark.hadoop.fs.s3a.access.key", accessKey)
154
.set("spark.hadoop.fs.s3a.secret.key", secretKey)
155
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
156
```
157
158
### SparkStatusTracker
159
160
Low-level status API for monitoring Spark applications and job execution.
161
162
```scala { .api }
163
/**
164
* Low-level status tracking API
165
*/
166
class SparkStatusTracker {
/** Get IDs of all jobs in a specific job group */
def getJobIdsForGroup(jobGroup: String): Array[Int]
169
170
/** Get information for specific job */
171
def getJobInfo(jobId: Int): Option[SparkJobInfo]
172
173
/** Get information for specific stage */
174
def getStageInfo(stageId: Int): Option[SparkStageInfo]
175
176
/** Get active job IDs */
177
def getActiveJobIds(): Array[Int]
178
179
/** Get active stage IDs */
180
def getActiveStageIds(): Array[Int]
181
182
/** Get executor information */
183
def getExecutorInfos(): Array[SparkExecutorInfo]
184
}
185
186
case class SparkJobInfo(
187
jobId: Int,
188
stageIds: Array[Int],
189
status: JobExecutionStatus
190
)
191
192
case class SparkStageInfo(
193
stageId: Int,
194
currentAttemptId: Int,
195
name: String,
196
numTasks: Int,
197
numActiveTasks: Int,
198
numCompleteTasks: Int,
199
numFailedTasks: Int
200
)
201
```
202
203
### SparkFiles
204
205
Utility for resolving paths to files added through SparkContext.addFile().
206
207
```scala { .api }
208
/**
209
* Resolves paths to files added through SparkContext.addFile()
210
*/
211
object SparkFiles {
212
/** Get local path to file added with SparkContext.addFile() */
213
def get(filename: String): String
214
215
/** Get root directory containing files added with SparkContext.addFile() */
216
def getRootDirectory(): String
217
}
218
```
219
220
## Exception Handling
221
222
```scala { .api }
223
class SparkException(message: String, cause: Throwable = null)
224
extends Exception(message, cause)
225
226
class TaskNotSerializableException(taskName: String, cause: NotSerializableException)
227
extends SparkException(s"Task not serializable: $taskName", cause)
228
229
class SparkFileAlreadyExistsException(outputPath: String)
230
extends SparkException(s"Output path $outputPath already exists")
231
```
232
233
Common exceptions include serialization errors when tasks contain non-serializable closures, and file system errors when output paths already exist.