# YARN Client

The YARN Client provides the main entry point for submitting Spark applications to YARN clusters. It handles application packaging, resource allocation requests, and deployment preparation for both alpha and stable Hadoop versions.

## Imports

```scala
import org.apache.spark.deploy.yarn._
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.client.api.YarnClient
```
## Capabilities

### Client Entry Point

Main entry point for YARN client operations that handles command-line submission of Spark applications to YARN clusters.

```scala { .api }
/**
 * Entry point for YARN client operations
 * Handles command-line arguments and submits applications to YARN
 */
object Client {
  def main(argStrings: Array[String]): Unit
}
```
**Usage Example:**

```scala
// Programmatic usage (internal to Spark)
Client.main(Array(
  "--jar", "myapp.jar",
  "--class", "com.example.MyApp",
  "--arg", "input.txt",
  "--arg", "output.txt",
  "--num-executors", "4",
  "--executor-memory", "2g"
))
```
The client is typically invoked through `spark-submit` rather than directly:

```bash
# Client automatically invoked by spark-submit
spark-submit --master yarn-client \
  --num-executors 4 \
  --executor-memory 2g \
  --class com.example.MyApp \
  myapp.jar input.txt output.txt
```
### Client Implementation Classes

The actual client implementation varies by Hadoop version, with both alpha and stable versions providing the same submission interface.

#### Stable Version Client

```scala { .api }
/**
 * YARN client implementation for Hadoop 2.2+
 * Handles application submission to the stable YARN API
 */
private[spark] class Client(
  val args: ClientArguments,
  val hadoopConf: Configuration,
  val sparkConf: SparkConf
) extends ClientBase with Logging {

  def this(clientArgs: ClientArguments, spConf: SparkConf)
  def this(clientArgs: ClientArguments)

  val yarnClient: YarnClient
  val yarnConf: YarnConfiguration

  def stop(): Unit
}
```
#### Alpha Version Client

```scala { .api }
/**
 * YARN client implementation for Hadoop 0.23 and 2.0.x
 * Handles application submission to the alpha YARN API; the alpha API has no
 * separate YarnClient class, so the client extends YarnClientImpl directly
 */
private[spark] class Client(
  val args: ClientArguments,
  val hadoopConf: Configuration,
  val sparkConf: SparkConf
) extends YarnClientImpl with ClientBase with Logging {

  def this(clientArgs: ClientArguments, spConf: SparkConf)
  def this(clientArgs: ClientArguments)

  val yarnConf: YarnConfiguration

  def stop(): Unit
}
```
### Client Base Functionality

Base trait providing common functionality for YARN client implementations across different Hadoop versions.

```scala { .api }
/**
 * Base trait for YARN client implementations
 * Provides common functionality for application submission
 */
private[spark] trait ClientBase extends Logging {
  // Implementation details are internal to Spark
  // Main functionality accessed through Client objects
}

private[spark] object ClientBase extends Logging {
  // Constants and utility methods for client operations
}
```
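
The split between `ClientBase` and the version-specific `Client` classes follows a standard trait-mixin pattern: shared submission logic lives in the trait, while each Hadoop version supplies its own specifics. A minimal, hypothetical sketch of the idea (the names below are illustrative, not Spark's internals):

```scala
// Illustrative sketch only -- not Spark's actual class hierarchy.
// Shared logic in the trait; version-specific details in subclasses.
trait ClientBaseSketch {
  def apiVersion: String                // supplied by each implementation
  def submit(appName: String): String = // shared submission logic
    s"Submitting $appName via $apiVersion YARN API"
}

class StableClientSketch extends ClientBaseSketch { def apiVersion = "stable" }
class AlphaClientSketch extends ClientBaseSketch { def apiVersion = "alpha" }
```

This keeps the public entry points identical across Hadoop versions while isolating the API differences in one class per version.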
### Client Arguments

Argument parsing and configuration management for YARN client operations.

```scala { .api }
/**
 * Parses and manages command-line arguments for YARN client
 */
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String
  var files: String
  var archives: String
  var userJar: String
  var userClass: String
  var userArgs: Array[String]
  var executorMemory: Int
  var executorCores: Int
  var numExecutors: Int
  var amQueue: String
  var amMemory: Int
  var amCores: Int
  var appName: String
}

object ClientArguments {
  // Utility methods for argument parsing
}
```
**Key Arguments:**

- `--jar`: User application JAR file
- `--class`: Main class to execute
- `--arg`: Application arguments (can be specified multiple times)
- `--num-executors`: Number of executor instances
- `--executor-memory`: Memory per executor
- `--executor-cores`: CPU cores per executor
- `--queue`: YARN queue for resource allocation
- `--name`: Application name
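
Arguments of this shape are typically consumed pairwise (flag, value) in a loop. A hedged sketch of that parsing style in plain Scala; the names, defaults, and accepted flags below are illustrative, not Spark's actual implementation:

```scala
// Hypothetical argument holder; defaults are assumptions for illustration.
case class ParsedArgs(
    userJar: String = null,
    userClass: String = null,
    userArgs: List[String] = Nil,
    numExecutors: Int = 2,
    executorMemory: String = "1g")

// Consume (flag, value) pairs recursively; reject unknown flags.
def parse(args: List[String], acc: ParsedArgs = ParsedArgs()): ParsedArgs =
  args match {
    case "--jar" :: value :: rest             => parse(rest, acc.copy(userJar = value))
    case "--class" :: value :: rest           => parse(rest, acc.copy(userClass = value))
    case "--arg" :: value :: rest             => parse(rest, acc.copy(userArgs = acc.userArgs :+ value))
    case "--num-executors" :: value :: rest   => parse(rest, acc.copy(numExecutors = value.toInt))
    case "--executor-memory" :: value :: rest => parse(rest, acc.copy(executorMemory = value))
    case Nil                                  => acc
    case unknown :: _ =>
      throw new IllegalArgumentException(s"Unknown option: $unknown")
  }
```

Note how repeatable flags such as `--arg` accumulate into a list, while single-valued flags overwrite the previous value.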
### Distributed Cache Management

Manages file distribution and caching for YARN applications, handling JARs, files, and archives.

```scala { .api }
/**
 * Manages distributed cache for YARN applications
 * Handles JAR files, additional files, and archives
 */
private[spark] class ClientDistributedCacheManager() extends Logging {
  // Internal implementation for managing distributed files
  // Automatically handles file staging and cache management
}
```
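
Staging follows a simple convention: each local resource is copied to a per-application directory on HDFS before containers launch, so every container can localize the same files. A simplified sketch of the path mapping (the directory layout shown is an assumption for illustration, not Spark's exact scheme):

```scala
// Hypothetical helper: map a local resource to a per-application staging
// location. Real staging also handles permissions, visibility, timestamps,
// and distributed-cache entries.
def stagingPath(stagingDir: String, appId: String, localFile: String): String = {
  val fileName = localFile.split('/').last
  s"$stagingDir/$appId/$fileName"
}
```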
## Usage Patterns

### Command Line Submission

Most common usage through `spark-submit`:

```bash
# Client mode - driver runs locally
spark-submit \
  --master yarn-client \
  --deploy-mode client \
  --num-executors 4 \
  --executor-memory 2g \
  --executor-cores 2 \
  --queue production \
  --class com.example.MySparkApp \
  myapp.jar arg1 arg2

# Cluster mode - driver runs in YARN
spark-submit \
  --master yarn-cluster \
  --deploy-mode cluster \
  --num-executors 8 \
  --executor-memory 4g \
  --executor-cores 2 \
  --queue production \
  --class com.example.MySparkApp \
  myapp.jar arg1 arg2
```
### Programmatic Configuration

When integrating YARN submission into applications:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure for YARN client mode
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("MySparkApplication")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
```
### Resource Management

YARN-specific resource configuration:

```scala
val conf = new SparkConf()
  .set("spark.yarn.max.executor.failures", "6")    // Allow 6 executor failures
  .set("spark.yarn.am.memory", "1g")               // Application Master memory
  .set("spark.yarn.am.cores", "1")                 // Application Master CPU cores
  .set("spark.yarn.queue", "production")           // YARN queue name
  .set("spark.yarn.submit.file.replication", "3")  // HDFS replication for app files
```
## Error Handling

The client handles various failure scenarios:

- **Invalid arguments**: Validation and error reporting for command-line arguments
- **Resource unavailability**: Graceful handling when YARN cannot allocate requested resources
- **Authentication failures**: Integration with Kerberos and delegation tokens
- **Network connectivity**: Robust handling of ResourceManager communication failures
- **File staging errors**: Error recovery for application file distribution
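
In code that drives a submission, these failures surface as exceptions. A hedged sketch of defensive handling in plain Scala; `submitApplication` is a hypothetical stand-in for the real submission call, not Spark's API:

```scala
import scala.util.{Failure, Success, Try}

// Wrap a submission attempt and convert failures into readable messages
// instead of letting exceptions propagate to the caller.
def submitWithReporting(submitApplication: () => String): Either[String, String] =
  Try(submitApplication()) match {
    case Success(appId)                       => Right(appId)
    case Failure(e: IllegalArgumentException) => Left(s"Invalid arguments: ${e.getMessage}")
    case Failure(e)                           => Left(s"Submission failed: ${e.getMessage}")
  }
```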
## Integration Points

The YARN client integrates with:

- **Spark Submit**: Primary interface for application submission
- **YARN ResourceManager**: For application registration and resource requests
- **HDFS**: For application file staging and distribution
- **Security**: Kerberos authentication and delegation token management
- **Configuration**: Spark and Hadoop configuration management