# Command Building Utilities

YARN-specific utilities for building container launch commands and managing the Spark distribution. These utilities handle the low-level details of container command construction and environment setup.

## Capabilities

### YarnSparkHadoopUtil

Utility object providing YARN-specific Hadoop integration and environment management.

```scala { .api }
object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit

  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}
```

**Environment Management:**

**`addPathToEnvironment(env, key, value): Unit`**
- Appends a path value to an environment variable
- Handles path-separator logic for cross-platform compatibility
- Used to set up executor environment variables

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import scala.collection.mutable.HashMap

val env = new HashMap[String, String]()

// Add to the PATH environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/usr/local/bin")
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/opt/spark/bin")

// Add to LD_LIBRARY_PATH
YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", "/usr/local/lib")

// On Unix, env("PATH") is now /usr/local/bin:/opt/spark/bin
// (entries joined with the platform path separator)
```

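The appending behavior can be sketched as follows (an illustrative reimplementation using `java.io.File.pathSeparator`, not the actual Spark code; `addPath` is a hypothetical name):

```scala
import java.io.File
import scala.collection.mutable.HashMap

// Illustrative sketch: append a value to an existing entry using the
// platform path separator, or create the entry if it does not exist yet
def addPath(env: HashMap[String, String], key: String, value: String): Unit = {
  env(key) = env.get(key) match {
    case Some(existing) => existing + File.pathSeparator + value
    case None => value
  }
}

val env = new HashMap[String, String]()
addPath(env, "PATH", "/usr/local/bin")
addPath(env, "PATH", "/opt/spark/bin")
// On Unix, env("PATH") is now /usr/local/bin:/opt/spark/bin
```

Using the platform separator rather than a hard-coded `:` is what makes the same logic usable when the container runs on Windows.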
**Constants:**

**`MEMORY_OVERHEAD_FACTOR: Double`**
- Default memory overhead factor used in container memory calculation
- Typically 0.1 (10% of executor memory)
- Applied when an explicit overhead is not configured

**`MEMORY_OVERHEAD_MIN: Long`**
- Minimum memory overhead, expressed in megabytes
- Typically 384 MB
- Ensures adequate overhead for small executor containers

**`RM_REQUEST_PRIORITY: Priority`**
- Standard priority for ResourceManager container requests
- Provides a consistent priority level for all Spark container requests
- Used by YarnAllocator when requesting containers

**Memory Calculation Example:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

def calculateContainerMemory(executorMemoryMB: Long): Long = {
  // Overhead is the larger of the factor-based value and the fixed minimum (MB)
  val overheadMB = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN
  )
  executorMemoryMB + overheadMB
}

// Example calculations
println(calculateContainerMemory(1024)) // 1408 MB (1024 + 384 minimum overhead)
println(calculateContainerMemory(8192)) // 9011 MB (8192 + 819 factor-based overhead)
```

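YARN itself further normalizes each container request up to a multiple of the scheduler's minimum allocation increment (`yarn.scheduler.minimum-allocation-mb`), so the memory actually granted can exceed the value computed above. A minimal sketch of that rounding, with the increment passed in explicitly for illustration rather than read from the Hadoop configuration:

```scala
// Round a memory request up to the nearest multiple of the scheduler
// increment, as the YARN ResourceManager does when normalizing requests
def roundToYarnIncrement(requestedMB: Long, incrementMB: Long): Long = {
  ((requestedMB + incrementMB - 1) / incrementMB) * incrementMB
}

println(roundToYarnIncrement(1408, 1024)) // 2048
println(roundToYarnIncrement(9011, 1024)) // 9216
```

This is worth keeping in mind when sizing executors: a 1408 MB request on a cluster with a 1024 MB increment actually consumes 2048 MB of cluster capacity.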
### YarnCommandBuilderUtils

Utilities for building and formatting container launch commands on YARN.

```scala { .api }
object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
```

**Command Building:**

**`quoteForBatchScript(arg: String): String`**
- Quotes an argument so it survives batch-script execution
- Handles special characters and spaces in arguments
- Quoting rules differ between Windows batch scripts and Unix shells

**`findJarsDir(sparkHome: String): String`**
- Locates the jars directory within a Spark installation
- Handles different Spark distribution layouts
- Returns the path to the directory containing the Spark JAR files

**Usage Examples:**

**Argument Quoting:**

```scala
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Quote arguments that contain spaces or special characters; the exact
// output depends on the target script format
val arg1 = YarnCommandBuilderUtils.quoteForBatchScript("my app name")
val arg2 = YarnCommandBuilderUtils.quoteForBatchScript("--conf spark.sql.warehouse.dir=/path/with spaces")
val arg3 = YarnCommandBuilderUtils.quoteForBatchScript("value_with_$_symbol")

// Arguments containing spaces come back wrapped in quotes, e.g. "my app name";
// special characters are escaped as the script format requires
```

**JAR Directory Discovery:**

```scala
import java.io.File
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Find the JAR directory in a Spark installation
val sparkHome = "/opt/spark"
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)

println(jarsDir) // /opt/spark/jars for a standard distribution

// Use in classpath construction; File.pathSeparator keeps this portable
val sparkJars = new File(jarsDir).listFiles()
  .filter(_.getName.endsWith(".jar"))
  .map(_.getAbsolutePath)
  .mkString(File.pathSeparator)
```

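The lookup itself can be sketched as follows (a simplified illustration with a hypothetical `findJars` helper; the real implementation also understands source-build layouts and fails loudly when nothing matches):

```scala
import java.io.File

// Check candidate locations in order and return the first existing
// directory; returns None instead of throwing, unlike the real method
def findJars(sparkHome: String): Option[String] = {
  val candidates = Seq(
    new File(sparkHome, "jars"),           // standard binary distribution
    new File(sparkHome, "assembly/target") // approximate source-build location
  )
  candidates.find(_.isDirectory).map(_.getAbsolutePath)
}
```

Probing a fixed list of candidate directories is what lets the same code work against both a packaged distribution and a development checkout.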
## Integration with Container Launch

### Environment Setup

```scala
import scala.collection.mutable.HashMap
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Example of environment preparation for executor containers
def prepareExecutorEnvironment(
    sparkHome: String,
    additionalPaths: Seq[String]): HashMap[String, String] = {

  val env = new HashMap[String, String]()

  // Set SPARK_HOME
  env("SPARK_HOME") = sparkHome

  // Add the Spark bin directories to PATH
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/bin")
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/sbin")

  // Add any additional paths
  additionalPaths.foreach { path =>
    YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", path)
  }

  // Set the Java library path relative to the jars directory
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", s"$jarsDir/../lib")

  env
}
```

### Command Construction

```scala
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Example of building an executor launch command
def buildExecutorCommand(
    sparkHome: String,
    executorMemory: String,
    executorCores: Int,
    userArgs: Seq[String]): Seq[String] = {

  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  val sparkJars = s"$jarsDir/*"

  val baseCommand = Seq(
    "java",
    s"-Xmx$executorMemory",
    "-cp", sparkJars,
    "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend"
  )

  // Quote user arguments that might contain spaces; the launch context is
  // ultimately rendered into a script, where unquoted spaces split arguments
  val quotedUserArgs = userArgs.map(YarnCommandBuilderUtils.quoteForBatchScript)

  baseCommand ++ quotedUserArgs
}
```

## Error Handling

### Path Resolution Errors

```scala
import org.apache.spark.SparkException
import org.apache.spark.launcher.YarnCommandBuilderUtils

try {
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  // Use jarsDir...
} catch {
  case e: IllegalArgumentException =>
    throw new SparkException(s"Unable to find Spark jars directory in: $sparkHome", e)
  case e: SecurityException =>
    throw new SparkException(s"Access denied reading Spark installation: $sparkHome", e)
}
```

### Environment Variable Conflicts

```scala
// Assumes the enclosing class mixes in org.apache.spark.internal.Logging
// so that logWarning is available
def validateEnvironment(env: HashMap[String, String]): Unit = {
  // Check for empty PATH components
  val pathValue = env.getOrElse("PATH", "")
  if (pathValue.contains("::")) {
    logWarning("Empty path component detected in PATH environment variable")
  }

  // Validate SPARK_HOME consistency: if SPARK_HOME is set, its bin/sbin
  // directories should normally appear on PATH
  val sparkHome = env.get("SPARK_HOME")
  val pathContainsSparkBin = sparkHome.exists { home =>
    pathValue.contains(s"$home/bin") || pathValue.contains(s"$home/sbin")
  }

  if (sparkHome.isDefined && !pathContainsSparkBin) {
    logWarning("SPARK_HOME is set but the Spark binaries may not be on PATH")
  }
}
```

## Platform Considerations

### Windows Compatibility

```scala
// Illustrative sketch of platform-specific quoting (simplified; not the
// actual YarnCommandBuilderUtils.quoteForBatchScript implementation)
def quoteForPlatform(arg: String): String = {
  val isWindows = System.getProperty("os.name").toLowerCase.contains("windows")

  if (isWindows) {
    // Windows batch-script quoting
    if (arg.contains(" ") || arg.contains("&") || arg.contains("|")) {
      s""""$arg""""
    } else {
      arg
    }
  } else {
    // Unix shell quoting: wrap in single quotes, escaping embedded quotes
    if (arg.contains(" ") || arg.contains("$") || arg.contains("'")) {
      s"'${arg.replace("'", "'\\''")}'"
    } else {
      arg
    }
  }
}
```

### Container Resource Constraints

```scala
// Memory overhead calculation that respects node capacity limits
def calculateOptimalOverhead(
    executorMemoryMB: Long,
    nodeMemoryMB: Long,
    executorsPerNode: Int): Long = {

  // Standard overhead: the larger of the factor-based value and the minimum (MB)
  val standardOverhead = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN
  )

  val totalRequestedMemory = (executorMemoryMB + standardOverhead) * executorsPerNode

  if (totalRequestedMemory > nodeMemoryMB * 0.9) {
    // Reduce overhead if the total would exceed 90% of node capacity,
    // but never drop below the configured minimum
    val maxOverhead = (nodeMemoryMB * 0.9 / executorsPerNode - executorMemoryMB).toLong
    math.max(maxOverhead, YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
  } else {
    standardOverhead
  }
}
```