# Application Management

Core components for submitting and managing Spark applications on YARN clusters. These classes handle application submission, monitoring, lifecycle management, and interaction with the YARN ResourceManager.

## Capabilities

### Client

Primary entry point for submitting and monitoring Spark applications on YARN. Handles application submission to the YARN ResourceManager and provides monitoring capabilities.

```scala { .api }
/**
 * Client for submitting and monitoring Spark applications on YARN
 * @param args Client arguments containing application details
 * @param sparkConf Spark configuration
 * @param rpcEnv RPC environment for communication
 */
private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv
) {
  /** Submit the application to the YARN ResourceManager */
  def submitApplication(): Unit

  /** Submit and monitor the application until completion */
  def run(): Unit

  /** Stop the client and clean up resources */
  def stop(): Unit

  /** Get the YARN application ID */
  def getApplicationId(): ApplicationId

  /** Monitor application status and return a report */
  def monitorApplication(
      returnOnRunning: Boolean = false,
      logApplicationReport: Boolean = true,
      interval: Long = sparkConf.get(REPORT_INTERVAL)
  ): YarnAppReport

  /** Report launcher state for external monitoring */
  def reportLauncherState(state: SparkAppHandle.State): Unit

  /** Get the application report from the ResourceManager */
  def getApplicationReport(): ApplicationReport
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.yarn.queue", "default")

// Port 0 binds the RPC environment to an ephemeral port
val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", 0, sparkConf,
  new SecurityManager(sparkConf))

val clientArgs = new ClientArguments(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass"
))

val client = new Client(clientArgs, sparkConf, rpcEnv)
client.submitApplication()
val appId = client.getApplicationId()
println(s"Application submitted with ID: $appId")
```
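
Once submitted, the same client can block until the application terminates. A minimal follow-up sketch, reusing the `client` value from the example above:

```scala
// With default arguments, monitorApplication polls the ResourceManager
// until the application terminates, logging each application report.
val report = client.monitorApplication()
println(s"Final state: ${report.finalState}")
client.stop()
```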

### Client Companion Object

Utility methods and constants for YARN client operations.

```scala { .api }
object Client {
  /** Application JAR file name in YARN */
  val APP_JAR_NAME: String = "__app__.jar"

  /** Staging directory name */
  val SPARK_STAGING: String = ".sparkStaging"

  /** Localized configuration directory name */
  val LOCALIZED_CONF_DIR: String = "__spark_conf__"

  /** Localized library directory name */
  val LOCALIZED_LIB_DIR: String = "__spark_libs__"

  /** Localized Python files directory name */
  val LOCALIZED_PYTHON_DIR: String = "__pyfiles__"

  /**
   * Get user classpath URIs from Spark configuration
   * @param conf Spark configuration
   * @return Array of classpath URIs
   */
  def getUserClasspath(conf: SparkConf): Array[URI]

  /**
   * Get user classpath URLs
   * @param conf Spark configuration
   * @param useClusterPath Whether to use cluster-side paths
   * @return Array of classpath URLs
   */
  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL]

  /**
   * Build a file system path from components
   * @param components Path components to join
   * @return Combined path string
   */
  def buildPath(components: String*): String

  /**
   * Convert a gateway path to a cluster path
   * @param conf Spark configuration
   * @param path Original path
   * @return Cluster-side path
   */
  def getClusterPath(conf: SparkConf, path: String): String
}
```
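
These constants define the layout of an application's staging area on the cluster file system. A minimal sketch of how they compose, with a hypothetical HDFS home directory and application ID:

```scala
import org.apache.spark.deploy.yarn.Client

// Staging directory layout: <home dir>/.sparkStaging/<application ID>
val stagingDir = Client.buildPath(
  "hdfs:///user/alice",              // hypothetical home directory
  Client.SPARK_STAGING,              // ".sparkStaging"
  "application_1700000000000_0001"   // hypothetical application ID
)
// => "hdfs:///user/alice/.sparkStaging/application_1700000000000_0001"
```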

### ApplicationMaster

YARN ApplicationMaster for managing the Spark application lifecycle within YARN containers. Coordinates between the Spark driver and the YARN ResourceManager.

```scala { .api }
/**
 * YARN ApplicationMaster for managing the Spark application lifecycle
 * @param args ApplicationMaster arguments
 * @param sparkConf Spark configuration
 * @param yarnConf YARN configuration
 */
private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    sparkConf: SparkConf,
    yarnConf: YarnConfiguration
) {
  /** Main execution method for the ApplicationMaster; returns the exit code */
  def run(): Int

  /** Run in unmanaged mode (client mode) */
  def runUnmanaged(
      clientRpcEnv: RpcEnv,
      appAttemptId: ApplicationAttemptId,
      stagingDir: Path,
      cachedResourcesConf: SparkConf
  ): Unit

  /** Stop an unmanaged ApplicationMaster */
  def stopUnmanaged(stagingDir: Path): Unit

  /** Finish the application with a final status */
  def finish(status: FinalApplicationStatus, code: Int, msg: String): Unit

  /** Unregister from the ResourceManager */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}
```

### ApplicationMaster Companion Object

Static utilities and entry point for the ApplicationMaster.

```scala { .api }
object ApplicationMaster {
  /** Main entry point for the ApplicationMaster */
  def main(args: Array[String]): Unit

  /** Signal SparkContext initialization */
  def sparkContextInitialized(sc: SparkContext): Unit

  /** Get the current application attempt ID */
  def getAttemptId(): ApplicationAttemptId

  /** Get the Spark history server address */
  def getHistoryServerAddress(
      sparkConf: SparkConf,
      yarnConf: YarnConfiguration,
      appId: String,
      attemptId: String
  ): String
}
```

### YarnClusterApplication

Application entry point for yarn-cluster mode deployment.

```scala { .api }
/**
 * Application entry point for yarn-cluster mode
 */
class YarnClusterApplication extends SparkApplication {
  /**
   * Start the application in cluster mode
   * @param args Application arguments
   * @param conf Spark configuration
   */
  def start(args: Array[String], conf: SparkConf): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnClusterApplication
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyClusterApp")
  .set("spark.yarn.queue", "default")

// The argument array is parsed as ClientArguments-style flags
val app = new YarnClusterApplication()
app.start(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass"
), conf)
```

### ExecutorLauncher

Entry point for the client-mode executor launcher.

```scala { .api }
object ExecutorLauncher {
  /** Main entry point for the executor launcher */
  def main(args: Array[String]): Unit
}
```

## Types

### YarnAppReport

Container for YARN application status information.

```scala { .api }
/**
 * Container for YARN application status information
 * @param appState Current YARN application state
 * @param finalState Final application status
 * @param diagnostics Optional diagnostic messages
 */
case class YarnAppReport(
    appState: YarnApplicationState,
    finalState: FinalApplicationStatus,
    diagnostics: Option[String]
)
```
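
The three fields describe both in-flight and terminal states, so callers typically pattern match on them. A sketch of mapping a report to an exit code, assuming the report comes from `Client.monitorApplication`:

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}

def toExitCode(report: YarnAppReport): Int = report match {
  // Terminal YARN state with a successful Spark final status
  case YarnAppReport(YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED, _) =>
    0
  // Killed externally, e.g. via `yarn application -kill`
  case YarnAppReport(YarnApplicationState.KILLED, _, diag) =>
    System.err.println(s"Application killed: ${diag.getOrElse("no diagnostics")}")
    1
  // Failed, or finished with a non-successful final status
  case YarnAppReport(_, _, diag) =>
    System.err.println(s"Application did not succeed: ${diag.getOrElse("no diagnostics")}")
    1
}
```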

### Argument Classes

```scala { .api }
/**
 * Argument parser for the YARN client
 * @param args Command line arguments
 */
class ClientArguments(args: Array[String]) {
  /** User application JAR file */
  var userJar: String

  /** Main class to execute */
  var userClass: String

  /** Primary Python file for PySpark */
  var primaryPyFile: String

  /** Primary R file for SparkR */
  var primaryRFile: String

  /** User application arguments */
  var userArgs: ArrayBuffer[String]

  /** Enable verbose logging */
  var verbose: Boolean
}

/**
 * Argument parser for the ApplicationMaster
 * @param args Command line arguments
 */
class ApplicationMasterArguments(args: Array[String]) {
  /** User application JAR file path */
  var userJar: String

  /** Main class name to execute */
  var userClass: String

  /** Primary Python file for PySpark */
  var primaryPyFile: String

  /** Primary R file for SparkR */
  var primaryRFile: String

  /** User application arguments */
  var userArgs: Seq[String]

  /** Spark properties file path */
  var propertiesFile: String

  /** Distributed cache configuration */
  var distCacheConf: String
}
```
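
`ClientArguments` maps command-line flags onto these fields, using the same `--jar`/`--class` convention seen in the earlier examples; repeated `--arg` flags accumulate into `userArgs`. A small sketch:

```scala
import org.apache.spark.deploy.yarn.ClientArguments

val parsed = new ClientArguments(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass",
  "--arg", "input.txt",  // user application arguments, one per --arg
  "--arg", "output"
))

assert(parsed.userJar == "/path/to/app.jar")
assert(parsed.userClass == "com.example.MainClass")
assert(parsed.userArgs == Seq("input.txt", "output"))
```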

## Integration Patterns

### Client Mode Integration

In client mode, the Spark driver runs on the local machine (outside YARN) while the executors run in YARN containers.

```scala
// Client mode configuration; SparkConf has no setDeployMode method,
// so the deploy mode is set via spark.submit.deployMode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .setAppName("ClientModeApp")

// SparkContext creation automatically uses YARN client mode
val sc = new SparkContext(conf)
```

### Cluster Mode Integration

In cluster mode, both the driver and the executors run inside the YARN cluster.

```scala
// Cluster mode configuration
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("ClusterModeApp")

// For cluster mode, applications are typically submitted via spark-submit
// or programmatically via YarnClusterApplication
```
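
For programmatic cluster-mode submission without touching these `private[spark]` classes, the public launcher API is the supported route. A minimal sketch using `org.apache.spark.launcher.SparkLauncher`; paths and class names are placeholders:

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// startApplication launches spark-submit as a child process and returns
// a handle that reports state transitions (SUBMITTED, RUNNING, FINISHED, ...)
val handle: SparkAppHandle = new SparkLauncher()
  .setAppResource("/path/to/app.jar")     // placeholder application JAR
  .setMainClass("com.example.MainClass")  // placeholder main class
  .setMaster("yarn")
  .setDeployMode("cluster")
  .setConf("spark.yarn.queue", "default")
  .startApplication()

// Wait for a terminal state (FINISHED, FAILED, or KILLED)
while (!handle.getState.isFinal) Thread.sleep(1000)
println(s"Final launcher state: ${handle.getState}")
```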

### Programmatic Submission

```scala
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

def submitApplication(jarPath: String, mainClass: String): ApplicationId = {
  val conf = new SparkConf()
    .setAppName("ProgrammaticSubmission")
    .set("spark.yarn.queue", "default")
    .set("spark.yarn.am.memory", "1g")

  // Port 0 binds the RPC environment to an ephemeral port
  val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", 0, conf,
    new SecurityManager(conf))

  val args = new ClientArguments(Array(
    "--jar", jarPath,
    "--class", mainClass
  ))

  val client = new Client(args, conf, rpcEnv)
  client.submitApplication()
  client.getApplicationId()
}
```
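
The returned ID can then be handed to YARN tooling or to `monitorApplication`. A hypothetical invocation:

```scala
val appId = submitApplication("/path/to/app.jar", "com.example.MainClass")
println(s"Track with: yarn application -status $appId")
```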