# Application Deployment

Client API for submitting and managing YARN applications programmatically. This module provides comprehensive support for both client and cluster deployment modes, including application lifecycle management and monitoring capabilities.

## Capabilities

### Client

Main client interface for YARN application submission and management. Handles resource staging, application submission, and optional monitoring.

```scala { .api }
class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
  def reportLauncherState(state: SparkAppHandle.State): Unit
  def cleanupStagingDir(appId: ApplicationId): Unit
  def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
  def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
  def copyFileToRemote(destDir: Path, srcPath: Path, replication: Short, symlinkCache: HashMap[String, Path] = null): Path
  def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit
}
```
**Constructor Parameters:**
- `args`: ClientArguments containing application jar, main class, and program arguments
- `sparkConf`: SparkConf with YARN-specific configuration settings

**Core Methods:**

**`submitApplication(): ApplicationId`**
- Submits application to YARN ResourceManager
- Stages resources and creates ApplicationMaster
- Returns YARN ApplicationId for tracking

**`run(): Unit`**
- Submits application and optionally monitors execution
- Blocks until completion if monitoring is enabled
- Combines submitApplication() with optional monitoring

**`stop(): Unit`**
- Stops client and performs cleanup
- Cancels any ongoing monitoring
- Releases staged resources

**`monitorApplication(appId, returnOnRunning, logApplicationReport): YarnAppReport`**
- Monitors application state changes
- `returnOnRunning`: If true, returns when app reaches RUNNING state
- `logApplicationReport`: If true, logs periodic status updates
- Returns final application report

**`getApplicationReport(appId: ApplicationId): ApplicationReport`**
- Retrieves current YARN application report
- Provides state, progress, and diagnostic information
- Non-blocking status query
**Usage Examples:**

**Basic Application Submission:**

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass",
  "--arg", "arg1",
  "--arg", "arg2"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

try {
  val applicationId = client.submitApplication()
  println(s"Application submitted with ID: $applicationId")

  // Monitor until completion
  val finalReport = client.monitorApplication(applicationId, returnOnRunning = false, logApplicationReport = true)
  println(s"Application finished with state: ${finalReport.appState}")
} finally {
  client.stop()
}
```
**Submit and Monitor Separately:**

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

val client = new Client(clientArgs, sparkConf)

// Submit application
val appId = client.submitApplication()

// Poll until the application reaches a terminal state
var running = true
while (running) {
  val report = client.getApplicationReport(appId)
  println(s"App state: ${report.getYarnApplicationState}")

  report.getYarnApplicationState match {
    case YarnApplicationState.FINISHED |
         YarnApplicationState.FAILED |
         YarnApplicationState.KILLED =>
      running = false
    case _ =>
      Thread.sleep(5000)
  }
}

client.stop()
```
### ClientArguments

Command-line argument parser for YARN client configuration.

```scala { .api }
class ClientArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var pyFiles: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  var propertiesFile: String = null
}
```

**Key Fields:**
- `userJar`: Path to application JAR file
- `userClass`: Main class name to execute
- `primaryPyFile`: Primary Python file for PySpark applications
- `pyFiles`: Additional Python files (comma-separated)
- `userArgs`: Arguments to pass to user application
- `propertiesFile`: Additional Spark properties file

**Supported Arguments:**

```
// JAR and class specification
--jar /path/to/app.jar
--class com.example.MainClass

// Python applications
--primary-py-file /path/to/main.py
--py-files file1.py,file2.py

// Application arguments
--arg value1
--arg value2

// Additional configuration
--properties-file /path/to/spark.properties
```
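
Each option above is a `--flag value` pair consumed in order. How such a pair-wise parser might work can be sketched in plain Scala (a hypothetical, simplified version — the real `ClientArguments` also validates flag combinations and reports usage errors):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified parser for the --flag/value pairs shown above.
final case class ParsedArgs(
    userJar: Option[String],
    userClass: Option[String],
    userArgs: Seq[String],
    propertiesFile: Option[String])

def parseClientArgs(args: Array[String]): ParsedArgs = {
  var jar: Option[String] = None
  var cls: Option[String] = None
  var props: Option[String] = None
  val userArgs = ArrayBuffer.empty[String]

  var i = 0
  while (i < args.length - 1) {
    args(i) match {
      case "--jar"             => jar = Some(args(i + 1))
      case "--class"           => cls = Some(args(i + 1))
      case "--arg"             => userArgs += args(i + 1)  // repeatable
      case "--properties-file" => props = Some(args(i + 1))
      case other               => sys.error(s"Unknown argument: $other")
    }
    i += 2 // every flag consumes exactly one value
  }
  ParsedArgs(jar, cls, userArgs.toSeq, props)
}
```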
**Usage Example:**

```scala
val args = Array(
  "--jar", "/path/to/analytics.jar",
  "--class", "com.company.analytics.DataProcessor",
  "--arg", "input-path=/data/input",
  "--arg", "output-path=/data/output",
  "--arg", "date=2024-01-01",
  "--properties-file", "/etc/spark/analytics.properties"
)

val clientArgs = new ClientArguments(args)
println(s"Main class: ${clientArgs.userClass}")
println(s"Arguments: ${clientArgs.userArgs.mkString(", ")}")
```
### ApplicationMaster

YARN ApplicationMaster implementation that manages Spark applications in both cluster and client modes.

```scala { .api }
class ApplicationMaster(args: ApplicationMasterArguments) {
  def getAttemptId(): ApplicationAttemptId
  final def run(): Int
  final def getDefaultFinalStatus(): FinalApplicationStatus
  final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit
  // Internal implementation - handles driver execution (cluster mode) or coordination (client mode)
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def getAttemptId(): ApplicationAttemptId
  private[spark] def getHistoryServerAddress(sparkConf: SparkConf, yarnConf: YarnConfiguration, appId: ApplicationId, attemptId: ApplicationAttemptId): String
}
```
**Core Methods:**

**`getAttemptId(): ApplicationAttemptId`**
- Returns current YARN application attempt identifier
- Used for tracking and logging purposes

**`run(): Int`**
- Main execution method for ApplicationMaster
- Returns exit code (0 for success, non-zero for failure)
- Handles both cluster and client mode execution paths

**`getDefaultFinalStatus(): FinalApplicationStatus`**
- Determines default final status based on execution outcome
- Returns SUCCEEDED, FAILED, or KILLED status

**`unregister(status, diagnostics): Unit`**
- Unregisters ApplicationMaster from YARN ResourceManager
- Reports final status and diagnostic information
- Performs cleanup operations

**`finish(status, code, msg): Unit`**
- Finalizes ApplicationMaster execution
- Sets exit code and status message
- Initiates shutdown procedures

**Companion Object Methods:**

**`main(args: Array[String]): Unit`**
- Entry point for ApplicationMaster execution
- Parses command line arguments and starts ApplicationMaster

**`sparkContextInitialized(sc: SparkContext): Unit`**
- Called when SparkContext is initialized in cluster mode
- Sets up application-specific configuration and monitoring

**Responsibilities:**
- **Cluster Mode**: Runs the Spark driver within the ApplicationMaster
- **Client Mode**: Coordinates with external driver and manages executors
- **Resource Management**: Requests and manages executor containers
- **Security**: Handles credential renewal and security context
- **Monitoring**: Reports application progress to YARN ResourceManager
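
The intended ordering of `finish` and `unregister` can be illustrated with a plain-Scala state sketch (hypothetical names and simplified semantics — the real ApplicationMaster reports to the ResourceManager over YARN RPC and handles many more edge cases):

```scala
// Sketch of the shutdown ordering: finish() records the final status once,
// then unregister() reports it to the ResourceManager exactly once.
// "reportedToRm" stands in for the actual RM unregister RPC.
final class AmLifecycleSketch {
  private var finalStatus: Option[String] = None
  private var unregistered = false
  var reportedToRm: Option[String] = None

  def finish(status: String): Unit =
    if (finalStatus.isEmpty) finalStatus = Some(status) // first caller wins

  def unregister(): Unit = {
    require(finalStatus.nonEmpty, "finish() must run before unregister()")
    if (!unregistered) {
      reportedToRm = finalStatus
      unregistered = true
    }
  }
}
```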
**ApplicationMasterArguments:**

```scala { .api }
class ApplicationMasterArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var userArgs: String = null
  var propertiesFile: String = null
  // Additional AM-specific arguments
}
```
### Resource Staging

Internal resource staging functionality for preparing application files.

```scala { .api }
// Key staging methods (internal to Client)
def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
```

**Staging Process:**
1. Creates staging directory in HDFS/distributed filesystem
2. Uploads application JAR and dependencies
3. Prepares executor launch scripts and configuration
4. Sets appropriate file permissions and visibility
5. Creates LocalResource references for YARN
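
Steps 1 and 2 imply a per-application staging layout built from the `SPARK_STAGING` and `APP_JAR_NAME` constants documented under Client Constants. A sketch of that layout (assumed: one staging subdirectory per application named after its appId; plain strings stand in for Hadoop `Path` objects):

```scala
// Assumed staging layout: <base>/.sparkStaging/<appId>/__app__.jar
val SPARK_STAGING = ".sparkStaging"
val APP_JAR_NAME  = "__app__.jar"

def stagingDir(appStagingBaseDir: String, appId: String): String =
  s"$appStagingBaseDir/$SPARK_STAGING/$appId"

def stagedAppJar(appStagingBaseDir: String, appId: String): String =
  s"${stagingDir(appStagingBaseDir, appId)}/$APP_JAR_NAME"
```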
### Application Monitoring

Application state tracking and reporting functionality.

```scala { .api }
case class YarnAppReport(
  appState: YarnApplicationState,
  finalState: FinalApplicationStatus,
  diagnostics: String
)
```

**Application States:**
- `NEW`: Application submitted but not yet accepted
- `SUBMITTED`: Application accepted by ResourceManager
- `RUNNING`: ApplicationMaster started and running
- `FINISHED`: Application completed successfully
- `FAILED`: Application failed with errors
- `KILLED`: Application terminated by user or system

**Final States:**
- `SUCCEEDED`: Application completed successfully
- `FAILED`: Application failed
- `KILLED`: Application was terminated
- `UNDEFINED`: Final state not yet determined
**Usage Example:**

```scala
def waitForCompletion(client: Client, appId: ApplicationId): Unit = {
  // With returnOnRunning = false, monitorApplication blocks until the
  // application reaches a terminal state, so a single call is sufficient.
  val finalReport = client.monitorApplication(appId, returnOnRunning = false, logApplicationReport = true)

  finalReport.appState match {
    case YarnApplicationState.FINISHED =>
      println(s"Application completed: ${finalReport.finalState}")
    case YarnApplicationState.FAILED =>
      println(s"Application failed: ${finalReport.diagnostics}")
    case YarnApplicationState.KILLED =>
      println("Application was killed")
    case other =>
      println(s"Application ended in state: $other")
  }
}
```
## Client Constants

```scala { .api }
object Client {
  // Constants
  val APP_JAR_NAME: String = "__app__.jar"
  val LOCAL_SCHEME: String = "local"
  val SPARK_STAGING: String = ".sparkStaging"
  val ENV_DIST_CLASSPATH: String = "SPARK_DIST_CLASSPATH"

  // File permissions
  val STAGING_DIR_PERMISSION: FsPermission
  val APP_FILE_PERMISSION: FsPermission

  // Utility methods
  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit
  def populateClasspath(conf: SparkConf, hadoopConf: Configuration, env: HashMap[String, String], extraClassPath: Option[String]): Unit
  def getUserClasspath(conf: SparkConf): Array[URI]
  def getClusterPath(conf: SparkConf, path: String): String
  def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean
  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean
  def buildPath(components: String*): String
  def isLocalUri(uri: String): Boolean
  def createAppReport(report: ApplicationReport): YarnAppReport
  def createLibraryPathPrefix(libpath: String, conf: SparkConf): String
}
```

**Key Constants:**
- `APP_JAR_NAME`: Standard name for uploaded application JAR
- `LOCAL_SCHEME`: URI scheme for local filesystem resources
- `SPARK_STAGING`: Default staging directory name in distributed filesystem
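
The semantics suggested by two of the utility-method names can be sketched in plain Scala (assumed behavior inferred from the names and the `LOCAL_SCHEME` constant — not the actual implementations):

```scala
// Assumed: buildPath joins components with "/", and isLocalUri tests for
// the "local:" scheme named by LOCAL_SCHEME.
val LOCAL_SCHEME = "local"

def buildPath(components: String*): String =
  components.mkString("/")

def isLocalUri(uri: String): Boolean =
  uri.startsWith(s"$LOCAL_SCHEME:")
```

Resources with a `local:` URI are expected on every node's local filesystem and are not uploaded to the staging directory.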
## Error Handling

Common exceptions during application deployment:

```scala
// Application submission failures
throw new IOException("Failed to upload application resources")
throw new YarnException("ResourceManager rejected application")

// Invalid arguments
throw new IllegalArgumentException("Missing required argument: --jar")
throw new SparkException("Cannot specify both --jar and --primary-py-file")

// Resource staging failures
throw new IOException("Failed to create staging directory")
throw new AccessControlException("Insufficient permissions for staging")
```
## Integration Patterns

**Configuration-driven Deployment:**

```scala
val sparkConf = new SparkConf()
  .setAppName("DataPipeline")
  .set("spark.yarn.queue", "analytics")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")

val client = new Client(clientArgs, sparkConf)
val appId = client.submitApplication()
```

**Custom Resource Management:**

```scala
val sparkConf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4")
  .set("spark.executor.instances", "10")
  .set("spark.yarn.executor.memoryOverhead", "1g")
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")
```
## Deploy Mode Considerations

| Aspect | Client Mode | Cluster Mode |
|--------|-------------|--------------|
| **Driver Location** | Client machine | YARN cluster |
| **Network Access** | Driver needs cluster access | Self-contained |
| **Resource Usage** | Client machine resources | Cluster resources only |
| **Failure Handling** | Client failure kills app | More resilient |
| **Interactive Use** | Suitable for shells/notebooks | Batch processing |
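
When submitting through `SparkConf` rather than the `spark-submit` CLI, the deploy mode is typically selected via configuration; a minimal sketch using the `spark.submit.deployMode` property that spark-submit uses for this purpose:

```scala
import org.apache.spark.SparkConf

// "cluster" runs the driver inside the ApplicationMaster on the cluster;
// "client" keeps the driver on the submitting machine.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster") // or "client" for interactive use
```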