# Core SparkContext and Configuration

The SparkContext and SparkConf classes provide the foundation for all Spark applications, handling cluster connections, resource management, and application configuration.

## SparkContext

The main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster.

### Constructors
```scala { .api }
class SparkContext(config: SparkConf)
class SparkContext() // Load settings from system properties
class SparkContext(master: String, appName: String, conf: SparkConf)
class SparkContext(
  master: String,
  appName: String,
  sparkHome: String,
  jars: Seq[String],
  environment: Map[String, String]
)
```
### Configuration & Context Management

```scala { .api }
class SparkContext {
  def getConf: SparkConf
  def master: String
  def appName: String
  def applicationId: String
  def version: String
  def isLocal: Boolean
  def isStopped: Boolean
  def stop(): Unit
  def defaultParallelism: Int
}
```
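For example, a quick sanity check of the running context (assuming an active `sc`; the printed values depend on your deployment):

```scala
// Inspect the running context's identity and sizing
println(s"App ${sc.appName} (${sc.applicationId}) on ${sc.master}")
println(s"Spark ${sc.version}, local=${sc.isLocal}, default parallelism=${sc.defaultParallelism}")
```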
### RDD Creation Methods

```scala { .api }
class SparkContext {
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] // With locality preferences
  def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
  def emptyRDD[T: ClassTag]: RDD[T]
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
}
```
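A minimal sketch of these creation methods, assuming an active `sc`:

```scala
val nums  = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2) // explicit partition count
val longs = sc.range(0L, 1000L, step = 10L)                // 0, 10, 20, ...
val empty = sc.emptyRDD[String]                            // placeholder with no partitions
val all   = sc.union(Seq(nums, sc.makeRDD(Seq(5, 6))))     // concatenate several RDDs
```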
### File I/O Methods

```scala { .api }
class SparkContext {
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
  def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
  def sequenceFile[K, V](
    path: String,
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]
}
```
### Hadoop Integration Methods

```scala { .api }
class SparkContext {
  def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]

  def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]

  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration
  ): RDD[(K, V)]

  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]
  ): RDD[(K, V)]
}
```
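As a sketch, reading plain text through the new Hadoop API explicitly (equivalent to `sc.textFile`; the path is illustrative). Mapping the `Text` values to `String` immediately is the safe pattern, since Hadoop may reuse Writable objects across records:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val lines = sc.newAPIHadoopFile(
    "hdfs://path/to/input",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text]
  ).map { case (_, text) => text.toString } // copy out of the reused Writable
```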
### Job Control

```scala { .api }
class SparkContext {
  def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit
  ): Unit

  def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]
  ): Array[U]

  def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]
  ): Array[U]

  def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U
  ): Array[U]

  def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long
  ): PartialResult[R]

  def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R
  ): SimpleFutureAction[R]

  def cancelJobGroup(groupId: String): Unit
  def cancelAllJobs(): Unit
  def cancelJob(jobId: Int): Unit
  def cancelStage(stageId: Int): Unit
}
```
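A small sketch of `runJob` (assuming an active `sc`): it runs a function over each partition and returns the per-partition results to the driver, which is how actions like `collect()` are built:

```scala
val data = sc.parallelize(1 to 100, numSlices = 4)

// Fetch the first element of each partition without a full collect()
val heads: Array[Option[Int]] =
  sc.runJob(data, (it: Iterator[Int]) => if (it.hasNext) Some(it.next()) else None)
```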
### Thread-Local Properties & Job Management

```scala { .api }
class SparkContext {
  def setLocalProperty(key: String, value: String): Unit
  def getLocalProperty(key: String): String
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def setJobDescription(value: String): Unit
}
```
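These properties are scoped to the calling thread. For example, routing one thread's jobs to a fair-scheduler pool (the pool name is illustrative):

```scala
// Applies only to jobs submitted from this thread
sc.setLocalProperty("spark.scheduler.pool", "reporting")
val total = sc.parallelize(1 to 10).sum()
sc.setLocalProperty("spark.scheduler.pool", null) // reset to the default pool
```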
### Storage & Persistence Information

```scala { .api }
class SparkContext {
  def getPersistentRDDs: Map[Int, RDD[_]]
  def getRDDStorageInfo: Array[RDDInfo]
  def getExecutorMemoryStatus: Map[String, (Long, Long)]
}
```
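For example, a quick survey of what is currently cached (assuming an active `sc`):

```scala
val cached = sc.parallelize(1 to 1000).cache()
cached.count() // an action materializes the cache

sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id persisted at ${rdd.getStorageLevel.description}")
}
```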
### Files & JARs

```scala { .api }
class SparkContext {
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def listFiles(): Seq[String]
  def addJar(path: String): Unit
  def listJars(): Seq[String]
}
```
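Files distributed with `addFile` are fetched to every node and located through `SparkFiles`; a minimal sketch with an illustrative path:

```scala
import org.apache.spark.SparkFiles

sc.addFile("hdfs://path/to/lookup.txt")

// On each executor, resolve the local path to the fetched copy
val localPath = sc.parallelize(1 to 3)
  .map(_ => SparkFiles.get("lookup.txt"))
  .first()
```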
### Checkpointing

```scala { .api }
class SparkContext {
  def setCheckpointDir(directory: String): Unit
  def getCheckpointDir: Option[String]
}
```
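The directory must be set before calling `checkpoint()` on an RDD; a minimal sketch (the directory is illustrative):

```scala
sc.setCheckpointDir("hdfs://path/to/checkpoints")

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint() // truncates the lineage once materialized
rdd.count()      // the first action triggers the checkpoint write
```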
### Event Listeners

```scala { .api }
class SparkContext {
  def addSparkListener(listener: SparkListenerInterface): Unit
  def removeSparkListener(listener: SparkListenerInterface): Unit
}
```
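A minimal listener sketch; the `SparkListener` base class provides no-op defaults, so only the callbacks of interest need overriding:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
})
```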
### Utility Methods

```scala { .api }
class SparkContext {
  def setLogLevel(logLevel: String): Unit
}
```
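For example, quieting log output during interactive work:

```scala
sc.setLogLevel("WARN") // accepts levels such as ALL, DEBUG, INFO, WARN, ERROR, OFF
```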
## SparkConf

Configuration for a Spark application, used to set Spark parameters as key-value pairs. Setter methods return the SparkConf itself, so calls can be chained.

### Constructors

```scala { .api }
class SparkConf(loadDefaults: Boolean = true) // loadDefaults = true also reads spark.* system properties
```
### Configuration Setting

```scala { .api }
class SparkConf {
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setJars(jars: Array[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf
  def setSparkHome(home: String): SparkConf
  def setAll(settings: Traversable[(String, String)]): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}
```
### Configuration Retrieval

```scala { .api }
class SparkConf {
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def getAllWithPrefix(prefix: String): Array[(String, String)]
  def getInt(key: String, defaultValue: Int): Int
  def getLong(key: String, defaultValue: Long): Long
  def getDouble(key: String, defaultValue: Double): Double
  def getBoolean(key: String, defaultValue: Boolean): Boolean
  def getExecutorEnv: Seq[(String, String)]
  def getAppId: String
}
```
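A quick sketch of the typed getters (the key names are real Spark properties; the results depend on what was set):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.executor.cores", "4")

val cores   = conf.getInt("spark.executor.cores", 1)             // 4
val dynamic = conf.getBoolean("spark.dynamicAllocation.enabled", false)
val master  = conf.getOption("spark.master")                     // None unless set
```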
### Time & Size Configuration

```scala { .api }
class SparkConf {
  def getTimeAsSeconds(key: String): Long
  def getTimeAsSeconds(key: String, defaultValue: String): Long
  def getTimeAsMs(key: String): Long
  def getTimeAsMs(key: String, defaultValue: String): Long
  def getSizeAsBytes(key: String): Long
  def getSizeAsBytes(key: String, defaultValue: String): Long
  def getSizeAsBytes(key: String, defaultValue: Long): Long
  def getSizeAsKb(key: String): Long
  def getSizeAsMb(key: String): Long
  def getSizeAsGb(key: String): Long
}
```
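These getters parse human-readable suffixes such as `25ms`, `5s`, `512k`, and `1g`; a small sketch:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "120s")
  .set("spark.executor.memory", "2g")

conf.getTimeAsSeconds("spark.network.timeout") // 120
conf.getTimeAsMs("spark.network.timeout")      // 120000
conf.getSizeAsMb("spark.executor.memory")      // 2048
```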
### Kryo Serialization

```scala { .api }
class SparkConf {
  def registerKryoClasses(classes: Array[Class[_]]): SparkConf
  def registerAvroSchemas(schemas: Schema*): SparkConf
  def getAvroSchema: Map[Long, String]
}
```
### Configuration Management

```scala { .api }
class SparkConf {
  def remove(key: String): SparkConf
  def contains(key: String): Boolean
  def clone: SparkConf
  def toDebugString: String
}
```
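For example, dumping the effective configuration for debugging and mutating a copy rather than the original:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("Debug").setMaster("local[*]")

println(conf.toDebugString)                  // one key=value pair per line
val copy = conf.clone.remove("spark.master") // the original conf is untouched
```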
## Usage Examples

### Basic SparkContext Setup

```scala
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)

// Use SparkContext
val rdd = sc.parallelize(1 to 100)

// Always stop the context when finished
sc.stop()
```
### Advanced Configuration

```scala
val conf = new SparkConf()
  .setAppName("Advanced App")
  .setMaster("yarn")
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-oracle")
  .registerKryoClasses(Array(classOf[MyCustomClass])) // MyCustomClass stands in for your own type

val sc = new SparkContext(conf)
```
### File Input/Output

```scala
// Read text files
val textRDD = sc.textFile("hdfs://path/to/input")

// Read whole text files as (filename, content) pairs
val wholeFilesRDD = sc.wholeTextFiles("hdfs://path/to/directory")

// Read binary files as (filename, stream) pairs
val binaryRDD = sc.binaryFiles("hdfs://path/to/binary")

// Read a Hadoop SequenceFile; implicit WritableConverters map the
// underlying Text/IntWritable keys and values to String/Int
val sequenceRDD = sc.sequenceFile[String, Int]("hdfs://path/to/sequence")
```
### Job Management

```scala
// Set a job group for related jobs
sc.setJobGroup("group1", "Processing user data", interruptOnCancel = true)

// Set a job description
sc.setJobDescription("Computing user statistics")

// Execute jobs (rdd1 and rdd2 defined elsewhere)
val result1 = rdd1.collect()
val result2 = rdd2.count()

// Cancel all jobs in the group
sc.cancelJobGroup("group1")
```
## Configuration Properties

### Core Application Settings

- `spark.app.name` - Application name
- `spark.master` - Master URL (`local[*]`, `yarn`, `spark://host:port`)
- `spark.executor.memory` - Memory per executor (e.g., "1g", "2g")
- `spark.executor.cores` - CPU cores per executor
- `spark.executor.instances` - Number of executors (for YARN/Kubernetes)
- `spark.driver.memory` - Driver memory (e.g., "1g")
- `spark.driver.cores` - Driver CPU cores
### Performance Tuning

- `spark.default.parallelism` - Default number of partitions
- `spark.serializer` - Serializer class (KryoSerializer recommended)
- `spark.kryo.registrationRequired` - Require Kryo class registration
- `spark.rdd.compress` - Compress RDD partitions in memory
- `spark.broadcast.compress` - Compress broadcast variables
### Memory Management

- `spark.memory.fraction` - Fraction of heap space for execution and storage
- `spark.memory.storageFraction` - Fraction of that memory reserved for storage
- `spark.memory.offHeap.enabled` - Use off-heap memory
- `spark.memory.offHeap.size` - Off-heap memory size
## Important Notes

- Only one SparkContext can be active per JVM at a time
- Always call `stop()` on the current SparkContext before creating a new one
- SparkContext is not thread-safe for modifications
- Configuration changes cannot be made after SparkContext creation
- Use `setIfMissing()` to provide default values without overriding existing settings, as sketched below
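
A brief sketch of the `setIfMissing()` pattern, useful in library code that supplies defaults without clobbering user settings:

```scala
import org.apache.spark.SparkConf

// User-supplied conf, which may already carry a serializer choice
val conf = new SparkConf().setAppName("Defaults Demo")

// Library defaults: applied only when the user has not set the key
conf.setIfMissing("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.setIfMissing("spark.master", "local[*]")
```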