# Apache Spark YARN Integration

Apache Spark's YARN integration module enables Spark applications to run on Hadoop YARN (Yet Another Resource Negotiator) clusters alongside other distributed computing frameworks. This module provides an Application Master implementation that manages Spark driver and executor processes within YARN containers, handles resource allocation and deallocation through YARN's ResourceManager, and supports both client and cluster deployment modes.

## Package Information

- **Package Name**: yarn-parent_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Version**: 1.2.2
- **Installation**: Include in Maven/SBT project dependencies

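
Using the coordinates listed above, an SBT dependency declaration might look like the following. Note that `yarn-parent_2.11` is taken directly from the metadata above; published artifact names can differ per Spark build, so verify against the actual POMs before relying on this:

```scala
// Hypothetical sbt dependency line built from the coordinates above;
// confirm the artifact name against the published Spark 1.2.2 POMs.
libraryDependencies += "org.apache.spark" % "yarn-parent_2.11" % "1.2.2"
```
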
## Core Imports

```scala
import org.apache.spark.deploy.yarn._
import org.apache.spark.scheduler.cluster._
```

For basic usage:

```scala
import org.apache.spark.{SparkConf, SparkContext}
```

## Basic Usage

The YARN integration is typically used by setting the Spark master URL and submitting applications through the YARN client:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Set master to YARN mode
val sparkConf = new SparkConf()
  .setMaster("yarn-client") // or "yarn-cluster"
  .setAppName("MySparkApp")

val sparkContext = new SparkContext(sparkConf)
```

For command-line submission:

```bash
# Client mode - driver runs on the client machine
spark-submit --master yarn-client --num-executors 4 myapp.jar

# Cluster mode - driver runs in a YARN container
spark-submit --master yarn-cluster --num-executors 4 myapp.jar
```

## Architecture

Apache Spark YARN integration is built around several key components:

- **Application Master**: Coordinates with YARN ResourceManager and manages executor allocation
- **YARN Client**: Submits applications to YARN and handles deployment preparation
- **Scheduler Backends**: Interface between Spark's scheduler and YARN's resource management
- **Resource Allocation**: Dynamic executor allocation based on workload demands
- **Security Integration**: Kerberos authentication and delegation token management
- **Deployment Modes**: Support for both client and cluster deployment patterns

## Capabilities

### YARN Client Operations

Core functionality for submitting Spark applications to YARN clusters, handling resource allocation requests, and managing the application lifecycle.

```scala { .api }
// Main entry points for YARN submission
object Client {
  def main(argStrings: Array[String]): Unit
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
}
```

[YARN Client](./client.md)

### Application Master Management

Application Master implementation that manages Spark applications within YARN containers, including driver execution and executor coordination.

```scala { .api }
private[spark] class ApplicationMaster(
  args: ApplicationMasterArguments,
  client: YarnRMClient
) extends Logging {
  def run(): Int
}

object ApplicationMaster {
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}
```

[Application Master](./application-master.md)

### Scheduler Integration

Scheduler implementations that integrate Spark's task scheduling with YARN's resource management, supporting both client and cluster deployment modes.

```scala { .api }
private[spark] abstract class YarnSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
  override def postStartHook(): Unit
  override def stop(): Unit
}

private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

private[spark] class YarnClientSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def stop(): Unit
  override def applicationId(): String
}

private[spark] class YarnClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def applicationId(): String
}
```

[Scheduler Integration](./schedulers.md)

### Hadoop Utilities

YARN-specific utilities for Hadoop integration, security, and environment management that extend Spark's base Hadoop utilities.

```scala { .api }
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
  override def isYarnMode(): Boolean
  override def newConfiguration(conf: SparkConf): Configuration
  override def addCredentials(conf: JobConf): Unit
  override def getCurrentUserCredentials(): Credentials
  override def addCurrentUserCredentials(creds: Credentials): Unit
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
  def escapeForShell(arg: String): String
  def lookupRack(conf: Configuration, host: String): String
  def populateRackInfo(conf: Configuration, hostname: String): Unit
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}
```

[Hadoop Utilities](./hadoop-utils.md)
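
To illustrate the kind of quoting a helper like `escapeForShell` must perform, here is a minimal, self-contained sketch of POSIX single-quote escaping. This is an illustrative reimplementation written for this document, not Spark's actual code:

```scala
// Illustrative sketch of shell escaping: wrap the argument in single
// quotes and escape any embedded single quote as '\''.
// NOT Spark's actual escapeForShell implementation.
object ShellEscapeSketch {
  def escapeForShell(arg: String): String =
    if (arg == null) null
    else "'" + arg.replace("'", "'\\''") + "'"

  def main(args: Array[String]): Unit = {
    println(escapeForShell("a b"))  // 'a b'
    println(escapeForShell("it's")) // 'it'\''s'
  }
}
```

The single-quote style is the usual choice for launcher scripts because everything between single quotes is taken literally by POSIX shells, so only the quote character itself needs special handling.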

## Types

```scala { .api }
// Core YARN types
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ApplicationAttemptId, ApplicationId, LocalResource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

// Collection types for utilities
import scala.collection.mutable.HashMap
import scala.collection.Map

// YARN-specific classes
private[spark] class ApplicationMasterArguments(args: Array[String]) {
  var userClass: String = null
  var userJar: String = null
  var userArgs: Array[String] = Array.empty
  var numExecutors: Int = 2
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var amMemory: Int = 512
  var amCores: Int = 1
}

private[spark] trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration, sparkConf: SparkConf): Int
  def register(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: String,
    uiHistoryAddress: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]
  ): YarnAllocator
  def shutdown(): Unit
}

private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String = null
  var files: String = null
  var archives: String = null
  var userJar: String = null
  var userClass: String = null
  var userArgs: Array[String] = Array.empty
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var numExecutors: Int = 2
  var amQueue: String = "default"
  var amMemory: Int = 512
  var amCores: Int = 1
  var appName: String = "Spark"
  val amMemoryOverhead: Int = 384
  val executorMemoryOverhead: Int = 384
}
```

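
The memory fields in `ClientArguments` combine with the fixed 384 MB overheads when sizing YARN container requests. A small self-contained sketch of that arithmetic, using the defaults listed above (illustrative only, not Spark's exact sizing code):

```scala
// Illustrative sketch: a YARN container request is sized as
// requested heap + fixed overhead (both in MB), using the
// default overhead values shown in ClientArguments above.
object ContainerSizingSketch {
  val amMemoryOverhead = 384       // default amMemoryOverhead above
  val executorMemoryOverhead = 384 // default executorMemoryOverhead above

  def amContainerMemory(amMemory: Int): Int =
    amMemory + amMemoryOverhead

  def executorContainerMemory(executorMemory: Int): Int =
    executorMemory + executorMemoryOverhead

  def main(args: Array[String]): Unit = {
    println(amContainerMemory(512))        // 896
    println(executorContainerMemory(1024)) // 1408
  }
}
```

With the defaults above (512 MB AM, 1024 MB executors), the containers actually requested from YARN are 896 MB and 1408 MB respectively, which is why jobs can fail to schedule on queues sized only for the bare heap values.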
## Configuration

Key configuration properties for YARN integration:

- `spark.yarn.max.executor.failures` - Maximum number of executor failures before failing the application
- `spark.yarn.max.worker.failures` - (Deprecated) Same as above for backward compatibility
- `spark.yarn.app.id` - Application ID set by the Application Master
- `spark.ui.port` - Set to "0" for ephemeral port allocation in YARN mode

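
These properties can also be set programmatically on a `SparkConf` before creating the context; for example (the property name is from the list above, the value is illustrative):

```scala
// Illustrative fragment: tolerate up to 8 executor failures
// before the application is failed. Requires Spark on the classpath.
val conf = new SparkConf()
  .setMaster("yarn-cluster")
  .setAppName("MyBatchJob")
  .set("spark.yarn.max.executor.failures", "8")
```
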
## Deployment Modes

### YARN Client Mode (`yarn-client`)

- Driver runs on the client machine, outside the YARN cluster
- Direct communication between the driver and executors
- Suited to interactive applications and development use cases

### YARN Cluster Mode (`yarn-cluster`)

- Driver runs inside the YARN cluster as part of the Application Master
- Better suited to production batch jobs
- Automatic cleanup and resource management

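
The master URL determines which deployment mode (and hence which scheduler backend) is selected. A simplified, self-contained sketch of that dispatch for the Spark 1.2-era master strings (illustrative, not Spark's actual master-resolution logic):

```scala
// Illustrative sketch: map a Spark 1.2-era master URL to a
// YARN deployment mode. Not Spark's actual resolution code.
object YarnModeSketch {
  sealed trait Mode
  case object ClientMode extends Mode  // driver on the submitting machine
  case object ClusterMode extends Mode // driver inside the Application Master

  def modeFor(master: String): Option[Mode] = master match {
    case "yarn-client"  => Some(ClientMode)
    case "yarn-cluster" => Some(ClusterMode)
    case _              => None // non-YARN masters such as local[*] or spark://
  }

  def main(args: Array[String]): Unit = {
    println(modeFor("yarn-client"))  // Some(ClientMode)
    println(modeFor("local[2]"))     // None
  }
}
```

In the real module, client mode pairs the driver with `YarnClientSchedulerBackend`, while cluster mode runs the driver under the Application Master with `YarnClusterSchedulerBackend`, as listed in the Scheduler Integration section.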
## Version Support

The YARN module supports multiple Hadoop versions through separate implementations:

- **Alpha**: Hadoop 0.23 and 2.0.x (deprecated in Spark 1.3+)
- **Stable**: Hadoop 2.2+ (recommended)

Both implementations provide the same API surface but use different versions of the underlying Hadoop YARN API.