or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-deployment.mdcluster-management.mdcommand-building.mdconfiguration-system.mdextension-points.mdindex.mdresource-management.mdsecurity-integration.mdyarn-shuffle-service.md

command-building.mddocs/

# Command Building Utilities

YARN-specific utilities for building container launch commands and managing Spark distribution. These utilities handle the low-level details of container command construction and environment setup.

## Capabilities

### YarnSparkHadoopUtil

Utility object providing YARN-specific Hadoop integration and environment management.

```scala { .api }
object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}
```

**Environment Management:**

**`addPathToEnvironment(env, key, value): Unit`**

- Adds a path value to an environment variable
- Handles path separator logic for cross-platform compatibility
- Used for setting up executor environment variables

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import scala.collection.mutable.HashMap

val env = new HashMap[String, String]()

// Add to PATH environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/usr/local/bin")
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/opt/spark/bin")

// Add to LD_LIBRARY_PATH
YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", "/usr/local/lib")

println(env("PATH")) // /usr/local/bin:/opt/spark/bin:$PATH
```

**Constants:**

**`MEMORY_OVERHEAD_FACTOR: Double`**

- Default memory overhead factor for container memory calculation
- Typically 0.1 (10% of executor memory)
- Used when explicit overhead is not specified

**`MEMORY_OVERHEAD_MIN: Long`**

- Minimum memory overhead in bytes
- Typically 384MB (384 * 1024 * 1024 bytes)
- Ensures adequate overhead for small executor containers

**`RM_REQUEST_PRIORITY: Priority`**

- Standard priority for ResourceManager container requests
- Consistent priority level for all Spark container requests
- Used by YarnAllocator for container allocation

**Memory Calculation Example:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

def calculateContainerMemory(executorMemoryMB: Long): Long = {
  val overheadMB = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN / (1024 * 1024)
  )
  executorMemoryMB + overheadMB
}

// Example calculations
println(calculateContainerMemory(1024)) // 1408MB (1024 + 384 min overhead)
println(calculateContainerMemory(8192)) // 9011MB (8192 + 819 calculated overhead)
```

### YarnCommandBuilderUtils

Utilities for building and formatting container launch commands on YARN.

```scala { .api }
object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
```

**Command Building:**

**`quoteForBatchScript(arg: String): String`**

- Properly quotes arguments for batch script execution
- Handles special characters and spaces in arguments
- Platform-aware quoting for Windows and Unix systems

**`findJarsDir(sparkHome: String): String`**

- Locates the jars directory within a Spark installation
- Handles different Spark distribution layouts
- Returns path to directory containing Spark JAR files

**Usage Examples:**

**Argument Quoting:**

```scala
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Quote arguments with spaces or special characters
val arg1 = YarnCommandBuilderUtils.quoteForBatchScript("my app name")
val arg2 = YarnCommandBuilderUtils.quoteForBatchScript("--conf spark.sql.warehouse.dir=/path/with spaces")
val arg3 = YarnCommandBuilderUtils.quoteForBatchScript("value_with_$_symbol")

println(arg1) // "my app name" (on Unix/Linux)
println(arg2) // "--conf spark.sql.warehouse.dir=/path/with spaces"
println(arg3) // "value_with_\$_symbol" (escaped special chars)
```

**JAR Directory Discovery:**

```scala
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Find JAR directory in Spark installation
val sparkHome = "/opt/spark"
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)

println(jarsDir) // /opt/spark/jars (for standard distribution)

// Use in classpath construction
val sparkJars = new File(jarsDir).listFiles()
  .filter(_.getName.endsWith(".jar"))
  .map(_.getAbsolutePath)
  .mkString(":")
```

## Integration with Container Launch

### Environment Setup

```scala
// Example of environment preparation for executor containers
def prepareExecutorEnvironment(
    sparkHome: String,
    executorMemory: String,
    additionalPaths: Seq[String]): HashMap[String, String] = {

  val env = new HashMap[String, String]()

  // Set SPARK_HOME
  env("SPARK_HOME") = sparkHome

  // Add Spark bins to PATH
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/bin")
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/sbin")

  // Add additional paths
  additionalPaths.foreach { path =>
    YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", path)
  }

  // Set Java library path
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", s"$jarsDir/../lib")

  env
}
```

### Command Construction

```scala
// Example of building executor launch command
def buildExecutorCommand(
    sparkHome: String,
    executorMemory: String,
    executorCores: Int,
    userArgs: Seq[String]): Seq[String] = {

  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  val sparkJars = s"$jarsDir/*"

  val baseCommand = Seq(
    "java",
    s"-Xmx$executorMemory",
    "-cp", sparkJars,
    "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend"
  )

  // Quote user arguments that might contain spaces
  val quotedUserArgs = userArgs.map(YarnCommandBuilderUtils.quoteForBatchScript)

  baseCommand ++ quotedUserArgs
}
```

## Error Handling

### Path Resolution Errors

```scala
try {
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  // Use jarsDir...
} catch {
  case e: IllegalArgumentException =>
    throw new SparkException(s"Unable to find Spark jars directory in: $sparkHome", e)
  case e: SecurityException =>
    throw new SparkException(s"Access denied reading Spark installation: $sparkHome", e)
}
```

### Environment Variable Conflicts

```scala
def validateEnvironment(env: HashMap[String, String]): Unit = {
  // Check for conflicting PATH entries
  val pathValue = env.getOrElse("PATH", "")
  if (pathValue.contains("::")) {
    logWarning("Empty path component detected in PATH environment variable")
  }

  // Validate SPARK_HOME consistency
  val sparkHome = env.get("SPARK_HOME")
  val pathContainsSparkBin = pathValue.contains("/bin") || pathValue.contains("/sbin")

  if (sparkHome.isDefined && !pathContainsSparkBin) {
    logWarning("SPARK_HOME set but Spark binaries may not be in PATH")
  }
}
```

## Platform Considerations

### Windows Compatibility

```scala
// Platform-specific behavior in YarnCommandBuilderUtils.quoteForBatchScript
def quoteForPlatform(arg: String): String = {
  val isWindows = System.getProperty("os.name").toLowerCase.contains("windows")

  if (isWindows) {
    // Windows batch script quoting
    if (arg.contains(" ") || arg.contains("&") || arg.contains("|")) {
      s""""$arg""""
    } else {
      arg
    }
  } else {
    // Unix shell quoting
    if (arg.contains(" ") || arg.contains("$") || arg.contains("'")) {
      s"'${arg.replace("'", "'\\''")}'"
    } else {
      arg
    }
  }
}
```

### Container Resource Constraints

```scala
// Memory overhead calculation considering container limits
def calculateOptimalOverhead(
    executorMemoryMB: Long,
    nodeMemoryMB: Long,
    executorsPerNode: Int): Long = {

  val standardOverhead = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN / (1024 * 1024)
  )

  val totalRequestedMemory = (executorMemoryMB + standardOverhead) * executorsPerNode

  if (totalRequestedMemory > nodeMemoryMB * 0.9) {
    // Reduce overhead if total memory exceeds 90% of node capacity
    val maxOverhead = (nodeMemoryMB * 0.9 / executorsPerNode - executorMemoryMB).toLong
    math.max(maxOverhead, YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN / (1024 * 1024))
  } else {
    standardOverhead
  }
}
```