# Configuration and Deployment

Programmatic interfaces for launching, monitoring, and managing Spark applications across different cluster managers and deployment modes. Includes configuration management, application launchers, and cluster-specific deployment utilities.

## Capabilities

### Application Configuration

Configuration management for Spark applications with support for various deployment scenarios.

```scala { .api }
/**
 * Configuration for Spark applications
 */
class SparkConf(loadDefaults: Boolean = true) {
  /** Set a configuration property */
  def set(key: String, value: String): SparkConf
  /** Set the master URL */
  def setMaster(master: String): SparkConf
  /** Set the application name */
  def setAppName(name: String): SparkConf
  /** Set JAR files to distribute to the cluster */
  def setJars(jars: Seq[String]): SparkConf
  /** Set an executor environment variable */
  def setExecutorEnv(variable: String, value: String): SparkConf
  /** Set multiple executor environment variables */
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf
  /** Set the Spark home directory */
  def setSparkHome(home: String): SparkConf
  /** Set all properties from an iterable of key-value pairs */
  def setAll(settings: Iterable[(String, String)]): SparkConf

  /** Get a configuration value */
  def get(key: String): String
  /** Get a configuration value with a default */
  def get(key: String, defaultValue: String): String
  /** Get all configuration settings */
  def getAll: Array[(String, String)]
  /** Get a boolean configuration value */
  def getBoolean(key: String, defaultValue: Boolean): Boolean
  /** Get an integer configuration value */
  def getInt(key: String, defaultValue: Int): Int
  /** Get a long configuration value */
  def getLong(key: String, defaultValue: Long): Long
  /** Get a double configuration value */
  def getDouble(key: String, defaultValue: Double): Double
  /** Get a size configuration value in bytes */
  def getSizeAsBytes(key: String, defaultValue: String): Long
  /** Get a size configuration value in KiB */
  def getSizeAsKb(key: String, defaultValue: String): Long
  /** Get a size configuration value in MiB */
  def getSizeAsMb(key: String, defaultValue: String): Long
  /** Get a size configuration value in GiB */
  def getSizeAsGb(key: String, defaultValue: String): Long
  /** Get a time configuration value in milliseconds */
  def getTimeAsMs(key: String, defaultValue: String): Long
  /** Get a time configuration value in seconds */
  def getTimeAsSeconds(key: String, defaultValue: String): Long

  /** Remove a configuration property */
  def remove(key: String): SparkConf
  /** Check whether the configuration contains a key */
  def contains(key: String): Boolean
  /** Clone the configuration */
  def clone(): SparkConf
  /** Return a string listing all settings, for debugging */
  def toDebugString: String
}
```

**Usage Examples:**

```scala
import org.apache.spark.SparkConf

// Basic configuration
val conf = new SparkConf()
  .setAppName("MySparkApplication")
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Environment variables
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-11")
conf.setExecutorEnv(Array(
  ("PYTHONPATH", "/path/to/python/libs"),
  ("HADOOP_CONF_DIR", "/etc/hadoop/conf")
))

// File distribution
conf.setJars(Seq("hdfs://path/to/app.jar", "hdfs://path/to/lib.jar"))

// Different master configurations
val localConf = new SparkConf().setMaster("local[*]")              // All available cores
val clusterConf = new SparkConf().setMaster("spark://master:7077") // Standalone
val yarnConf = new SparkConf().setMaster("yarn")                   // YARN
val k8sConf = new SparkConf().setMaster("k8s://https://kubernetes.example.com:443") // Kubernetes

// Typed reads with defaults
val memoryMb = conf.getSizeAsMb("spark.executor.memory", "1g")
val cores = conf.getInt("spark.executor.cores", 1)
val adaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)
```

### Application Launcher (Java API)

Programmatic interface for launching Spark applications from Java code.

```java { .api }
/**
 * Programmatic interface for launching Spark applications
 */
public class SparkLauncher {
    /** Set the application name */
    public SparkLauncher setAppName(String appName);
    /** Set the master URL */
    public SparkLauncher setMaster(String master);
    /** Set the main application resource (JAR file) */
    public SparkLauncher setAppResource(String resource);
    /** Set the main class */
    public SparkLauncher setMainClass(String mainClass);
    /** Add application arguments */
    public SparkLauncher addAppArgs(String... args);
    /** Set a Spark configuration property */
    public SparkLauncher setConf(String key, String value);
    /** Set the Spark home directory */
    public SparkLauncher setSparkHome(String sparkHome);
    /** Set a properties file */
    public SparkLauncher setPropertiesFile(String path);
    /** Set the deploy mode (client/cluster) */
    public SparkLauncher setDeployMode(String mode);
    /** Enable verbose logging */
    public SparkLauncher setVerbose(boolean verbose);

    /** Launch as a child process and return it */
    public Process launch() throws IOException;
    /** Launch and return an application handle; the varargs allow
     *  zero or more state-change listeners */
    public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}

/**
 * Handle to monitor and control a Spark application
 */
public interface SparkAppHandle {
    /** Get the current application state */
    State getState();
    /** Get the application ID */
    String getAppId();
    /** Kill the application */
    void kill();
    /** Disconnect from the application without stopping it */
    void disconnect();
    /** Add a state-change listener */
    void addListener(Listener listener);

    /** Application states */
    enum State {
        UNKNOWN, CONNECTED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED, LOST
    }

    /** Listener for application state changes */
    interface Listener {
        void stateChanged(SparkAppHandle handle);
        void infoChanged(SparkAppHandle handle);
    }
}
```

**Usage Examples:**

```java
import org.apache.spark.launcher.*;

// Basic application launch
SparkLauncher launcher = new SparkLauncher()
    .setAppName("MyApp")
    .setMaster("local[2]")
    .setAppResource("path/to/my-app.jar")
    .setMainClass("com.example.MyMainClass")
    .addAppArgs("arg1", "arg2", "arg3");

// Launch as a child process and wait for it to exit
Process process = launcher.launch();
int exitCode = process.waitFor();

// Launch with monitoring
SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        System.out.println("State changed to: " + handle.getState());
        if (handle.getState().isFinal()) {
            System.out.println("Application finished with state: " + handle.getState());
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        System.out.println("App ID: " + handle.getAppId());
    }
});

// Poll until the application reaches a terminal state
while (!handle.getState().isFinal()) {
    Thread.sleep(1000);
}

// Cluster-mode launch
SparkLauncher clusterLauncher = new SparkLauncher()
    .setAppName("ClusterApp")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppResource("hdfs://path/to/app.jar")
    .setMainClass("com.example.ClusterApp")
    .setConf("spark.executor.instances", "10")
    .setConf("spark.executor.memory", "4g")
    .setConf("spark.executor.cores", "2");

SparkAppHandle clusterHandle = clusterLauncher.startApplication();
```

### Runtime Configuration

Runtime configuration management for an active SparkSession. Only a subset of properties (mostly SQL options) can be modified at runtime; `isModifiable` reports which.

```scala { .api }
/**
 * Runtime configuration interface for SparkSession
 */
class RuntimeConfig {
  /** Set a configuration value */
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  /** Get a configuration value */
  def get(key: String): String
  def get(key: String, default: String): String
  /** Get all configuration values */
  def getAll: Map[String, String]
  /** Unset a configuration value */
  def unset(key: String): Unit
  /** Check whether a configuration property is modifiable at runtime */
  def isModifiable(key: String): Boolean
}
```

### Cluster Manager Integration

Support for different cluster managers and deployment modes.

```scala { .api }
/**
 * Standalone cluster configuration
 */
object StandaloneClusterManager {
  /** Default master port */
  val DEFAULT_MASTER_PORT = 7077
  /** Default worker port */
  val DEFAULT_WORKER_PORT = 7078
  /** Default master web UI port */
  val DEFAULT_MASTER_WEBUI_PORT = 8080
}

/**
 * YARN cluster configuration
 */
object YarnClusterManager {
  /** YARN client mode */
  val CLIENT_MODE = "client"
  /** YARN cluster mode */
  val CLUSTER_MODE = "cluster"
}

/**
 * Kubernetes cluster configuration
 */
object KubernetesClusterManager {
  /** Default namespace */
  val DEFAULT_NAMESPACE = "default"
  /** Default service account */
  val DEFAULT_SERVICE_ACCOUNT = "default"
}
```

### Application Monitoring

Interfaces for monitoring application status and metrics.

```scala { .api }
/**
 * Status tracker for monitoring Spark applications
 */
class SparkStatusTracker(sc: SparkContext) {
  /** Get executor infos */
  def getExecutorInfos: Array[SparkExecutorInfo]
  /** Get active stage IDs */
  def getActiveStageIds: Array[Int]
  /** Get active job IDs */
  def getActiveJobIds: Array[Int]
  /** Get stage info */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  /** Get job info */
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  /** Get job IDs for a job group */
  def getJobIdsForGroup(jobGroup: String): Array[Int]
}

/**
 * Executor information
 */
case class SparkExecutorInfo(
  executorId: String,
  executorHost: String,
  totalCores: Int,
  maxTasks: Int,
  activeTasks: Int,
  failedTasks: Int,
  completedTasks: Int,
  totalTasks: Int,
  totalDuration: Long,
  totalGCTime: Long,
  totalInputBytes: Long,
  totalShuffleRead: Long,
  totalShuffleWrite: Long,
  isActive: Boolean,
  maxMemory: Long,
  addTime: java.util.Date,
  removeTime: Option[java.util.Date],
  removeReason: Option[String]
)
```

### Environment and System Information

Utilities for accessing environment and system information.

```scala { .api }
/**
 * Utilities for accessing files distributed with the application
 */
object SparkFiles {
  /** Get the absolute path of an added file */
  def get(filename: String): String
  /** Get the root directory containing added files */
  def getRootDirectory(): String
}

/**
 * Environment information and utilities
 */
class SparkEnv {
  /** Get the serializer */
  def serializer: Serializer
  /** Get the closure serializer */
  def closureSerializer: Serializer
  /** Get the map output tracker */
  def mapOutputTracker: MapOutputTracker
  /** Get the shuffle manager */
  def shuffleManager: ShuffleManager
  /** Get the broadcast manager */
  def broadcastManager: BroadcastManager
  /** Get the block manager */
  def blockManager: BlockManager
  /** Get the security manager */
  def securityManager: SecurityManager
  /** Get the metrics system */
  def metricsSystem: MetricsSystem
  /** Get the executor ID */
  def executorId: String
}
```

### Deployment Modes

Configuration for different deployment scenarios.

```scala { .api }
/**
 * Deploy modes
 */
object DeployMode extends Enumeration {
  type DeployMode = Value
  val CLIENT, CLUSTER = Value
}

/**
 * Master URLs for different cluster types
 */
object MasterURLs {
  /** Local mode with n threads */
  def local(n: Int = 1): String = s"local[$n]"
  /** Local mode with all available cores */
  def localAll: String = "local[*]"
  /** Standalone cluster */
  def standalone(host: String, port: Int = 7077): String = s"spark://$host:$port"
  /** YARN cluster */
  def yarn: String = "yarn"
  /** Mesos cluster (deprecated since Spark 3.2) */
  def mesos(host: String, port: Int = 5050): String = s"mesos://$host:$port"
  /** Kubernetes cluster */
  def kubernetes(apiServer: String): String = s"k8s://$apiServer"
}
```

**Usage Examples:**

```scala
// Different deployment configurations
val localConf = new SparkConf()
  .setMaster(MasterURLs.localAll)
  .setAppName("LocalApp")

val standaloneConf = new SparkConf()
  .setMaster(MasterURLs.standalone("spark-master.example.com"))
  .setAppName("StandaloneApp")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val yarnConf = new SparkConf()
  .setMaster(MasterURLs.yarn)
  .setAppName("YarnApp")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "3")
  .set("spark.yarn.queue", "production")

val k8sConf = new SparkConf()
  .setMaster(MasterURLs.kubernetes("https://k8s.example.com:443"))
  .setAppName("K8sApp")
  .set("spark.kubernetes.container.image", "spark:3.5.6")
  .set("spark.executor.instances", "5")
  .set("spark.kubernetes.namespace", "spark-jobs")
  .set("spark.kubernetes.executor.request.cores", "1")
  .set("spark.kubernetes.executor.limit.cores", "2")

// Application monitoring
val sc = new SparkContext(localConf)
val statusTracker = sc.statusTracker

// Monitor executors
val executors = statusTracker.getExecutorInfos
executors.foreach { executor =>
  println(s"Executor ${executor.executorId}: ${executor.activeTasks} active tasks")
}

// Monitor jobs and stages
val activeJobs = statusTracker.getActiveJobIds
val activeStages = statusTracker.getActiveStageIds

activeJobs.foreach { jobId =>
  statusTracker.getJobInfo(jobId).foreach { jobInfo =>
    println(s"Job $jobId: ${jobInfo.status}")
  }
}
```

### Configuration Properties

Key configuration properties for different aspects of Spark applications.

```scala { .api }
/**
 * Common configuration properties
 */
object SparkConfigs {
  // Application properties
  val APP_NAME = "spark.app.name"
  val MASTER = "spark.master"
  val DEPLOY_MODE = "spark.submit.deployMode"
  val DRIVER_MEMORY = "spark.driver.memory"
  val DRIVER_CORES = "spark.driver.cores"
  val EXECUTOR_MEMORY = "spark.executor.memory"
  val EXECUTOR_CORES = "spark.executor.cores"
  val EXECUTOR_INSTANCES = "spark.executor.instances"

  // Dynamic allocation
  val DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
  val DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
  val DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
  val DYNAMIC_ALLOCATION_INITIAL_EXECUTORS = "spark.dynamicAllocation.initialExecutors"

  // Serialization
  val SERIALIZER = "spark.serializer"
  val KRYO_REGISTRATOR = "spark.kryo.registrator"
  val KRYO_UNSAFE = "spark.kryo.unsafe"

  // SQL properties
  val SQL_ADAPTIVE_ENABLED = "spark.sql.adaptive.enabled"
  val SQL_ADAPTIVE_COALESCE_PARTITIONS = "spark.sql.adaptive.coalescePartitions.enabled"
  val SQL_ADAPTIVE_SKEW_JOIN = "spark.sql.adaptive.skewJoin.enabled"
  val SQL_WAREHOUSE_DIR = "spark.sql.warehouse.dir"

  // Shuffle properties
  val SHUFFLE_COMPRESS = "spark.shuffle.compress"
  val SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
  val SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"

  // Storage properties (legacy keys from the pre-1.6 memory manager,
  // superseded by spark.memory.fraction / spark.memory.storageFraction)
  val STORAGE_MEMORY_FRACTION = "spark.storage.memoryFraction"
  val STORAGE_SAFETY_FRACTION = "spark.storage.safetyFraction"
}
```

**Configuration Examples:**

```scala
// Performance tuning configuration
val perfConf = new SparkConf()
  .setAppName("HighPerformanceApp")
  .set(SparkConfigs.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
  .set(SparkConfigs.SQL_ADAPTIVE_ENABLED, "true")
  .set(SparkConfigs.SQL_ADAPTIVE_COALESCE_PARTITIONS, "true")
  .set(SparkConfigs.SQL_ADAPTIVE_SKEW_JOIN, "true")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_ENABLED, "true")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_MIN_EXECUTORS, "2")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_MAX_EXECUTORS, "20")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_INITIAL_EXECUTORS, "5")

// Memory-intensive configuration
val memoryConf = new SparkConf()
  .setAppName("MemoryIntensiveApp")
  .set(SparkConfigs.EXECUTOR_MEMORY, "8g")
  .set(SparkConfigs.DRIVER_MEMORY, "4g")
  .set("spark.memory.fraction", "0.8")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

// Cluster-specific configurations
val yarnProdConf = new SparkConf()
  .setAppName("ProductionYarnApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.archive", "hdfs://path/to/spark-archive.zip")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://path/to/spark-events")
  .set("spark.history.fs.logDirectory", "hdfs://path/to/spark-events")
```

## Error Handling

Common deployment and configuration exceptions:

- `IllegalArgumentException` - Invalid configuration values or parameters
- `SparkException` - General Spark application launch/runtime errors
- `IOException` - File system or network errors during application launch
- `SecurityException` - Security-related configuration or access errors
- `ClassNotFoundException` - Missing application or dependency classes
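
A minimal sketch of handling these exceptions around a launcher call (the JAR path and class name are placeholders):

```scala
import java.io.IOException
import org.apache.spark.SparkException
import org.apache.spark.launcher.SparkLauncher

try {
  val handle = new SparkLauncher()
    .setAppResource("path/to/my-app.jar")     // placeholder path
    .setMainClass("com.example.MyMainClass")  // placeholder class
    .setMaster("local[2]")
    .startApplication()
} catch {
  case e: IllegalArgumentException =>
    // Bad configuration value or missing required setting
    System.err.println(s"Invalid configuration: ${e.getMessage}")
  case e: IOException =>
    // Could not spawn the launcher process or reach the file system
    System.err.println(s"Launch failed: ${e.getMessage}")
  case e: SparkException =>
    // General Spark-side launch or runtime failure
    System.err.println(s"Spark error: ${e.getMessage}")
}
```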