Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters
npx @tessl/cli install tessl/maven-org-apache-spark--spark-yarn_2-11@2.4.00
# Apache Spark YARN

Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters. This module provides cluster managers, schedulers, and backends specifically designed for YARN environments, enabling seamless integration between Spark's distributed computing capabilities and YARN's resource management.

## Package Information

- **Package Name**: spark-yarn_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAllocator}
import org.apache.spark.scheduler.cluster.{YarnClusterManager, SchedulerExtensionService, SchedulerExtensionServiceBinding}
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationAttemptId, Container}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.util.Clock
```

**For Java shuffle service integration:**

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
```

## Basic Usage

### Client Mode Deployment

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark for YARN client mode
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```

### Cluster Mode Deployment

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark for YARN cluster mode
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```

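Setting the master and deploy mode in code works, but most applications reach YARN through `spark-submit` instead; the equivalent invocations look roughly like the following (paths and the main class are placeholders):

```bash
# Client mode: the driver runs in the submitting JVM
spark-submit \
  --master yarn \
  --deploy-mode client \
  --queue default \
  --class com.example.MyMainClass \
  /path/to/my-app.jar

# Cluster mode: the driver runs inside the YARN ApplicationMaster
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue production \
  --class com.example.MyMainClass \
  /path/to/my-app.jar
```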
### Programmatic Application Submission

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

val applicationId = client.submitApplication()
println(s"Application submitted with ID: $applicationId")
```

## Architecture

The YARN integration follows Spark's pluggable cluster manager architecture:

- **External Cluster Manager**: `YarnClusterManager` registered as a service provider
- **Scheduler Backends**: Separate implementations for client (`YarnClientSchedulerBackend`) and cluster (`YarnClusterSchedulerBackend`) modes
- **Application Master**: Handles both cluster mode driver execution and client mode coordination
- **Resource Management**: `YarnAllocator` manages container allocation and executor lifecycle
- **Security Integration**: Credential providers and delegation token management for secure clusters

## Capabilities

### Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI.

```scala { .api }
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

[Cluster Management](./cluster-management.md)

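Spark discovers `YarnClusterManager` through `java.util.ServiceLoader` and selects the single manager whose `canCreate` accepts the master URL. A minimal sketch of that selection logic, using toy stand-in types rather than Spark's real classes:

```scala
// Toy stand-ins for Spark's ExternalClusterManager SPI (hypothetical names).
trait ToyClusterManager {
  def name: String
  def canCreate(masterURL: String): Boolean
}

object YarnLikeManager extends ToyClusterManager {
  val name = "yarn-manager"
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"
}

object MesosLikeManager extends ToyClusterManager {
  val name = "mesos-manager"
  def canCreate(masterURL: String): Boolean = masterURL.startsWith("mesos://")
}

// In Spark this list comes from ServiceLoader[ExternalClusterManager].
val registeredManagers = Seq(YarnLikeManager, MesosLikeManager)

// Mirrors SparkContext's lookup: exactly one manager may claim a master URL.
def selectManager(masterURL: String): Option[ToyClusterManager] =
  registeredManagers.filter(_.canCreate(masterURL)) match {
    case Seq(single) => Some(single)
    case Seq()       => None
    case _           => throw new IllegalStateException(s"Multiple cluster managers matched $masterURL")
  }
```

With `"yarn"` as the master URL, only the YARN-like manager matches, which is how `setMaster("yarn")` routes scheduling to this module.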
### Application Deployment

Client API for submitting and managing YARN applications programmatically, supporting both client and cluster deployment modes.

```scala { .api }
class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
}
```

[Application Deployment](./application-deployment.md)

### Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies.

```scala { .api }
class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock()
) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
```

[Resource Management](./resource-management.md)

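On each heartbeat, `allocateResources` compares the executor target against what is already running or pending and requests (or cancels) the difference. The core bookkeeping reduces to arithmetic like the following sketch (illustrative, not Spark's exact code):

```scala
// A positive result means new containers must be requested from the
// ResourceManager; zero means the target is already covered by running
// executors plus requests still pending allocation.
def missingContainers(targetNumExecutors: Int,
                      numPendingAllocate: Int,
                      numExecutorsRunning: Int): Int =
  math.max(targetNumExecutors - numPendingAllocate - numExecutorsRunning, 0)
```

With dynamic allocation enabled the target itself moves up and down, and this shortfall is recomputed on every allocation cycle.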
### YARN Shuffle Service

External shuffle service that runs on YARN NodeManagers to provide shuffle data management for Spark applications, improving executor stability and enabling dynamic allocation.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStart() throws Exception;
  protected void serviceStop() throws Exception;
  public void initializeApplication(ApplicationInitializationContext context) throws Exception;
  public void stopApplication(ApplicationTerminationContext context) throws Exception;
}
```

[YARN Shuffle Service](./yarn-shuffle-service.md)

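Because the service runs inside the NodeManager rather than in a Spark application, it must be registered as a YARN auxiliary service. With the shuffle service jar on the NodeManager classpath, `yarn-site.xml` needs entries along these lines (`spark_shuffle` is the conventional service name):

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

Applications then opt in by setting `spark.shuffle.service.enabled=true`.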
### Security Integration

Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication.

```scala { .api }
trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
```

[Security Integration](./security-integration.md)

### Extension Points

Pluggable extension system for custom scheduler services and functionality in YARN deployments.

```scala { .api }
trait SchedulerExtensionService {
  def start(binding: SchedulerExtensionServiceBinding): Unit
  def stop(): Unit
}
```

[Extension Points](./extension-points.md)

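Extension services are instantiated by the ApplicationMaster, which reads a comma-separated list of implementation class names from the `spark.yarn.services` property (as of Spark 2.x; the class name below is a placeholder):

```properties
spark.yarn.services  com.example.MySchedulerExtension
```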
### Configuration System

YARN-specific configuration options for controlling resource allocation, security, and deployment behavior.

```scala { .api }
// Key configuration entries
val APPLICATION_TAGS: ConfigEntry[Seq[String]]
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
val QUEUE_NAME: ConfigEntry[String]
val SPARK_ARCHIVE: OptionalConfigEntry[String]
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]
```

[Configuration System](./configuration-system.md)

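Each `ConfigEntry` above maps to a plain property key that can be set in `spark-defaults.conf` or via `--conf`; the values shown are placeholders:

```properties
# APPLICATION_TAGS
spark.yarn.tags  nightly,etl
# MAX_APP_ATTEMPTS
spark.yarn.maxAppAttempts  2
# QUEUE_NAME
spark.yarn.queue  production
# SPARK_ARCHIVE
spark.yarn.archive  hdfs:///spark/spark-libs.jar
# USER_CLASS_PATH_FIRST
spark.yarn.user.classpath.first  true
```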
### Command Building Utilities

YARN-specific utilities for building container launch commands and managing the Spark distribution.

```scala { .api }
object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}

object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
```

[Command Building Utilities](./command-building.md)

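The two memory-overhead constants combine into the container sizing rule Spark 2.x applies when asking YARN for executors: the overhead is a fraction of the executor heap, floored at a minimum. A standalone sketch of that rule (the constants are restated here with their Spark 2.x values, 0.10 and 384 MB, since the real fields live in `YarnSparkHadoopUtil`):

```scala
// Mirrors YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR / MEMORY_OVERHEAD_MIN (Spark 2.x).
val MemoryOverheadFactor = 0.10
val MemoryOverheadMinMb  = 384L

// Off-heap overhead added on top of the executor heap when sizing a container.
def executorMemoryOverheadMb(executorMemoryMb: Long): Long =
  math.max((MemoryOverheadFactor * executorMemoryMb).toLong, MemoryOverheadMinMb)

// Total container size requested from YARN for one executor.
def containerSizeMb(executorMemoryMb: Long): Long =
  executorMemoryMb + executorMemoryOverheadMb(executorMemoryMb)
```

So a 1 GB executor still gets the 384 MB floor (a 1408 MB container), while larger heaps scale with the 10% factor; `spark.yarn.executor.memoryOverhead` overrides the computed value.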
## Common Integration Patterns

### Custom Credential Provider

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "my-service"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check if credentials are needed
    hadoopConf.get("my.service.enabled", "false").toBoolean
  }

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain credentials and add them to `creds`
    // Return the next renewal time in milliseconds, or None if no renewal is needed
    None
  }
}
```

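Providers are discovered via `java.util.ServiceLoader`, so the jar must also ship a provider-configuration file naming the implementation (the class name matches the example above):

```
# src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.MyCredentialProvider
```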
### Custom Scheduler Extension

```scala
import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}

class MySchedulerExtension extends SchedulerExtensionService {
  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
    // Initialize the extension with access to scheduler components
  }

  override def stop(): Unit = {
    // Clean up extension resources
  }
}
```

## Error Handling

Common exceptions thrown by the YARN integration:

- `SparkException`: Unsupported deploy modes or configuration errors
- `IOException`: File system failures during staging and cleanup
- `YarnException`: YARN-specific errors during application submission or management
- `SecurityException`: Credential or authentication failures in secure clusters

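Since `YarnException` during submission is often transient (for example, a briefly unreachable ResourceManager), callers sometimes wrap `submitApplication()` in a retry loop. A generic sketch of such a wrapper (not part of the Spark API; real code would also log and back off between attempts):

```scala
import scala.util.{Failure, Success, Try}

// Retry an operation up to maxAttempts times, returning the first
// success or the last failure.
def withRetries[T](maxAttempts: Int)(op: () => T): Try[T] = {
  @annotation.tailrec
  def loop(attemptsLeft: Int): Try[T] =
    Try(op()) match {
      case s @ Success(_)                      => s
      case f @ Failure(_) if attemptsLeft <= 1 => f
      case Failure(_)                          => loop(attemptsLeft - 1)
    }
  loop(maxAttempts)
}
```

Usage would look like `withRetries(3)(() => client.submitApplication())`.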
## Integration Requirements

- **Hadoop/YARN**: Compatible Hadoop YARN cluster (2.6+)
- **Scala Version**: Built for Scala 2.11 binary compatibility
- **Spark Core**: Requires a matching spark-core_2.11 dependency
- **Security**: Optional Kerberos configuration for secure clusters