# Scheduler Integration

Integration components that connect Spark's task scheduling system with YARN's resource management. These classes provide the cluster manager implementation and the scheduler backends for both client and cluster deployment modes.

## Capabilities

### YarnClusterManager

Cluster manager implementation that integrates Spark with YARN. Registered as an external cluster manager for "yarn" master URLs.

```scala { .api }
/**
 * Cluster manager implementation for YARN integration
 * Implements ExternalClusterManager interface
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  /**
   * Check if this manager can create components for the given master URL
   * @param masterURL Master URL (should be "yarn")
   * @return true if masterURL is "yarn"
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create YARN task scheduler
   * @param sc SparkContext
   * @param masterURL Master URL
   * @return TaskScheduler instance
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create YARN scheduler backend
   * @param sc SparkContext
   * @param masterURL Master URL
   * @param scheduler TaskScheduler instance
   * @return SchedulerBackend instance
   */
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend

  /**
   * Initialize scheduler components
   * @param scheduler TaskScheduler to initialize
   * @param backend SchedulerBackend to initialize
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically used when master is "yarn"
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("YarnIntegrationExample")

val sc = new SparkContext(conf)
// YarnClusterManager creates appropriate scheduler and backend automatically
```

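Spark discovers cluster managers through Java's `ServiceLoader` mechanism: the YARN module lists `YarnClusterManager` in `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager`, and at startup Spark picks the registered manager whose `canCreate` accepts the master URL. The sketch below illustrates that lookup; it is a simplified approximation (the `findClusterManager` helper is hypothetical, and `ExternalClusterManager` is `private[spark]`, so the real resolution happens inside Spark itself):

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._ // Scala 2.13 converters

import org.apache.spark.scheduler.ExternalClusterManager

// Simplified sketch of how a cluster manager is resolved from a master URL.
// For masterURL = "yarn", the predicate matches YarnClusterManager.
def findClusterManager(masterURL: String): Option[ExternalClusterManager] = {
  ServiceLoader.load(classOf[ExternalClusterManager])
    .asScala
    .find(_.canCreate(masterURL))
}
```
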
### YarnSchedulerBackend (Abstract)

Base class for YARN scheduler backends that handle resource requests and executor management.

```scala { .api }
/**
 * Abstract base class for YARN scheduler backends
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  /**
   * Bind scheduler backend to YARN application
   * @param appId YARN application ID
   * @param attemptId Optional application attempt ID
   */
  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit

  /** Get YARN application ID as string */
  def applicationId(): String

  /** Get application attempt ID as string */
  def applicationAttemptId(): Option[String]

  /** Request total executors from YARN */
  def doRequestTotalExecutors(): Future[Boolean]

  /**
   * Kill specific executors
   * @param executorIds Sequence of executor IDs to kill
   * @return Future indicating success/failure
   */
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]

  /** Get shuffle push merger locations */
  def getShufflePushMergerLocations(): Seq[BlockManagerId]
}
```

### YarnClientSchedulerBackend

Scheduler backend for yarn-client mode, where the driver runs locally and executors run on YARN.

```scala { .api }
/**
 * Scheduler backend for yarn-client mode
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /** Start client mode scheduler backend */
  def start(): Unit

  /**
   * Stop scheduler backend
   * @param exitCode Exit code for cleanup
   */
  def stop(exitCode: Int): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Client mode - driver runs locally, executors on YARN
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // client is the default; usually set via spark-submit --deploy-mode
  .setAppName("ClientModeApp")

val sc = new SparkContext(conf) // Uses YarnClientSchedulerBackend automatically
```

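In client mode the YARN ApplicationMaster only negotiates resources while the driver stays in the local JVM, so the AM can be sized independently of the driver. The `spark.yarn.am.*` settings below are standard client-mode options, shown here as an illustrative tuning sketch (the values are examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Client-mode ApplicationMaster sizing: the AM is a lightweight resource negotiator,
// separate from the locally running driver process.
val amConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.am.memory", "1g") // memory for the client-mode ApplicationMaster
  .set("spark.yarn.am.cores", "1")   // cores for the client-mode ApplicationMaster
```
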
### YarnClusterSchedulerBackend

Scheduler backend for yarn-cluster mode, where both the driver and the executors run on YARN.

```scala { .api }
/**
 * Scheduler backend for yarn-cluster mode
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /** Start cluster mode scheduler backend */
  def start(): Unit

  /**
   * Stop scheduler backend
   * @param exitCode Exit code for cleanup
   */
  def stop(exitCode: Int): Unit

  /** Get driver container log URLs */
  def getDriverLogUrls(): Option[Map[String, String]]

  /** Get driver container attributes */
  def getDriverAttributes(): Option[Map[String, String]]
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Cluster mode - both driver and executors on YARN.
// In practice, cluster mode is selected by launching the application with
// spark-submit --deploy-mode cluster.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("ClusterModeApp")

val sc = new SparkContext(conf) // Uses YarnClusterSchedulerBackend automatically
```

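Because the driver itself runs inside a YARN container in cluster mode, driver sizing and application retry behavior become YARN-level concerns. The snippet below collects commonly used cluster-mode settings as an illustrative sketch (values are examples only):

```scala
import org.apache.spark.SparkConf

// Cluster-mode driver and retry settings: the driver container is sized like any
// other YARN container, and YARN may re-attempt the whole application if it fails.
val clusterConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.driver.memory", "4g")                    // driver container memory
  .set("spark.driver.cores", "2")                      // driver container cores
  .set("spark.yarn.maxAppAttempts", "2")               // YARN application attempts
  .set("spark.yarn.submit.waitAppCompletion", "false") // let spark-submit return early
```
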
### YarnScheduler

Task scheduler with YARN-specific enhancements for rack awareness and locality optimization.

```scala { .api }
/**
 * Task scheduler with YARN-specific enhancements
 * @param sc SparkContext
 */
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Get rack information for hosts
   * @param hostPorts Sequence of host:port strings
   * @return Sequence of optional rack names
   */
  def getRacksForHosts(hostPorts: Seq[String]): Seq[Option[String]]
}
```

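Rack lookups delegate to Hadoop's topology resolution, so `getRacksForHosts` only returns rack names when the cluster's Hadoop configuration defines a topology mapping; otherwise every entry is `None`. One way to supply such a mapping from the Spark side, assuming an illustrative script path, is the `spark.hadoop.*` passthrough:

```scala
import org.apache.spark.SparkConf

// Rack resolution relies on Hadoop's net.topology configuration; the script path
// below is an example and must exist on the cluster for rack names to resolve.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.hadoop.net.topology.script.file.name", "/etc/hadoop/conf/topology.sh")
```
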
### YarnClusterScheduler

Task scheduler specifically for yarn-cluster mode with additional initialization hooks.

```scala { .api }
/**
 * Task scheduler for yarn-cluster mode
 * @param sc SparkContext
 */
class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  /** Post-start initialization hook */
  def postStartHook(): Unit
}
```

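The hook runs after the scheduler has started; in Spark's cluster-mode implementation it is used to signal the ApplicationMaster that the SparkContext is ready before normal job submission proceeds. A subclass override follows the usual pattern of delegating to `super` first, as in this minimal sketch (the `CustomClusterScheduler` name is hypothetical):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.YarnClusterScheduler

// Minimal sketch of extending the post-start hook in a custom scheduler.
class CustomClusterScheduler(sc: SparkContext) extends YarnClusterScheduler(sc) {
  override def postStartHook(): Unit = {
    super.postStartHook() // let the base class finish its cluster-mode startup work
    println("Custom scheduler ready")
  }
}
```
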
## Integration Patterns

### Automatic Mode Selection

The YARN integration automatically selects the appropriate scheduler and scheduler backend based on the deployment mode:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Client mode (default)
val clientConf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AutoClientMode")
// Creates: YarnScheduler + YarnClientSchedulerBackend

// Cluster mode
val clusterConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("AutoClusterMode")
// Creates: YarnClusterScheduler + YarnClusterSchedulerBackend
```

### Custom Scheduler Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CustomSchedulerConfig")
  // Scheduler-specific configurations
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
```

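When FAIR scheduling is enabled, individual jobs are routed to pools through a thread-local property on the SparkContext, which is the supported way to share a cluster between job groups. A short example, assuming `sc` is an active SparkContext and a pool named `production` exists in the allocation file:

```scala
// Route all jobs submitted from this thread to a named fair-scheduler pool.
sc.setLocalProperty("spark.scheduler.pool", "production")

// ... run jobs that should share the "production" pool ...

// Reset this thread to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)
```
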
### Resource Request Patterns

```scala
import org.apache.spark.SparkConf

// Configure executor resources
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "512m")
  // Dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "5")
```

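Dynamic allocation on YARN also needs shuffle data to outlive released executors, either through the external shuffle service running as a NodeManager auxiliary service or, on Spark 3.0+, through shuffle tracking. Both options are sketched below with their standard configuration keys:

```scala
import org.apache.spark.SparkConf

// Option 1: external shuffle service (requires the YARN NodeManager auxiliary service).
val withShuffleService = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")

// Option 2: shuffle tracking (Spark 3.0+), which avoids the external service.
val withShuffleTracking = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
```
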
### Locality and Rack Awareness

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.YarnScheduler

// YarnScheduler provides rack awareness automatically. It is normally created by
// YarnClusterManager; it is constructed directly here only to illustrate the API.
val scheduler = new YarnScheduler(sparkContext)

// Get rack information for data locality optimization
val hosts = Seq("worker1:7337", "worker2:7337", "worker3:7337")
val racks = scheduler.getRacksForHosts(hosts)
// Returns rack information if available from YARN

// Configure locality preferences
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
```

### Scheduler Backend Lifecycle

```scala
// The lifecycle is typically managed automatically, but understanding the flow:

// 1. YarnClusterManager.createSchedulerBackend() creates the appropriate backend
// 2. Backend.start() initializes communication with YARN
// 3. Backend.bindToYarn() connects the backend to the YARN application
// 4. The backend handles executor requests via doRequestTotalExecutors()
// 5. Backend.stop() cleans up resources

// For custom integration:
import scala.concurrent.Future

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend

class CustomYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit = {
    super.start()
    // Custom initialization logic
  }

  override def doRequestTotalExecutors(): Future[Boolean] = {
    // Custom executor request logic
    super.doRequestTotalExecutors()
  }
}
```

## Error Handling and Monitoring

### Backend Status Monitoring

```scala
import org.apache.spark.scheduler.cluster.{YarnClusterSchedulerBackend, YarnSchedulerBackend}

// Monitor scheduler backend status. Note that SparkContext.schedulerBackend and the
// YARN backend classes are private[spark], so this pattern only works from code
// compiled inside the org.apache.spark package (e.g. internal extensions or tests).
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]

// Get YARN application information
val appId = backend.applicationId()
val attemptId = backend.applicationAttemptId()

println(s"Application ID: $appId")
println(s"Attempt ID: $attemptId")

// For cluster mode, get driver information
backend match {
  case clusterBackend: YarnClusterSchedulerBackend =>
    val driverLogs = clusterBackend.getDriverLogUrls()
    val driverAttrs = clusterBackend.getDriverAttributes()
    println(s"Driver logs: $driverLogs")
    println(s"Driver attributes: $driverAttrs")
  case _ => // Client mode
}
```

### Executor Management

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Request additional executors
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]
val requestFuture: Future[Boolean] = backend.doRequestTotalExecutors()

requestFuture.onComplete {
  case Success(true) => println("Executor request successful")
  case Success(false) => println("Executor request failed")
  case Failure(exception) => println(s"Request failed with exception: $exception")
}

// Kill specific executors
val executorsToKill = Seq("executor-1", "executor-2")
val killFuture: Future[Boolean] = backend.doKillExecutors(executorsToKill)
```

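For application code that is not compiled inside Spark itself, the supported way to resize a running YARN application is the developer API on SparkContext, which routes through the same backend calls shown above. A brief sketch, assuming `sc` is an active SparkContext:

```scala
// Public @DeveloperApi entry points on SparkContext delegate to the YARN backend.
val requested: Boolean = sc.requestExecutors(2)          // ask YARN for two more executors
val released: Boolean = sc.killExecutors(Seq("1", "2"))  // release executors "1" and "2"
```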