# Context and Configuration

This module provides the primary entry points for creating and configuring Spark applications. The SparkContext serves as the main coordination point for distributed computation, while SparkConf holds the configuration parameters for an application.

## SparkConf

SparkConf manages all configuration parameters for a Spark application. Setter methods return the SparkConf itself, so calls can be chained.

```scala { .api }
class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def remove(key: String): SparkConf
  def contains(key: String): Boolean
  def clone(): SparkConf
  def setSparkHome(home: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}
```

### Usage Examples

```scala
import org.apache.spark.SparkConf

// Basic configuration
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[4]")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Configure executor settings
conf.set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "10")

// Set environment variables for executors
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")
  .setExecutorEnv(Seq(
    ("SPARK_LOCAL_DIRS", "/tmp/spark"),
    ("SPARK_WORKER_DIR", "/tmp/spark-worker")
  ))

// Conditional configuration
if (!conf.contains("spark.master")) {
  conf.setMaster("local[*]")
}

// Clone configuration for different contexts
val testConf = conf.clone()
  .setAppName("Test Application")
  .set("spark.sql.execution.arrow.pyspark.enabled", "false")
```
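
The read-back methods follow the API listing above: `get(key)` throws for a missing key, so `getOption` or the default-value overload is usually safer. A brief sketch (the key names are just illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Inspect Example")
  .set("spark.executor.memory", "4g")

// getOption avoids the exception that get(key) throws for an unset key
val master: Option[String] = conf.getOption("spark.master") // None unless set elsewhere
val memory = conf.get("spark.executor.memory", "1g")        // returns "4g"; "1g" is the fallback

// setIfMissing writes only when the key is absent
conf.setIfMissing("spark.executor.memory", "2g") // no-op: already set to "4g"
conf.setIfMissing("spark.ui.enabled", "false")   // takes effect

// Dump every explicitly-set entry
conf.getAll.foreach { case (k, v) => println(s"$k = $v") }
```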

## SparkContext

SparkContext is the main entry point for Spark functionality. Only one SparkContext should be active per JVM; stop the active context before creating a new one.

```scala { .api }
class SparkContext(config: SparkConf) {
  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
  def emptyRDD[T: ClassTag]: RDD[T]
  def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]

  // File Input
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

  // Broadcast and Accumulators
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // Application Control
  def stop(): Unit
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit

  // Configuration and Environment
  def setLogLevel(logLevel: String): Unit
  def setLocalProperty(key: String, value: String): Unit
  def getLocalProperty(key: String): String
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def setJobDescription(value: String): Unit

  // Dynamic Resource Allocation
  def requestTotalExecutors(requestedTotal: Int, localityAwareTasks: Int = 0, hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  def killExecutor(executorId: String): Boolean

  // Properties
  def master: String
  def appName: String
  def jars: Seq[String]
  def files: Seq[String]
  def startTime: Long
  def version: String
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def conf: SparkConf
  def statusTracker: SparkStatusTracker
}
```

### Usage Examples

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

// Create configuration and context
val conf = new SparkConf()
  .setAppName("Data Processing Pipeline")
  .setMaster("spark://master:7077")

val sc = new SparkContext(conf)

// Create RDDs from various sources
val numbers = sc.parallelize(1 to 1000000, numSlices = 100)
val lines = sc.textFile("hdfs://data/input.txt", minPartitions = 50)
val keyValueData = sc.sequenceFile[String, Int]("hdfs://data/sequence")

// Set job properties for monitoring
sc.setJobGroup("data-processing", "Main data processing pipeline")
sc.setJobDescription("Processing customer transaction data")

// Add application files and JARs
sc.addFile("hdfs://shared/config.properties")
sc.addJar("s3://libs/custom-transformations.jar")

// Create broadcast variables for lookup tables
val lookupTable = Map("A" -> 1, "B" -> 2, "C" -> 3)
val broadcastLookup = sc.broadcast(lookupTable)

// Create accumulators for metrics
val errorCount = sc.longAccumulator("Processing Errors")
val processingTime = sc.doubleAccumulator("Total Processing Time")

// Dynamic resource management (a no-op unless the cluster manager supports it)
if (sc.defaultParallelism < 100) {
  sc.requestExecutors(20)
}

// Placeholder for the real per-line logic (hypothetical helper)
def processLine(line: String, lookup: Map[String, Int]): String =
  line.split(",").map(field => lookup.getOrElse(field, 0)).mkString(",")

// Process data using the broadcast variable and accumulators.
// Note: accumulator updates made inside transformations may be applied more
// than once if a task is retried, so treat these values as approximate.
val results = lines.map { line =>
  try {
    val startTime = System.currentTimeMillis()
    val processed = processLine(line, broadcastLookup.value)
    processingTime.add(System.currentTimeMillis() - startTime)
    processed
  } catch {
    case e: Exception =>
      errorCount.add(1)
      throw e
  }
}

// Persist intermediate results
results.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Trigger computation and collect metrics
val finalCount = results.count()
println(s"Processed $finalCount records")
println(s"Error count: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / finalCount}ms")

// Clean up
sc.stop()
```
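
Because only one SparkContext may be active per JVM, code that can run where a context already exists (tests, notebooks, shared services) often uses `SparkContext.getOrCreate` instead of the constructor. A minimal sketch:

```scala
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf()
  .setAppName("Reusable Context")
  .setMaster("local[*]")

// Returns the active context if one exists, otherwise creates one from conf
val sc = SparkContext.getOrCreate(conf)

// Calling it again yields the same instance instead of failing
assert(SparkContext.getOrCreate(conf) eq sc)

sc.stop()
```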

## Package Constants

The org.apache.spark package object exposes the Spark version and build information as constants.

```scala { .api }
// Available in org.apache.spark package object
val SPARK_VERSION: String
val SPARK_VERSION_SHORT: String
val SPARK_BRANCH: String
val SPARK_REVISION: String
val SPARK_BUILD_USER: String
val SPARK_REPO_URL: String
val SPARK_BUILD_DATE: String
```

### Usage Example

```scala
import org.apache.spark._

println(s"Running Spark version: $SPARK_VERSION")
println(s"Built by: $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
println(s"Git revision: $SPARK_REVISION")
```

## Best Practices

### Configuration Management

- Use configuration files or environment variables for deployment-specific settings
- Set appropriate memory and core allocations based on cluster resources
- Enable adaptive query execution for SQL workloads
- Configure serialization, preferring Kryo over Java serialization (see the sketch below)
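
A minimal sketch of the Kryo recommendation; the registered case classes are hypothetical stand-ins for your own application types:

```scala
import org.apache.spark.SparkConf

// Hypothetical application types, shown only for the registration call
case class Transaction(id: Long, amount: Double)
case class Customer(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("Kryo Example")
  // Switch from the default Java serializer to Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a compact class ID instead of the full name
  .registerKryoClasses(Array(classOf[Transaction], classOf[Customer]))
  // Optionally fail fast on unregistered classes instead of silently falling back
  .set("spark.kryo.registrationRequired", "true")
```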

### Resource Management

- Monitor executor resource utilization and adjust allocations accordingly
- Use dynamic resource allocation in multi-tenant environments (see the sketch below)
- Set appropriate timeouts for network operations
- Configure storage levels based on data access patterns
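
A sketch of a dynamic-allocation setup; the executor bounds are illustrative, and the external shuffle service must actually be running on the cluster for this configuration to work:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Dynamic Allocation Example")
  // Let Spark grow and shrink the executor pool with the workload
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")  // illustrative bounds
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Shuffle data must outlive executors that are released
  .set("spark.shuffle.service.enabled", "true")
```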

### Error Handling

- Implement proper exception handling in transformations and actions (see the sketch below)
- Use accumulators to collect error metrics, keeping in mind that updates made inside transformations may be double-counted when tasks are retried
- Set up comprehensive logging and monitoring
- Handle node failures gracefully with appropriate retry policies
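
A minimal sketch of the first two points, assuming a hypothetical `parse` function that may throw; records that fail are dropped and counted rather than failing the whole job:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(
  new SparkConf().setAppName("Error Handling Example").setMaster("local[*]"))

val errorCount = sc.longAccumulator("Parse Errors")

// Hypothetical parser that throws on malformed input
def parse(line: String): Int = line.trim.toInt

val parsed = sc.parallelize(Seq("1", "2", "oops", "4"))
  .flatMap { line =>
    Try(parse(line)) match {
      case Success(value) => Some(value)
      case Failure(_)     => errorCount.add(1); None
    }
  }

println(s"Sum: ${parsed.sum()}, parse errors: ${errorCount.value}")
sc.stop()
```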