# Application Master

The Application Master is the central coordinator for Spark applications running on YARN. It manages the application lifecycle, negotiates resources with the YARN ResourceManager, and coordinates between the Spark driver and the executor processes running in YARN containers.

## Capabilities

### Application Master Core

The main Application Master class, which handles application coordination and resource management within the YARN cluster.

```scala { .api }
/**
 * Application Master for Spark on YARN.
 * Manages the application lifecycle and resource allocation.
 */
private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    client: YarnRMClient
) extends Logging {

  /**
   * Main execution method for the Application Master.
   * @return Exit code (0 for success, non-zero for failure)
   */
  final def run(): Int
}
```
The Application Master operates in one of two modes:

- **Client Mode**: acts as an executor launcher; the driver runs outside YARN
- **Cluster Mode**: runs the driver within the Application Master process
### Application Master Entry Points

Main entry points for launching the Application Master and Executor Launcher processes.

```scala { .api }
/**
 * Entry point for the Application Master process.
 * Called by YARN when launching the AM container.
 */
object ApplicationMaster extends Logging {
  def main(args: Array[String]): Unit

  /**
   * Notifies the Application Master that the SparkContext has been initialized.
   * @param sc The initialized SparkContext
   */
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit

  /**
   * Notifies the Application Master that the SparkContext has been stopped.
   * @param sc The stopped SparkContext
   */
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}

/**
 * Entry point for the Executor Launcher (client mode).
 * Launches executors when the driver runs outside YARN.
 */
object ExecutorLauncher {
  def main(args: Array[String]): Unit
}
```
**Usage Example:**

The Application Master is launched automatically by YARN, but its main methods are invoked as follows:

```bash
# YARN invokes the Application Master main method (cluster mode)
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ApplicationMaster \
  --class com.example.MyApp \
  --jar myapp.jar \
  --args arg1,arg2 \
  --num-executors 4 \
  --executor-memory 2g

# In client mode, the Executor Launcher is used instead
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ExecutorLauncher \
  --num-executors 4 \
  --executor-memory 2g
```
### Application Master Arguments

Argument parsing and configuration management for Application Master initialization.

```scala { .api }
/**
 * Parses command-line arguments for the Application Master.
 */
class ApplicationMasterArguments(val args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var userArgs: Seq[String] = Nil
  var propertiesFile: String = null
  var numExecutors: Int = 2
  var executorMemory: String = "1g"
  var executorCores: Int = 1
}

object ApplicationMasterArguments {
  // Utility methods for argument parsing
}
```
**Key Arguments:**

- `--class`: Main class to execute (cluster mode only)
- `--jar`: User application JAR file
- `--args`: Application arguments (comma-separated)
- `--num-executors`: Number of executor containers to request
- `--executor-memory`: Memory per executor container
- `--executor-cores`: CPU cores per executor container
- `--properties-file`: Location of the Spark properties file
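The parsing loop behind these flags can be sketched in plain Scala. The helper below is an illustrative, self-contained model (the `AmArgs` case class and `parseAmArgs` function are hypothetical names), not the actual `ApplicationMasterArguments` implementation:

```scala
// Hypothetical, self-contained sketch of parsing the flags listed above;
// the real ApplicationMasterArguments class differs in its details.
case class AmArgs(
    userJar: String = null,
    userClass: String = null,
    userArgs: Seq[String] = Nil,
    numExecutors: Int = 2,
    executorMemory: String = "1g",
    executorCores: Int = 1)

def parseAmArgs(args: List[String], acc: AmArgs = AmArgs()): AmArgs = args match {
  // Each flag consumes the value that follows it
  case "--jar" :: value :: rest             => parseAmArgs(rest, acc.copy(userJar = value))
  case "--class" :: value :: rest           => parseAmArgs(rest, acc.copy(userClass = value))
  case "--args" :: value :: rest            => parseAmArgs(rest, acc.copy(userArgs = value.split(",").toSeq))
  case "--num-executors" :: value :: rest   => parseAmArgs(rest, acc.copy(numExecutors = value.toInt))
  case "--executor-memory" :: value :: rest => parseAmArgs(rest, acc.copy(executorMemory = value))
  case "--executor-cores" :: value :: rest  => parseAmArgs(rest, acc.copy(executorCores = value.toInt))
  case Nil                                  => acc
  case unknown :: _ =>
    throw new IllegalArgumentException(s"Unknown argument: $unknown")
}
```

Unrecognized flags fail fast, which matches the general expectation that a malformed AM launch command should abort before registration.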
### Resource Management Integration

The Application Master integrates with YARN's resource management through the YarnRMClient interface.

```scala { .api }
/**
 * Interface for YARN ResourceManager client operations.
 * Handles resource requests and container management.
 */
trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration): Int
  // Additional resource management methods (internal implementation)
}
```
### Executor Management

The Application Master manages the executor lifecycle through the YarnAllocator.

```scala { .api }
/**
 * Handles executor allocation and management.
 * Requests containers from YARN and launches executor processes.
 */
private[spark] class YarnAllocator(/* parameters */) extends Logging {
  // Internal implementation for:
  // - Container allocation requests
  // - Executor process launching
  // - Container failure handling
  // - Dynamic scaling based on workload
}

/**
 * Enumeration of allocation types.
 */
object AllocationType extends Enumeration {
  // Defines the different types of resource allocations
}
```
## Lifecycle Management

### Application Master Lifecycle

1. **Initialization**: Parse arguments, set up configuration, establish the ResourceManager connection
2. **Registration**: Register with the YARN ResourceManager
3. **Resource Allocation**: Request executor containers based on the configuration
4. **Executor Launch**: Start executor processes in the allocated containers
5. **Application Execution**: Coordinate between the driver and executors
6. **Cleanup**: Release resources and unregister from the ResourceManager
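The six phases above can be sketched as a single control flow. This is a simplified, self-contained model, not the actual `run()` implementation; its main point is that cleanup belongs in a `finally` block so it runs on both the success and failure paths:

```scala
// Simplified model of the lifecycle phases; the step names are
// illustrative and do not match the real ApplicationMaster internals.
def runLifecycle(log: String => Unit): Int = {
  try {
    log("initialization: parsing arguments and setting up configuration")
    log("registration: registering with the YARN ResourceManager")
    log("resource allocation: requesting executor containers")
    log("executor launch: starting executors in allocated containers")
    log("application execution: coordinating driver and executors")
    0 // success exit code
  } catch {
    case e: Exception =>
      log(s"fatal error: ${e.getMessage}")
      1 // failure exit code
  } finally {
    // Runs whether the application succeeded or failed
    log("cleanup: releasing containers and unregistering from the RM")
  }
}
```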
### Driver Integration

In cluster mode, the Application Master runs the driver in-process:

```scala
// Cluster mode - the driver runs inside the AM
if (isClusterMode) {
  // Launch the user application class within the AM process
  runDriver(securityManager)
} else {
  // Client mode - launch executors only
  runExecutorLauncher(securityManager)
}
```
### Container Management

The Application Master manages YARN containers for executors:

- **Container Requests**: Specify memory, CPU, and locality preferences
- **Container Allocation**: Handle ResourceManager responses
- **Executor Launch**: Start Spark executor processes in the containers
- **Health Monitoring**: Track container status and handle failures
- **Dynamic Scaling**: Add or remove executors based on workload
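The request-side bookkeeping can be modeled in a few lines. The sketch below is a self-contained stand-in (the `ContainerSpec` case class is hypothetical); the real allocator builds `AMRMClient.ContainerRequest` objects from the YARN client API instead:

```scala
// Hypothetical model of an executor container request: memory, cores,
// and locality preferences, mirroring the bullet points above.
case class ContainerSpec(memoryMb: Int, vcores: Int, preferredHosts: Seq[String])

// Issue one request per missing executor, comparing the target count
// against the executors already running; never request a negative amount.
def pendingRequests(target: Int, running: Int, spec: ContainerSpec): Seq[ContainerSpec] =
  Seq.fill(math.max(0, target - running))(spec)
```

Dynamic scaling reduces to recomputing `target` and re-running this diff against the current executor count.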
## Configuration Integration

### Spark Configuration

Key Spark properties that affect Application Master behavior:

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.max.executor.failures", "6")       // Max executor failures before the AM fails
  .set("spark.yarn.am.memory", "1g")                  // Application Master memory
  .set("spark.yarn.am.cores", "1")                    // Application Master CPU cores
  .set("spark.yarn.am.waitTime", "100s")              // Max wait time for SparkContext initialization
  .set("spark.yarn.submit.waitAppCompletion", "true") // Wait for application completion
```
### YARN Configuration

Integration with the YARN and Hadoop configuration:

```scala
// YARN configuration integration
val yarnConf = new YarnConfiguration(hadoopConf)

// Security configuration
val securityManager = new SecurityManager(sparkConf)

// FileSystem integration
val fs = FileSystem.get(yarnConf)
```
## Error Handling and Recovery

### Failure Scenarios

The Application Master handles various failure conditions:

- **Executor Failures**: Automatic restart up to the configured limit
- **Driver Failures**: Application termination with proper cleanup
- **ResourceManager Communication**: Retry logic for RM interactions
- **Container Preemption**: Graceful handling of YARN preemption
- **Network Partitions**: Robust handling of connectivity issues
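The executor-failure budget (`spark.yarn.max.executor.failures`, shown in the configuration section above) can be modeled as a simple counter. The `FailureTracker` class below is an illustrative sketch, not the actual allocator bookkeeping, which also involves details such as time-windowed failure expiry in some versions:

```scala
// Illustrative failure-budget check: the application is considered
// failed once recorded executor failures reach the configured maximum.
final class FailureTracker(maxFailures: Int) {
  private var failures = 0

  // Called whenever a container running an executor exits abnormally
  def recordFailure(): Unit = failures += 1

  def failureCount: Int = failures

  // True once the budget is exhausted; the AM should then shut down
  def shouldFailApplication: Boolean = failures >= maxFailures
}
```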
### Cleanup Operations

Automatic cleanup includes:

- **Container Release**: Return unused containers to YARN
- **File Cleanup**: Remove staging files from HDFS
- **Resource Deallocation**: Clean up allocated resources
- **Registration Cleanup**: Unregister from the ResourceManager
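Because cleanup can be reached from both the normal exit path and failure handlers, it is typically written to be idempotent. The sketch below is a self-contained illustration of that pattern (the `CleanupState` class is hypothetical; the real implementation spreads this work across shutdown hooks):

```scala
// Illustrative idempotent cleanup routine: running it twice performs
// the release/delete/unregister steps only once.
final class CleanupState {
  private var done = false
  val performed = scala.collection.mutable.Buffer[String]()

  def cleanup(): Unit = if (!done) {
    done = true
    performed += "released containers"
    performed += "deleted staging files"
    performed += "unregistered from the ResourceManager"
  }
}
```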
### Monitoring and Logging

The Application Master provides extensive logging and monitoring. The ApplicationMaster class mixes in the Logging trait, which supplies the methods used below:

```scala
// Key log events emitted during the application lifecycle
logInfo("ApplicationAttemptId: " + appAttemptId)
logInfo("Registered with ResourceManager")
logInfo("Allocating " + numExecutors + " executors")
logWarning("Executor failed: " + executorId)
logError("Fatal error in Application Master", exception)
```
## Integration with Spark Components

### Scheduler Integration

The Application Master coordinates with the Spark schedulers:

- **Task Scheduling**: Interface with the TaskScheduler for task placement
- **Resource Updates**: Notify the scheduler of resource changes
- **Locality Preferences**: Honor data locality requirements
- **Load Balancing**: Distribute tasks across the available executors
### Security Integration

Comprehensive security integration:

- **Kerberos Authentication**: Support for secure clusters
- **Delegation Tokens**: Manage HDFS and other service tokens
- **Secret Management**: Secure distribution of secrets
- **ACL Management**: Application access control lists
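Delegation tokens expire and must be renewed before their expiry time. A common pattern schedules renewal at a fixed fraction of the remaining token lifetime; the helper below is an illustrative sketch of that arithmetic (the 75% ratio is an assumption, not a value taken from the actual implementation):

```scala
// Illustrative renewal scheduling: renew at a fraction (default 75%)
// of the remaining token lifetime, leaving headroom before expiry.
def nextRenewalTime(nowMs: Long, expiryMs: Long, ratio: Double = 0.75): Long = {
  require(expiryMs > nowMs, "token already expired")
  nowMs + ((expiryMs - nowMs) * ratio).toLong
}
```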
### Storage Integration

Integration with distributed storage:

- **HDFS Integration**: Access to the Hadoop Distributed File System
- **Staging Directory**: Temporary file storage during the application lifecycle
- **Log Aggregation**: Collection of executor logs after completion