
# Application Deployment

Client API for submitting and managing YARN applications programmatically. This module supports both client and cluster deployment modes, including application lifecycle management and monitoring.

## Capabilities

### Client

Main client interface for YARN application submission and management. Handles resource staging, application submission, and optional monitoring.

```scala { .api }
class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
  def reportLauncherState(state: SparkAppHandle.State): Unit
  def cleanupStagingDir(appId: ApplicationId): Unit
  def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
  def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
  def copyFileToRemote(destDir: Path, srcPath: Path, replication: Short, symlinkCache: HashMap[String, Path] = null): Path
  def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit
}
```

**Constructor Parameters:**

- `args`: ClientArguments containing the application JAR, main class, and program arguments
- `sparkConf`: SparkConf with YARN-specific configuration settings

**Core Methods:**

**`submitApplication(): ApplicationId`**


- Submits the application to the YARN ResourceManager
- Stages resources and creates the ApplicationMaster
- Returns the YARN ApplicationId for tracking

**`run(): Unit`**

- Submits the application and optionally monitors execution
- Blocks until completion if monitoring is enabled
- Combines `submitApplication()` with optional monitoring

**`stop(): Unit`**

- Stops the client and performs cleanup
- Cancels any ongoing monitoring
- Releases staged resources

**`monitorApplication(appId, returnOnRunning, logApplicationReport): YarnAppReport`**

- Monitors application state changes
- `returnOnRunning`: if true, returns when the app reaches the RUNNING state
- `logApplicationReport`: if true, logs periodic status updates
- Returns the final application report

**`getApplicationReport(appId: ApplicationId): ApplicationReport`**

- Retrieves the current YARN application report
- Provides state, progress, and diagnostic information
- Non-blocking status query

**Usage Examples:**

**Basic Application Submission:**

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass",
  "--arg", "arg1",
  "--arg", "arg2"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

try {
  val applicationId = client.submitApplication()
  println(s"Application submitted with ID: $applicationId")

  // Monitor until completion
  val finalReport = client.monitorApplication(applicationId, returnOnRunning = false, logApplicationReport = true)
  println(s"Application finished with state: ${finalReport.appState}")
} finally {
  client.stop()
}
```


**Submit and Monitor Separately:**

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

val client = new Client(clientArgs, sparkConf)

// Submit application
val appId = client.submitApplication()

// Poll the application report until a terminal state is reached
var running = true
while (running) {
  val report = client.getApplicationReport(appId)
  println(s"App state: ${report.getYarnApplicationState}")

  report.getYarnApplicationState match {
    case YarnApplicationState.FINISHED |
         YarnApplicationState.FAILED |
         YarnApplicationState.KILLED => running = false
    case _ => Thread.sleep(5000)
  }
}

client.stop()
```


### ClientArguments

Command-line argument parser for YARN client configuration.

```scala { .api }
class ClientArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var pyFiles: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  var propertiesFile: String = null
}
```

**Key Fields:**

- `userJar`: Path to the application JAR file
- `userClass`: Main class name to execute
- `primaryPyFile`: Primary Python file for PySpark applications
- `pyFiles`: Additional Python files (comma-separated)
- `userArgs`: Arguments to pass to the user application
- `propertiesFile`: Additional Spark properties file

**Supported Arguments:**

```
// JAR and class specification
--jar /path/to/app.jar
--class com.example.MainClass

// Python applications
--primary-py-file /path/to/main.py
--py-files file1.py,file2.py

// Application arguments
--arg value1
--arg value2

// Additional configuration
--properties-file /path/to/spark.properties
```

**Usage Example:**

```scala
val args = Array(
  "--jar", "/path/to/analytics.jar",
  "--class", "com.company.analytics.DataProcessor",
  "--arg", "input-path=/data/input",
  "--arg", "output-path=/data/output",
  "--arg", "date=2024-01-01",
  "--properties-file", "/etc/spark/analytics.properties"
)

val clientArgs = new ClientArguments(args)
println(s"Main class: ${clientArgs.userClass}")
println(s"Arguments: ${clientArgs.userArgs.mkString(", ")}")
```


### ApplicationMaster

YARN ApplicationMaster implementation that manages Spark applications in both cluster and client modes.

```scala { .api }
class ApplicationMaster(args: ApplicationMasterArguments) {
  def getAttemptId(): ApplicationAttemptId
  final def run(): Int
  final def getDefaultFinalStatus(): FinalApplicationStatus
  final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit
  // Internal implementation - handles driver execution (cluster mode) or coordination (client mode)
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def getAttemptId(): ApplicationAttemptId
  private[spark] def getHistoryServerAddress(sparkConf: SparkConf, yarnConf: YarnConfiguration, appId: ApplicationId, attemptId: ApplicationAttemptId): String
}
```

**Core Methods:**

**`getAttemptId(): ApplicationAttemptId`**

- Returns the current YARN application attempt identifier
- Used for tracking and logging

**`run(): Int`**

- Main execution method for the ApplicationMaster
- Returns an exit code (0 for success, non-zero for failure)
- Handles both cluster and client mode execution paths

**`getDefaultFinalStatus(): FinalApplicationStatus`**

- Determines the default final status based on the execution outcome
- Returns a SUCCEEDED, FAILED, or KILLED status

**`unregister(status, diagnostics): Unit`**

- Unregisters the ApplicationMaster from the YARN ResourceManager
- Reports the final status and diagnostic information
- Performs cleanup operations

**`finish(status, code, msg): Unit`**

- Finalizes ApplicationMaster execution
- Sets the exit code and status message
- Initiates shutdown procedures

**Companion Object Methods:**

**`main(args: Array[String]): Unit`**

- Entry point for ApplicationMaster execution
- Parses command-line arguments and starts the ApplicationMaster

**`sparkContextInitialized(sc: SparkContext): Unit`**

- Called when the SparkContext is initialized in cluster mode
- Sets up application-specific configuration and monitoring

**Responsibilities:**

- **Cluster Mode**: Runs the Spark driver within the ApplicationMaster
- **Client Mode**: Coordinates with an external driver and manages executors
- **Resource Management**: Requests and manages executor containers
- **Security**: Handles credential renewal and the security context
- **Monitoring**: Reports application progress to the YARN ResourceManager
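
The exit-code contract of `run(): Int` can be sketched as a mapping from final status to process exit code. This is a hedged illustration only: `exitCodeFor` is a hypothetical helper, status names stand in for the real `FinalApplicationStatus` enum, and the concrete non-zero codes Spark uses are internal.

```scala
// Hedged sketch: how a final application status conceptually maps to
// the exit code returned by run(). The helper and codes are
// illustrative, not the actual Spark implementation.
def exitCodeFor(statusName: String): Int = statusName match {
  case "SUCCEEDED" => 0 // clean shutdown
  case _           => 1 // FAILED, KILLED, UNDEFINED, ...
}
```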


**ApplicationMasterArguments:**

```scala { .api }
class ApplicationMasterArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var userArgs: String = null
  var propertiesFile: String = null
  // Additional AM-specific arguments
}
```


### Resource Staging

Internal resource staging functionality for preparing application files.

```scala { .api }
// Key staging methods (internal to Client)
def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
```

**Staging Process:**

1. Creates a staging directory in HDFS (or another distributed filesystem)
2. Uploads the application JAR and dependencies
3. Prepares executor launch scripts and configuration
4. Sets appropriate file permissions and visibility
5. Creates LocalResource references for YARN
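
Step 1 follows the conventional `.sparkStaging/<application id>` layout under the staging base directory. A minimal, self-contained sketch of that layout (`stagingDirPath` is a hypothetical helper, not part of the Spark API; the host and paths are made up):

```scala
// Illustrative sketch of the conventional staging-directory layout:
//   <staging base>/.sparkStaging/<application id>
def stagingDirPath(appStagingBaseDir: String, appId: String): String =
  Seq(appStagingBaseDir, ".sparkStaging", appId).mkString("/")

stagingDirPath("hdfs://nn:8020/user/alice", "application_1700000000000_0042")
// "hdfs://nn:8020/user/alice/.sparkStaging/application_1700000000000_0042"
```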


### Application Monitoring

Application state tracking and reporting functionality.

```scala { .api }
case class YarnAppReport(
  appState: YarnApplicationState,
  finalState: FinalApplicationStatus,
  diagnostics: String
)
```

**Application States:**

- `NEW`: Application submitted but not yet accepted
- `SUBMITTED`: Application accepted by the ResourceManager
- `RUNNING`: ApplicationMaster started and running
- `FINISHED`: Application completed successfully
- `FAILED`: Application failed with errors
- `KILLED`: Application terminated by the user or system

**Final States:**

- `SUCCEEDED`: Application completed successfully
- `FAILED`: Application failed
- `KILLED`: Application was terminated
- `UNDEFINED`: Final state not yet determined
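
Of the application states above, `FINISHED`, `FAILED`, and `KILLED` are terminal. A small hedged sketch of that check, operating on state names rather than the real `YarnApplicationState` enum so it stays self-contained:

```scala
// Hedged sketch: is a YARN application state terminal?
// Real code would compare YarnApplicationState enum values directly.
val TerminalStates = Set("FINISHED", "FAILED", "KILLED")
def isTerminal(stateName: String): Boolean = TerminalStates.contains(stateName)

isTerminal("RUNNING")  // false
isTerminal("FINISHED") // true
```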


**Usage Example:**

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

def waitForCompletion(client: Client, appId: ApplicationId): Unit = {
  // monitorApplication with returnOnRunning = false blocks until the
  // application reaches a terminal state, so no polling loop is needed
  val finalReport = client.monitorApplication(appId, returnOnRunning = false, logApplicationReport = true)

  finalReport.appState match {
    case YarnApplicationState.FINISHED =>
      println(s"Application completed: ${finalReport.finalState}")
    case YarnApplicationState.FAILED =>
      println(s"Application failed: ${finalReport.diagnostics}")
    case YarnApplicationState.KILLED =>
      println("Application was killed")
    case other =>
      println(s"Application state: $other")
  }
}
```


## Client Constants

```scala { .api }
object Client {
  // Constants
  val APP_JAR_NAME: String = "__app__.jar"
  val LOCAL_SCHEME: String = "local"
  val SPARK_STAGING: String = ".sparkStaging"
  val ENV_DIST_CLASSPATH: String = "SPARK_DIST_CLASSPATH"

  // File permissions
  val STAGING_DIR_PERMISSION: FsPermission
  val APP_FILE_PERMISSION: FsPermission

  // Utility methods
  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit
  def populateClasspath(conf: SparkConf, hadoopConf: Configuration, env: HashMap[String, String], extraClassPath: Option[String]): Unit
  def getUserClasspath(conf: SparkConf): Array[URI]
  def getClusterPath(conf: SparkConf, path: String): String
  def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean
  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean
  def buildPath(components: String*): String
  def isLocalUri(uri: String): Boolean
  def createAppReport(report: ApplicationReport): YarnAppReport
  def createLibraryPathPrefix(libpath: String, conf: SparkConf): String
}
```

**Key Constants:**

- `APP_JAR_NAME`: Standard name for the uploaded application JAR
- `LOCAL_SCHEME`: URI scheme for local filesystem resources
- `SPARK_STAGING`: Default staging directory name in the distributed filesystem
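
As a hedged illustration of how `LOCAL_SCHEME` is used: a `local:` URI points at a file already present on every cluster node, so it is not uploaded to the staging directory. A self-contained sketch of the scheme check (the real logic lives in `Client.isLocalUri`; this version is illustrative):

```scala
// Sketch of a "local:" scheme check, mirroring LOCAL_SCHEME above.
// local: URIs reference files already on cluster nodes and are
// therefore not staged.
val LocalScheme = "local"
def isLocalUri(uri: String): Boolean = uri.startsWith(s"$LocalScheme:")

isLocalUri("local:/opt/libs/dep.jar")    // true
isLocalUri("hdfs:///user/alice/app.jar") // false
```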


## Error Handling

Common exceptions during application deployment:

```scala
// Application submission failures
throw new IOException("Failed to upload application resources")
throw new YarnException("ResourceManager rejected application")

// Invalid arguments
throw new IllegalArgumentException("Missing required argument: --jar")
throw new SparkException("Cannot specify both --jar and --primary-py-file")

// Resource staging failures
throw new IOException("Failed to create staging directory")
throw new AccessControlException("Insufficient permissions for staging")
```
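
When scripting submissions, it can help to separate transient failures (I/O hiccups during staging) from permanent ones (bad arguments). A hedged sketch of such a classifier using the exception families shown above; the split itself is a policy choice, not part of the Spark API, and `isRetriable` is a hypothetical helper:

```scala
import java.io.IOException

// Hedged sketch: classify a submission failure for retry decisions.
// Staging/upload I/O errors may be worth retrying; argument errors
// never are. Which types count as transient is a policy choice.
def isRetriable(e: Throwable): Boolean = e match {
  case _: IOException              => true  // staging/upload hiccup
  case _: IllegalArgumentException => false // bad --jar/--class arguments
  case _                           => false
}

isRetriable(new IOException("Failed to create staging directory")) // true
isRetriable(new IllegalArgumentException("Missing --jar"))         // false
```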


## Integration Patterns

**Configuration-driven Deployment:**

```scala
val sparkConf = new SparkConf()
  .setAppName("DataPipeline")
  .set("spark.yarn.queue", "analytics")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")

val client = new Client(clientArgs, sparkConf)
val appId = client.submitApplication()
```

**Custom Resource Management:**

```scala
val sparkConf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4")
  .set("spark.executor.instances", "10")
  .set("spark.yarn.executor.memoryOverhead", "1g")
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")
```


## Deploy Mode Considerations

| Aspect | Client Mode | Cluster Mode |
|--------|-------------|--------------|
| **Driver Location** | Client machine | YARN cluster |
| **Network Access** | Driver needs cluster access | Self-contained |
| **Resource Usage** | Client machine resources | Cluster resources only |
| **Failure Handling** | Client failure kills the app | More resilient |
| **Interactive Use** | Suitable for shells/notebooks | Batch processing |
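
The choice in the table is normally carried by the standard `spark.submit.deployMode` configuration key alongside `spark.master`. A minimal, self-contained sketch of building those entries (a plain `Map` here; a real submission would set them on `SparkConf`, and the `yarnSubmitConf` helper is hypothetical):

```scala
// Minimal sketch: YARN master plus deploy mode as config entries.
// spark.master and spark.submit.deployMode are standard Spark keys.
def yarnSubmitConf(deployMode: String): Map[String, String] = {
  require(Set("client", "cluster")(deployMode), s"bad deploy mode: $deployMode")
  Map(
    "spark.master"            -> "yarn",
    "spark.submit.deployMode" -> deployMode
  )
}

yarnSubmitConf("cluster")("spark.submit.deployMode") // "cluster"
```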