# SparkContext and Configuration

SparkContext serves as the main entry point to Spark functionality, coordinating with cluster managers and providing methods to create RDDs, manage resources, and control the application lifecycle. SparkConf manages application configuration and cluster settings.

## Capabilities

### SparkContext Creation and Lifecycle

SparkContext initialization and termination for application lifecycle management.

```scala { .api }
class SparkContext(config: SparkConf) {
  def this() = this(new SparkConf())
}

object SparkContext {
  def getOrCreate(): SparkContext
  def getOrCreate(conf: SparkConf): SparkContext
}

// Lifecycle methods
def stop(): Unit
def version: String
def applicationId: String
def applicationAttemptId: Option[String]
```

**Usage Example:**
```scala
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
try {
  // Application logic
} finally {
  sc.stop()
}
```

### RDD Creation

Methods to create RDDs from various data sources.

```scala { .api }
// From collections
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
def emptyRDD[T: ClassTag]: RDD[T]

// From files
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

// From Hadoop
def hadoopRDD[K, V](conf: JobConf, inputFormat: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K], valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration,
    fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

**Usage Example:**
```scala
// From a collection
val data = sc.parallelize(1 to 100, 4)

// Numeric range RDD
val numbers = sc.range(1, 1000000, step = 2, numSlices = 8)

// Empty RDD
val empty = sc.emptyRDD[String]

// From a text file
val lines = sc.textFile("hdfs://path/to/file.txt")

// Multiple files paired with their names
val files = sc.wholeTextFiles("hdfs://path/to/directory")

// Binary files as PortableDataStream
val binaries = sc.binaryFiles("hdfs://path/to/images/*.jpg")
```

### Shared Variables

Create broadcast variables and accumulators for efficient cluster-wide data sharing.

```scala { .api }
// Broadcast variables
def broadcast[T: ClassTag](value: T): Broadcast[T]

// Accumulators
def longAccumulator(): LongAccumulator
def longAccumulator(name: String): LongAccumulator
def doubleAccumulator(): DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator
def collectionAccumulator[T](): CollectionAccumulator[T]
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
```

**Usage Example:**
```scala
// Broadcast variable
val lookupTable = Map("key1" -> "value1", "key2" -> "value2")
val broadcastVar = sc.broadcast(lookupTable)

// Long accumulator
val errorCount = sc.longAccumulator("Errors")
```

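Broadcast values are read on executors via `.value`, and accumulators are updated inside actions. A minimal sketch continuing the example above; the `records` RDD and its `key`/`isValid` fields are hypothetical:

```scala
// Look up broadcast data inside a transformation without
// shipping the map with every task closure
val enriched = records.map(r => broadcastVar.value.getOrElse(r.key, "unknown"))

// Update the accumulator as a side effect of an action,
// then read the total back on the driver
records.foreach { r =>
  if (!r.isValid) errorCount.add(1)
}
println(s"Errors seen: ${errorCount.value}")
```

Note that accumulator updates are only guaranteed to be applied exactly once when performed inside actions; updates made inside transformations may be re-applied if a task is re-executed.
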
### Configuration Management

Access and modify Spark configuration at runtime.

```scala { .api }
def getConf: SparkConf
def setLocalProperty(key: String, value: String): Unit
def getLocalProperty(key: String): String
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
def clearJobGroup(): Unit
```

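These methods are typically used to tag work submitted from the current thread so it can be identified, scheduled, or cancelled later. A minimal sketch, assuming an existing `sc`:

```scala
// Local properties are inherited by jobs submitted from this thread,
// e.g. selecting a fair-scheduler pool
sc.setLocalProperty("spark.scheduler.pool", "etl")

// Group subsequent jobs so they can be cancelled together
sc.setJobGroup("nightly-etl", "Nightly ETL run", interruptOnCancel = true)
// ... run actions ...
sc.clearJobGroup()

// Inspect the effective configuration
val execMem = sc.getConf.get("spark.executor.memory", "1g")
```
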
### Cluster Resource Management

Control cluster resources and executor allocation.

```scala { .api }
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutor(executorId: String): Boolean
def getExecutorIds: Seq[String]
```

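These calls only take effect on cluster managers that support dynamic executor allocation (for example YARN or Kubernetes); on unsupported backends such as local mode they log a warning and return `false`. A minimal sketch, assuming an existing `sc`:

```scala
// Ask for two more executors beyond the current target
val granted = sc.requestExecutors(2)

// Inspect which executors are currently registered
val ids = sc.getExecutorIds
println(s"Request acknowledged: $granted, executors: ${ids.mkString(", ")}")

// Release a specific executor back to the cluster
ids.headOption.foreach(id => sc.killExecutor(id))
```
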
### Job Control

Monitor and control running jobs.

```scala { .api }
def cancelJob(jobId: Int): Unit
def cancelJobGroup(groupId: String): Unit
def cancelJobsWithTag(tag: String): Unit
def cancelAllJobs(): Unit
def setJobDescription(value: String): Unit
def statusTracker: SparkStatusTracker
```

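Job groups pair naturally with cancellation, and `statusTracker` exposes the job IDs that `cancelJob` expects. A minimal sketch, assuming an existing `sc`; the group name is hypothetical:

```scala
// Label work so it is recognizable in the Spark UI
sc.setJobDescription("Ad-hoc aggregation")

// Cancel every active job in a previously set job group
sc.cancelJobGroup("nightly-etl")

// Or cancel individual active jobs via the status tracker
sc.statusTracker.getActiveJobIds().foreach(jobId => sc.cancelJob(jobId))
```
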
## SparkConf Configuration

### Basic Configuration

Core application and cluster configuration methods.

```scala { .api }
class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}
```

**Usage Example:**
```scala
val conf = new SparkConf()
  .setAppName("Data Processing Job")
  .setMaster("yarn")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.sql.adaptive.enabled", "true")
```

### Configuration Access

Retrieve configuration values and metadata.

```scala { .api }
def get(key: String): String
def get(key: String, defaultValue: String): String
def getOption(key: String): Option[String]
def getAll: Array[(String, String)]
def contains(key: String): Boolean
def getAppId: String
```

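A brief sketch of reading values back; `get` without a default throws `NoSuchElementException` for unset keys, so prefer the defaulted or `Option` variants when a key may be absent:

```scala
val conf = new SparkConf().set("spark.executor.memory", "4g")

// Throwing, defaulted, and optional access
val mem    = conf.get("spark.executor.memory")      // "4g"
val cores  = conf.get("spark.executor.cores", "1")  // falls back to "1"
val master = conf.getOption("spark.master")         // None if unset

// Enumerate everything that has been set explicitly
conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
```
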
### Configuration Validation

Validate, clone, and bulk-edit configuration instances.

```scala { .api }
def clone(): SparkConf
def validateSettings(): SparkConf
def setAll(settings: Traversable[(String, String)]): SparkConf
def remove(key: String): SparkConf
```

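Because `SparkConf` is mutable, `clone()` is the safe way to derive a variant without altering a shared base configuration. A minimal sketch:

```scala
val base = new SparkConf(loadDefaults = false)
  .setAppName("base")

// Copy, then bulk-apply overrides without touching the original
val tuned = base.clone().setAll(Seq(
  "spark.executor.memory" -> "8g",
  "spark.executor.cores"  -> "4"
))

// Drop a setting entirely
tuned.remove("spark.executor.cores")
```
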
## Types

```scala { .api }
// Configuration entry point
class SparkConf(loadDefaults: Boolean = true)

// Main application context
class SparkContext(config: SparkConf) extends Logging

// Status tracking
class SparkStatusTracker private[spark] (sc: SparkContext)

// Accumulator types
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long]
class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double]
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
```