# Configuration Management

Comprehensive configuration system with predefined constants for common Spark settings and fluent configuration methods.

## Capabilities

### Configuration Constants

Predefined constants for common Spark configuration keys, providing type safety and preventing configuration errors.

```java { .api }
/**
 * Master and deployment configuration
 */
public static final String SPARK_MASTER = "spark.master";
public static final String DEPLOY_MODE = "spark.submit.deployMode";

/**
 * Driver configuration keys
 */
public static final String DRIVER_MEMORY = "spark.driver.memory";
public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";

/**
 * Executor configuration keys
 */
public static final String EXECUTOR_MEMORY = "spark.executor.memory";
public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
public static final String EXECUTOR_CORES = "spark.executor.cores";

/**
 * Special configuration values and launcher settings
 */
public static final String NO_RESOURCE = "spark-internal";
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
// Note: the misspelled value "childConectionTimeout" matches Spark's actual key — do not "correct" it.
public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
```
**Usage Examples:**

```java
import org.apache.spark.launcher.SparkLauncher;

// Driver configuration using constants
SparkLauncher launcher = new SparkLauncher()
    .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
    .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-XX:+UseG1GC -XX:MaxGCPauseMillis=200")
    .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/libs/mysql-connector.jar:/libs/custom.jar");

// Executor configuration using constants
launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
    .setConf(SparkLauncher.EXECUTOR_CORES, "4")
    .setConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "-XX:+UseG1GC")
    .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, "/libs/shared-utils.jar");

// Master and deployment mode
launcher.setConf(SparkLauncher.SPARK_MASTER, "yarn")
    .setConf(SparkLauncher.DEPLOY_MODE, "cluster");

// Special resource handling
launcher.setAppResource(SparkLauncher.NO_RESOURCE) // Skip resource processing
    .setMainClass("com.company.AppWithJarless");

// Launcher-specific configuration
launcher.setConf(SparkLauncher.CHILD_PROCESS_LOGGER_NAME, "com.company.spark.launcher")
    .setConf(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "60000"); // 60 seconds
```
### Configuration Methods

Fluent configuration API providing type-safe methods for setting application parameters and Spark configuration.

```java { .api }
/**
 * Configuration methods available on all launcher types
 */

/** Set custom properties file with Spark configuration */
public T setPropertiesFile(String path);

/** Set single configuration value (key must start with "spark.") */
public T setConf(String key, String value);

/** Set application name displayed in Spark UI */
public T setAppName(String appName);

/** Set Spark master URL (local, yarn, mesos, k8s, spark://) */
public T setMaster(String master);

/** Set deploy mode (client or cluster) */
public T setDeployMode(String mode);

/** Set main application resource (jar for Java/Scala, python script for PySpark) */
public T setAppResource(String resource);

/** Set main class name for Java/Scala applications */
public T setMainClass(String mainClass);

/** Add jar file to be submitted with application */
public T addJar(String jar);

/** Add file to be submitted with application */
public T addFile(String file);

/** Add Python file/zip/egg to be submitted with application */
public T addPyFile(String file);

/** Add command line arguments for the application */
public T addAppArgs(String... args);

/** Add no-value argument to Spark invocation */
public T addSparkArg(String arg);

/** Add argument with value to Spark invocation */
public T addSparkArg(String name, String value);

/** Enable verbose reporting for SparkSubmit */
public T setVerbose(boolean verbose);
```
**Usage Examples:**

```java
import org.apache.spark.launcher.SparkLauncher;

// Comprehensive application configuration
SparkLauncher launcher = new SparkLauncher()
    // Basic application setup
    .setAppName("Production ETL Pipeline")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppResource("/apps/etl-pipeline-1.0.jar")
    .setMainClass("com.company.etl.ETLMain")

    // Spark configuration using constants
    .setConf(SparkLauncher.DRIVER_MEMORY, "8g")
    .setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")
    .setConf(SparkLauncher.EXECUTOR_CORES, "4")

    // Additional Spark configuration
    .setConf("spark.sql.adaptive.enabled", "true")
    .setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .setConf("spark.sql.adaptive.skewJoin.enabled", "true")
    .setConf("spark.sql.shuffle.partitions", "400")
    .setConf("spark.dynamicAllocation.enabled", "true")
    .setConf("spark.dynamicAllocation.minExecutors", "2")
    .setConf("spark.dynamicAllocation.maxExecutors", "20")

    // Dependencies and resources
    .addJar("/libs/mysql-connector-java-8.0.25.jar")
    .addJar("/libs/spark-avro_2.11-2.4.8.jar")
    .addFile("/config/application.conf")
    .addFile("/config/log4j.properties")

    // Application arguments
    .addAppArgs("--input-path", "/data/raw/2023/12/01")
    .addAppArgs("--output-path", "/data/processed/2023/12/01")
    .addAppArgs("--config-file", "application.conf")
    .addAppArgs("--partition-count", "100")

    // Enable verbose output for debugging
    .setVerbose(true);

// Properties file configuration
launcher.setPropertiesFile("/etc/spark/spark-defaults.conf");

// Advanced Spark arguments
launcher.addSparkArg("--archives", "python-env.zip#env")
    .addSparkArg("--py-files", "utils.py,transforms.py")
    .addSparkArg("--queue", "production")
    .addSparkArg("--principal", "spark@COMPANY.COM")
    .addSparkArg("--keytab", "/etc/security/spark.keytab");
```
### Properties File Configuration

External configuration file support for managing complex Spark configurations.

**Usage Examples:**

```java
// Using external properties file
SparkLauncher launcher = new SparkLauncher()
    .setPropertiesFile("/config/spark-production.conf")
    .setAppResource("/apps/myapp.jar")
    .setMainClass("com.company.MyApp");

// Explicit setConf() calls override individual settings from the properties file
launcher.setConf("spark.app.name", "Override App Name") // Overrides file setting
    .setConf("spark.driver.memory", "16g"); // Overrides file setting
```

**Example properties file (`spark-production.conf`):**
```properties
spark.master=yarn
spark.submit.deployMode=cluster
spark.driver.memory=8g
spark.executor.memory=4g
spark.executor.cores=4
spark.executor.instances=10
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.metastore.uris=thrift://metastore:9083
spark.hadoop.fs.defaultFS=hdfs://namenode:8020
```
### Configuration Validation

Built-in validation for configuration parameters to prevent common errors.

```java { .api }
/**
 * Configuration validation rules:
 * - setConf() keys must start with "spark."
 * - Parameters cannot be null
 * - Known Spark arguments are validated for correct value expectations
 */
```

**Usage Examples:**

```java
try {
    // Valid configuration calls
    launcher.setConf("spark.driver.memory", "4g"); // Valid: starts with "spark."
    launcher.setConf("spark.custom.property", "value"); // Valid: custom spark property
    launcher.setMaster("yarn"); // Valid: known master type
    launcher.setDeployMode("cluster"); // Valid: known deploy mode

    // Invalid configuration calls (will throw IllegalArgumentException)
    launcher.setConf("invalid.key", "value"); // Invalid: doesn't start with "spark."
    launcher.setConf("spark.driver.memory", null); // Invalid: null value
    launcher.setMaster(null); // Invalid: null master
    launcher.setAppName(null); // Invalid: null app name

} catch (IllegalArgumentException e) {
    System.err.println("Configuration error: " + e.getMessage());
}

// Spark argument validation
try {
    launcher.addSparkArg("--master", "yarn"); // Valid: known argument with value
    launcher.addSparkArg("--version"); // Valid: known no-value argument
    launcher.addSparkArg("--custom-arg", "value"); // Valid: unknown args allowed for compatibility

    launcher.addSparkArg("--master"); // Invalid: --master expects a value
} catch (IllegalArgumentException e) {
    System.err.println("Spark argument error: " + e.getMessage());
}
```
## Configuration Patterns

### Development Configuration

```java
public class DevelopmentConfig {
    public static SparkLauncher createDevelopmentLauncher() {
        return new SparkLauncher()
            .setMaster("local[*]") // Use all local cores
            .setDeployMode("client") // Client mode for development
            .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
            .setConf("spark.sql.adaptive.enabled", "false") // Disable for predictable behavior
            .setConf("spark.sql.shuffle.partitions", "4") // Small partition count
            .setVerbose(true); // Enable verbose logging
    }
}

// Usage
SparkLauncher devLauncher = DevelopmentConfig.createDevelopmentLauncher()
    .setAppResource("/target/myapp-dev.jar")
    .setMainClass("com.company.DevApp")
    .setAppName("Development Test");
```

### Production Configuration

```java
public class ProductionConfig {
    public static SparkLauncher createProductionLauncher() {
        return new SparkLauncher()
            .setMaster("yarn")
            .setDeployMode("cluster")
            .setPropertiesFile("/etc/spark/production.conf")

            // Production-specific overrides
            .setConf(SparkLauncher.DRIVER_MEMORY, "8g")
            .setConf(SparkLauncher.EXECUTOR_MEMORY, "6g")
            .setConf(SparkLauncher.EXECUTOR_CORES, "5")

            // Performance tuning
            .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .setConf("spark.sql.adaptive.enabled", "true")
            .setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .setConf("spark.sql.adaptive.skewJoin.enabled", "true")

            // Resource management
            .setConf("spark.dynamicAllocation.enabled", "true")
            .setConf("spark.dynamicAllocation.minExecutors", "5")
            .setConf("spark.dynamicAllocation.maxExecutors", "50")
            .setConf("spark.dynamicAllocation.initialExecutors", "10")

            // Logging and monitoring
            .setConf("spark.eventLog.enabled", "true")
            .setConf("spark.eventLog.dir", "hdfs://logs/spark-events")
            .setConf("spark.history.fs.logDirectory", "hdfs://logs/spark-events");
    }
}
```
### Configuration Templates

```java
public class ConfigurationTemplates {

    // Template for memory-intensive applications
    public static void configureMemoryIntensive(SparkLauncher launcher) {
        launcher.setConf(SparkLauncher.DRIVER_MEMORY, "16g")
            .setConf(SparkLauncher.EXECUTOR_MEMORY, "8g")
            .setConf("spark.memory.fraction", "0.8")
            .setConf("spark.memory.storageFraction", "0.6")
            .setConf("spark.sql.shuffle.partitions", "800");
    }

    // Template for CPU-intensive applications
    public static void configureCpuIntensive(SparkLauncher launcher) {
        launcher.setConf(SparkLauncher.EXECUTOR_CORES, "6")
            .setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")
            .setConf("spark.task.cpus", "1")
            .setConf("spark.sql.shuffle.partitions", "1200");
    }

    // Template for streaming applications
    public static void configureStreaming(SparkLauncher launcher) {
        launcher.setConf("spark.streaming.backpressure.enabled", "true")
            .setConf("spark.streaming.dynamicAllocation.enabled", "true")
            .setConf("spark.streaming.receiver.maxRate", "10000")
            .setConf("spark.streaming.kafka.maxRatePerPartition", "1000");
    }

    // Template for machine learning workloads
    public static void configureMachineLearning(SparkLauncher launcher) {
        launcher.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .setConf("spark.kryo.unsafe", "true")
            .setConf("spark.sql.execution.arrow.pyspark.enabled", "true")
            .setConf("spark.sql.adaptive.enabled", "true")
            .setConf("spark.sql.adaptive.coalescePartitions.enabled", "true");
    }
}

// Usage
SparkLauncher launcher = new SparkLauncher()
    .setAppResource("/apps/ml-pipeline.jar")
    .setMainClass("com.company.ml.Pipeline")
    .setMaster("yarn")
    .setDeployMode("cluster");

ConfigurationTemplates.configureMachineLearning(launcher);
ConfigurationTemplates.configureMemoryIntensive(launcher);
```
### Environment-Specific Configuration

```java
public class EnvironmentConfiguration {

    public enum Environment {
        DEVELOPMENT, TESTING, STAGING, PRODUCTION
    }

    public static SparkLauncher configureForEnvironment(SparkLauncher launcher, Environment env) {
        switch (env) {
            case DEVELOPMENT:
                return launcher.setMaster("local[*]")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
                    .setVerbose(true);

            case TESTING:
                return launcher.setMaster("local[4]")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
                    .setConf("spark.sql.shuffle.partitions", "8");

            case STAGING:
                return launcher.setMaster("yarn")
                    .setDeployMode("cluster")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
                    .setPropertiesFile("/etc/spark/staging.conf");

            case PRODUCTION:
                return launcher.setMaster("yarn")
                    .setDeployMode("cluster")
                    .setPropertiesFile("/etc/spark/production.conf")
                    .setConf(SparkLauncher.CHILD_PROCESS_LOGGER_NAME, "production.spark.launcher");

            default:
                throw new IllegalArgumentException("Unknown environment: " + env);
        }
    }
}

// Usage
SparkLauncher launcher = new SparkLauncher()
    .setAppResource("/apps/etl.jar")
    .setMainClass("com.company.ETL");

Environment currentEnv = Environment.valueOf(System.getProperty("env", "DEVELOPMENT"));
launcher = EnvironmentConfiguration.configureForEnvironment(launcher, currentEnv);
```
## Advanced Configuration Techniques

### Dynamic Configuration

```java
public class DynamicConfiguration {

    public static void configureBasedOnData(SparkLauncher launcher, DataCharacteristics data) {
        // Adjust partitions based on data size
        int partitions = calculateOptimalPartitions(data.getSizeGB());
        launcher.setConf("spark.sql.shuffle.partitions", String.valueOf(partitions));

        // Adjust memory based on data complexity
        String executorMemory = data.isComplexProcessing() ? "8g" : "4g";
        launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, executorMemory);

        // Enable adaptive features for large datasets
        if (data.getSizeGB() > 100) {
            launcher.setConf("spark.sql.adaptive.enabled", "true")
                .setConf("spark.sql.adaptive.skewJoin.enabled", "true");
        }
    }

    private static int calculateOptimalPartitions(double sizeGB) {
        // Rule of thumb: 128MB per partition
        return Math.max(4, (int) Math.ceil(sizeGB * 1024 / 128));
    }
}
```
### Configuration Inheritance

```java
public class ConfigurationBuilder {
    private SparkLauncher launcher;

    public ConfigurationBuilder(SparkLauncher launcher) {
        this.launcher = launcher;
    }

    public ConfigurationBuilder withBaseConfiguration() {
        launcher.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .setConf("spark.sql.adaptive.enabled", "true");
        return this;
    }

    public ConfigurationBuilder withHighMemory() {
        launcher.setConf(SparkLauncher.DRIVER_MEMORY, "16g")
            .setConf(SparkLauncher.EXECUTOR_MEMORY, "8g");
        return this;
    }

    public ConfigurationBuilder withHighCpu() {
        launcher.setConf(SparkLauncher.EXECUTOR_CORES, "8");
        return this;
    }

    public ConfigurationBuilder withCustom(String key, String value) {
        launcher.setConf(key, value);
        return this;
    }

    public SparkLauncher build() {
        return launcher;
    }
}

// Usage
SparkLauncher launcher = new ConfigurationBuilder(new SparkLauncher())
    .withBaseConfiguration()
    .withHighMemory()
    .withCustom("spark.sql.shuffle.partitions", "1000")
    .build();
```
## Configuration Best Practices

### Memory Configuration Guidelines

```java
// Good: Balanced memory allocation
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
    .setConf(SparkLauncher.EXECUTOR_MEMORY, "6g")
    .setConf("spark.memory.fraction", "0.8");

// Avoid: Overallocating driver memory for non-collect operations
// launcher.setConf(SparkLauncher.DRIVER_MEMORY, "32g"); // Usually unnecessary

// Good: Consider storage vs execution memory balance
launcher.setConf("spark.memory.storageFraction", "0.5")
    .setConf("spark.memory.fraction", "0.8");
```

### Resource Planning

```java
// Calculate executor count based on cluster resources
public static void configureExecutors(SparkLauncher launcher, ClusterResources cluster) {
    int coresPerNode = cluster.getCoresPerNode();
    int memoryPerNodeGB = cluster.getMemoryPerNodeGB();
    int nodeCount = cluster.getNodeCount();

    // Leave 1 core for OS, aim for 2-5 cores per executor
    int coresPerExecutor = Math.min(5, Math.max(2, coresPerNode / 2));
    int executorsPerNode = (coresPerNode - 1) / coresPerExecutor;
    int totalExecutors = executorsPerNode * nodeCount;

    // Leave memory for OS and other processes
    int executorMemoryGB = (memoryPerNodeGB - 2) / executorsPerNode;

    launcher.setConf("spark.executor.instances", String.valueOf(totalExecutors))
        .setConf(SparkLauncher.EXECUTOR_CORES, String.valueOf(coresPerExecutor))
        .setConf(SparkLauncher.EXECUTOR_MEMORY, executorMemoryGB + "g");
}
```
### Error Prevention

```java
// Good: Use constants to prevent typos
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "4g");

// Avoid: String literals prone to typos
// launcher.setConf("spark.driver.memory", "4g");

// Good: Validate configuration values
public static void setMemoryWithValidation(SparkLauncher launcher, String memory) {
    if (!memory.matches("\\d+[gG]")) {
        throw new IllegalArgumentException("Memory must be in format '4g' or '4G'");
    }
    launcher.setConf(SparkLauncher.DRIVER_MEMORY, memory);
}

// Good: Use type-safe configuration methods when available
launcher.setMaster("yarn") // Type-safe method
    .setDeployMode("cluster") // Type-safe method
    .setVerbose(true); // Type-safe method
```