# Application Management

Core components for submitting and managing Spark applications on YARN clusters. These classes handle application submission, monitoring, lifecycle management, and interaction with the YARN ResourceManager.

## Capabilities

### Client

Primary entry point for submitting and monitoring Spark applications on YARN. Handles application submission to the YARN ResourceManager and provides monitoring capabilities.

```scala { .api }
/**
 * Client for submitting and monitoring Spark applications on YARN
 * @param args Client arguments containing application details
 * @param sparkConf Spark configuration
 * @param rpcEnv RPC environment for communication
 */
private[spark] class Client(
  val args: ClientArguments,
  val sparkConf: SparkConf,
  val rpcEnv: RpcEnv
) {
  /** Submit application to YARN ResourceManager */
  def submitApplication(): Unit

  /** Submit and monitor application until completion */
  def run(): Unit

  /** Stop the client and clean up resources */
  def stop(): Unit

  /** Get YARN application ID */
  def getApplicationId(): ApplicationId

  /** Monitor application status and return report */
  def monitorApplication(
    returnOnRunning: Boolean = false,
    logApplicationReport: Boolean = true,
    interval: Long = sparkConf.get(REPORT_INTERVAL)
  ): YarnAppReport

  /** Report launcher state for external monitoring */
  def reportLauncherState(state: SparkAppHandle.State): Unit

  /** Get application report from ResourceManager */
  def getApplicationReport(): ApplicationReport
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.yarn.queue", "default")

val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", 0, sparkConf,
  new SecurityManager(sparkConf))

val clientArgs = new ClientArguments(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass"
))

val client = new Client(clientArgs, sparkConf, rpcEnv)
client.submitApplication()
val appId = client.getApplicationId()
println(s"Application submitted with ID: $appId")
```
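
To block until the application reaches a terminal state instead of returning right after submission, you can poll the ResourceManager through `monitorApplication`. A minimal sketch, continuing from the `client` and `appId` values above and using the default report interval:

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Block until the application finishes, logging periodic status reports.
val report = client.monitorApplication()

report.finalState match {
  case FinalApplicationStatus.SUCCEEDED =>
    println(s"Application $appId completed successfully")
  case other =>
    println(s"Application $appId ended with status $other: " +
      report.diagnostics.getOrElse("no diagnostics"))
}

// Release client-side resources once monitoring is done.
client.stop()
```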

### Client Companion Object

Utility methods and constants for YARN client operations.

```scala { .api }
object Client {
  /** Application JAR file name in YARN */
  val APP_JAR_NAME: String = "__app__.jar"

  /** Staging directory name */
  val SPARK_STAGING: String = ".sparkStaging"

  /** Localized configuration directory name */
  val LOCALIZED_CONF_DIR: String = "__spark_conf__"

  /** Localized library directory name */
  val LOCALIZED_LIB_DIR: String = "__spark_libs__"

  /** Localized Python files directory name */
  val LOCALIZED_PYTHON_DIR: String = "__pyfiles__"

  /**
   * Get user classpath URIs from Spark configuration
   * @param conf Spark configuration
   * @return Array of classpath URIs
   */
  def getUserClasspath(conf: SparkConf): Array[URI]

  /**
   * Get user classpath URLs
   * @param conf Spark configuration
   * @param useClusterPath Whether to use cluster-side paths
   * @return Array of classpath URLs
   */
  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL]

  /**
   * Build file system path from components
   * @param components Path components to join
   * @return Combined path string
   */
  def buildPath(components: String*): String

  /**
   * Convert gateway path to cluster path
   * @param conf Spark configuration
   * @param path Original path
   * @return Cluster-side path
   */
  def getClusterPath(conf: SparkConf, path: String): String
}
```
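
These helpers are mostly used while preparing the staging directory and container classpath, but they can be called directly. A small sketch; the `spark.yarn.config.gatewayPath` and `spark.yarn.config.replacementPath` keys are assumed to be the settings `getClusterPath` consults, and the application ID string is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.Client

val conf = new SparkConf()
  .set("spark.yarn.config.gatewayPath", "/usr/local/lib")
  .set("spark.yarn.config.replacementPath", "{{HADOOP_COMMON_HOME}}/lib")

// Join path components into a single path string,
// e.g. ".sparkStaging/application_1700000000000_0001".
val stagingPath = Client.buildPath(Client.SPARK_STAGING, "application_1700000000000_0001")

// Translate a gateway-side path into its cluster-side equivalent,
// e.g. "{{HADOOP_COMMON_HOME}}/lib/native".
val clusterSide = Client.getClusterPath(conf, "/usr/local/lib/native")

// Resolve the user classpath URIs declared in the configuration.
val userClasspath = Client.getUserClasspath(conf)
```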

### ApplicationMaster

YARN ApplicationMaster for managing the Spark application lifecycle within YARN containers. Coordinates between the Spark driver and the YARN ResourceManager.

```scala { .api }
/**
 * YARN ApplicationMaster for managing Spark application lifecycle
 * @param args ApplicationMaster arguments
 * @param sparkConf Spark configuration
 * @param yarnConf YARN configuration
 */
private[spark] class ApplicationMaster(
  args: ApplicationMasterArguments,
  sparkConf: SparkConf,
  yarnConf: YarnConfiguration
) {
  /** Main execution method for ApplicationMaster */
  def run(): Int

  /** Run in unmanaged mode (client mode) */
  def runUnmanaged(
    clientRpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    stagingDir: Path,
    cachedResourcesConf: SparkConf
  ): Unit

  /** Stop unmanaged ApplicationMaster */
  def stopUnmanaged(stagingDir: Path): Unit

  /** Finish application with final status */
  def finish(status: FinalApplicationStatus, code: Int, msg: String): Unit

  /** Unregister from ResourceManager */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}
```

### ApplicationMaster Companion Object

Static utilities and entry point for ApplicationMaster.

```scala { .api }
object ApplicationMaster {
  /** Main entry point for ApplicationMaster */
  def main(args: Array[String]): Unit

  /** Signal SparkContext initialization */
  def sparkContextInitialized(sc: SparkContext): Unit

  /** Get current application attempt ID */
  def getAttemptId(): ApplicationAttemptId

  /** Get Spark history server address */
  def getHistoryServerAddress(
    sparkConf: SparkConf,
    yarnConf: YarnConfiguration,
    appId: String,
    attemptId: String
  ): String
}
```
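
The companion object is primarily the JVM entry point YARN invokes when launching the ApplicationMaster container, but the helpers can also be called directly. A minimal sketch of `getHistoryServerAddress`; it assumes `spark.yarn.historyServer.address` is configured, and the exact format of the returned link may vary by Spark version:

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ApplicationMaster

val sparkConf = new SparkConf()
  .set("spark.yarn.historyServer.address", "history-host:18080")
val yarnConf = new YarnConfiguration()

// Builds the history server link reported for this application attempt,
// typically "<history server address>/history/<appId>/<attemptId>".
val historyAddress = ApplicationMaster.getHistoryServerAddress(
  sparkConf,
  yarnConf,
  "application_1700000000000_0001",
  "1"
)
```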

### YarnClusterApplication

Application entry point for yarn-cluster mode deployment.

```scala { .api }
/**
 * Application entry point for yarn-cluster mode
 */
class YarnClusterApplication extends SparkApplication {
  /**
   * Start application in cluster mode
   * @param args Application arguments
   * @param conf Spark configuration
   */
  def start(args: Array[String], conf: SparkConf): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnClusterApplication
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyClusterApp")
  .set("spark.yarn.queue", "default")

val app = new YarnClusterApplication()
// The arguments follow the ClientArguments format (see "Argument Classes" below).
app.start(Array("--jar", "/path/to/app.jar", "--class", "com.example.MainClass"), conf)
```

### ExecutorLauncher

Entry point for client mode executor launcher.

```scala { .api }
object ExecutorLauncher {
  /** Main entry point for executor launcher */
  def main(args: Array[String]): Unit
}
```

## Types

### YarnAppReport

Container for YARN application status information.

```scala { .api }
/**
 * Container for YARN application status information
 * @param appState Current YARN application state
 * @param finalState Final application status
 * @param diagnostics Optional diagnostic messages
 */
case class YarnAppReport(
  appState: YarnApplicationState,
  finalState: FinalApplicationStatus,
  diagnostics: Option[String]
)
```
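
`YarnAppReport` is what `Client.monitorApplication` returns, so a typical consumer inspects the two status fields and surfaces the diagnostics. A short illustrative sketch (the helper name is arbitrary):

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
import org.apache.spark.deploy.yarn.YarnAppReport

def describe(report: YarnAppReport): String = report match {
  case YarnAppReport(_, FinalApplicationStatus.SUCCEEDED, _) =>
    "application succeeded"
  case YarnAppReport(YarnApplicationState.KILLED, _, diag) =>
    s"application was killed: ${diag.getOrElse("no diagnostics")}"
  case YarnAppReport(state, finalStatus, diag) =>
    s"state=$state, finalStatus=$finalStatus, diagnostics=${diag.getOrElse("none")}"
}
```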

### Argument Classes

```scala { .api }
/**
 * Argument parser for YARN client
 * @param args Command line arguments
 */
class ClientArguments(args: Array[String]) {
  /** User application JAR file */
  var userJar: String

  /** Main class to execute */
  var userClass: String

  /** Primary Python file for PySpark */
  var primaryPyFile: String

  /** Primary R file for SparkR */
  var primaryRFile: String

  /** User application arguments */
  var userArgs: ArrayBuffer[String]

  /** Enable verbose logging */
  var verbose: Boolean
}

/**
 * Argument parser for ApplicationMaster
 * @param args Command line arguments
 */
class ApplicationMasterArguments(args: Array[String]) {
  /** User application JAR file path */
  var userJar: String

  /** Main class name to execute */
  var userClass: String

  /** Primary Python file for PySpark */
  var primaryPyFile: String

  /** Primary R file for SparkR */
  var primaryRFile: String

  /** User application arguments */
  var userArgs: Seq[String]

  /** Spark properties file path */
  var propertiesFile: String

  /** Distributed cache configuration */
  var distCacheConf: String
}
```
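
The fields are populated from command-line style flags when the arguments object is constructed. A hedged sketch for `ClientArguments`; flag names such as `--arg` are assumed from the parser's fields and may differ between Spark versions:

```scala
import org.apache.spark.deploy.yarn.ClientArguments

// "--arg" may be repeated, once per application argument.
val parsed = new ClientArguments(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass",
  "--arg", "input.txt",
  "--arg", "output"
))

println(parsed.userClass)       // com.example.MainClass
println(parsed.userArgs.toList) // List(input.txt, output)
```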

## Integration Patterns

### Client Mode Integration

In client mode, the Spark driver runs on the local machine (outside YARN), while executors run on YARN.

```scala
// Client mode configuration
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .setAppName("ClientModeApp")

// SparkContext creation automatically uses YARN client mode
val sc = new SparkContext(conf)
```

### Cluster Mode Integration

In cluster mode, both the driver and the executors run on the YARN cluster.

```scala
// Cluster mode configuration
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("ClusterModeApp")

// Cluster mode applications are typically submitted via spark-submit
// or programmatically via YarnClusterApplication
```

### Programmatic Submission

```scala
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

def submitApplication(jarPath: String, mainClass: String): ApplicationId = {
  val conf = new SparkConf()
    .setAppName("ProgrammaticSubmission")
    .set("spark.yarn.queue", "default")
    .set("spark.yarn.am.memory", "1g")

  val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", 0, conf,
    new SecurityManager(conf))

  val args = new ClientArguments(Array(
    "--jar", jarPath,
    "--class", mainClass
  ))

  val client = new Client(args, conf, rpcEnv)
  client.submitApplication()
  client.getApplicationId()
}
```
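
If the caller should block until the application finishes rather than returning right after submission, `run()` combines submission and monitoring in one call. A sketch of a blocking variant of the helper above (names are illustrative):

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

// Submits the application and monitors it until it reaches a terminal state.
def runApplication(jarPath: String, mainClass: String): Unit = {
  val conf = new SparkConf()
    .setAppName("ProgrammaticSubmission")
    .set("spark.yarn.queue", "default")

  val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", 0, conf,
    new SecurityManager(conf))

  val args = new ClientArguments(Array("--jar", jarPath, "--class", mainClass))
  new Client(args, conf, rpcEnv).run()
}
```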