# Scheduler Integration

The YARN module provides specialized scheduler implementations that integrate Spark's task scheduling with YARN's resource management. These schedulers handle resource allocation, task placement, and coordination between Spark's execution engine and YARN's cluster management.

## Imports

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.hadoop.yarn.api.records.ApplicationId
```

## Capabilities

### YARN Cluster Scheduler

Scheduler implementation for YARN cluster mode, where the driver runs inside the YARN cluster as part of the Application Master.

```scala { .api }
/**
 * Task scheduler for YARN cluster mode.
 * Integrates with the Application Master for resource management.
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host.
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]

  /**
   * Post-start initialization hook.
   * Notifies the Application Master that the SparkContext is ready.
   */
  override def postStartHook(): Unit

  /**
   * Stops the scheduler and notifies the Application Master.
   */
  override def stop(): Unit
}
```

**Usage Example:**

```scala
// Automatically created when using yarn-cluster mode
val conf = new SparkConf().setMaster("yarn-cluster")
val sc = new SparkContext(conf)
// YarnClusterScheduler is instantiated automatically
```

### YARN Client Scheduler

Scheduler implementation for YARN client mode, where the driver runs outside the YARN cluster on the client machine.

```scala { .api }
/**
 * Task scheduler for YARN client mode.
 * Handles scheduling when the driver runs outside the YARN cluster.
 */
private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host.
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]
}
```

**Usage Example:**

```scala
// Automatically created when using yarn-client mode
val conf = new SparkConf().setMaster("yarn-client")
val sc = new SparkContext(conf)
// YarnClientClusterScheduler is instantiated automatically
```

### YARN Scheduler Backend Base

Base class for YARN scheduler backends, providing functionality common to cluster and client modes.

```scala { .api }
/**
 * Base scheduler backend for YARN implementations.
 * Provides common functionality for cluster and client modes.
 */
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
  ) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  /**
   * Gets the YARN application ID.
   * @return Application ID string
   */
  def applicationId(): String

  /**
   * Gets the application attempt ID.
   * @return Application attempt ID, if one exists
   */
  def applicationAttemptId(): Option[String]
}
```

### YARN Cluster Scheduler Backend

Scheduler backend that manages communication between the Spark scheduler and YARN resources in cluster mode.

```scala { .api }
/**
 * Scheduler backend for YARN cluster mode.
 * Manages executor lifecycle and communication.
 */
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
  ) extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend.
   */
  override def start(): Unit

  /**
   * Gets the YARN application ID from SparkConf.
   * @return Application ID string
   */
  override def applicationId(): String
}
```

### YARN Client Scheduler Backend

Scheduler backend for client mode that coordinates between the external driver and YARN-managed executors.

```scala { .api }
/**
 * Scheduler backend for YARN client mode.
 * Coordinates between the external driver and YARN executors.
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
  ) extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend and submits the application to YARN.
   */
  override def start(): Unit

  /**
   * Stops the scheduler backend and cleans up resources.
   */
  override def stop(): Unit

  /**
   * Gets the YARN application ID.
   * @return Application ID string
   */
  override def applicationId(): String
}
```

## Resource Management Integration

### Rack Awareness

Both scheduler implementations provide rack awareness for optimal task placement:

```scala
// Rack lookup using YARN's topology information
override def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
```

**Benefits:**
- **Data Locality**: Place tasks close to their data when possible
- **Network Efficiency**: Minimize cross-rack network traffic
- **Fault Tolerance**: Distribute tasks across racks for resilience
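Since rack topology is stable for the lifetime of an application, lookups are worth memoizing. The idea can be sketched in plain Scala; the `resolve` function here is a hypothetical stand-in for YARN's topology resolution, not the actual `YarnSparkHadoopUtil` code:

```scala
// Minimal sketch of a memoized rack lookup, assuming a pluggable
// resolver function standing in for YARN's topology mechanism.
class RackCache(resolve: String => Option[String]) {
  private val cache = scala.collection.mutable.Map.empty[String, Option[String]]

  // Each host is resolved at most once; later lookups hit the cache.
  def rackFor(host: String): Option[String] =
    cache.getOrElseUpdate(host, resolve(host))
}

// Usage with a hypothetical static topology table
val topology = Map("host1" -> "/rack1", "host2" -> "/rack2")
val racks = new RackCache(topology.get)
racks.rackFor("host1") // Some(/rack1)
racks.rackFor("host3") // None: unknown host
```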
### Resource Allocation Coordination

The schedulers coordinate with YARN for dynamic resource management:

- **Executor Requests**: Communicate resource needs to the Application Master
- **Container Allocation**: Handle YARN container assignments
- **Dynamic Scaling**: Support adding and removing executors based on workload
- **Resource Constraints**: Respect YARN queue limits and cluster capacity
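Dynamic scaling is typically driven by Spark's dynamic allocation settings. A minimal configuration sketch (property names as documented in the standard Spark configuration; exact values are illustrative):

```scala
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")    // Let Spark request/release executors
  .set("spark.dynamicAllocation.minExecutors", "2")  // Lower bound on executor count
  .set("spark.dynamicAllocation.maxExecutors", "20") // Upper bound, still capped by queue capacity
  .set("spark.shuffle.service.enabled", "true")      // Keeps shuffle data available after executor removal
```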
## Scheduler Lifecycle

### Initialization Sequence

1. **Scheduler Creation**: Automatically instantiated based on the master URL
2. **Backend Setup**: Create the appropriate scheduler backend for the mode
3. **Resource Discovery**: Initialize rack topology and cluster information
4. **Registration**: Register with the Application Master (cluster mode) or start the AM (client mode)

### Cluster Mode Lifecycle

```scala
// 1. Scheduler initialization
val scheduler = new YarnClusterScheduler(sparkContext)

// 2. Post-start hook execution
scheduler.postStartHook()
// Calls: ApplicationMaster.sparkContextInitialized(sc)

// 3. Task scheduling and execution
// Normal Spark task scheduling operations

// 4. Shutdown
scheduler.stop()
// Calls: ApplicationMaster.sparkContextStopped(sc)
```

### Client Mode Lifecycle

```scala
// 1. Scheduler initialization
val scheduler = new YarnClientClusterScheduler(sparkContext)

// 2. Application Master communication setup
// The backend establishes communication with the separate AM process

// 3. Task scheduling and execution
// Tasks are executed on YARN-managed executors

// 4. Shutdown
scheduler.stop()
// Cleanup and AM notification
```

## Configuration Integration

### Scheduler Configuration

Key configuration properties affecting scheduler behavior:

```scala
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")                 // FAIR or FIFO scheduling
  .set("spark.scheduler.allocation.file", "pools.xml") // Fair scheduler pool definitions
  .set("spark.locality.wait", "3s")                    // Data locality wait time
  .set("spark.locality.wait.rack", "0")                // Rack locality wait time
  .set("spark.task.maxFailures", "3")                  // Task failure retry limit
```

### YARN-Specific Configuration

Configuration properties specific to YARN scheduler integration:

```scala
val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")        // Heartbeat interval
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms") // Initial allocation interval
  .set("spark.yarn.max.executor.failures", "6")                     // Max executor failures
  .set("spark.yarn.am.waitTime", "100s")                            // AM wait time for SparkContext
```

## Task Placement and Locality

### Data Locality Optimization

The schedulers optimize task placement for data locality:

1. **Node Local**: Task runs on the same node as its data
2. **Rack Local**: Task runs on the same rack as its data
3. **Any**: Task can run anywhere in the cluster

```scala
// Locality preference handling
def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  // Use YARN's rack resolution
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
```

### Resource Preference

Task placement considers:

- **Memory Requirements**: Match tasks to appropriately sized executors
- **CPU Requirements**: Consider CPU core availability
- **Network Topology**: Minimize data movement across the network
- **Load Balancing**: Distribute work evenly across the cluster
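The three locality levels above form a preference order. A simplified, stand-alone sketch of how an offer might be classified (a hypothetical helper for illustration, not the actual `TaskSchedulerImpl` logic):

```scala
// Simplified locality classification, assuming the task's preferred
// hosts and a host -> rack mapping are known.
object Locality extends Enumeration {
  val NodeLocal, RackLocal, Any = Value
}

def classify(
    offerHost: String,
    preferredHosts: Set[String],
    rackOf: String => Option[String]): Locality.Value = {
  if (preferredHosts.contains(offerHost)) Locality.NodeLocal
  else if (preferredHosts.flatMap(rackOf).exists(r => rackOf(offerHost).contains(r)))
    Locality.RackLocal
  else Locality.Any
}

val rackMap = Map("host1" -> "/rack1", "host2" -> "/rack1", "host3" -> "/rack2")
classify("host1", Set("host1"), rackMap.get) // NodeLocal: host matches
classify("host2", Set("host1"), rackMap.get) // RackLocal: both on /rack1
classify("host3", Set("host1"), rackMap.get) // Any: different rack
```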
## Error Handling and Recovery

### Executor Failure Handling

The schedulers handle several executor failure scenarios:

- Automatic task retry on the remaining executors
- Executor replacement through the Application Master
- Blacklisting of problematic nodes
- Application failure if too many executors fail
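The blacklisting idea can be illustrated with a small stand-alone counter. This is a deliberate simplification of the concept; Spark's actual failure tracking is richer:

```scala
// Minimal sketch of per-host failure tracking with a blacklist
// threshold, mirroring the "blacklist problematic nodes" idea.
class FailureTracker(maxFailuresPerHost: Int) {
  private val failures =
    scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

  def recordFailure(host: String): Unit = failures(host) += 1

  // A host is excluded from scheduling once it crosses the threshold.
  def isBlacklisted(host: String): Boolean = failures(host) >= maxFailuresPerHost
}

val tracker = new FailureTracker(maxFailuresPerHost = 2)
tracker.recordFailure("host1")
tracker.isBlacklisted("host1") // false: one failure is below the threshold
tracker.recordFailure("host1")
tracker.isBlacklisted("host1") // true: threshold reached
```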
### Resource Unavailability

Handling resource constraints:

- **Queue Full**: Wait for resources to become available
- **Insufficient Memory**: Degrade gracefully or fail
- **Network Partitions**: Timeout and retry mechanisms
- **Node Failures**: Redistribute tasks to healthy nodes

### Application Master Communication

In client mode, the backend maintains robust communication with the Application Master:

- **Heartbeat Monitoring**: Regular health checks
- **Message Retry**: Retry failed communications
- **Connection Recovery**: Re-establish lost connections
- **Failover**: Handle Application Master restarts
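The message-retry behavior can be sketched as a generic retry-with-backoff helper; the `send` thunk is a hypothetical stand-in for the real RPC call:

```scala
// Minimal retry-with-backoff sketch for Application Master messaging.
// Retries on failure, doubling the backoff; the final failure propagates.
def retry[T](attempts: Int, backoffMs: Long)(send: () => T): T = {
  try send()
  catch {
    case _: Exception if attempts > 1 =>
      Thread.sleep(backoffMs)
      retry(attempts - 1, backoffMs * 2)(send)
  }
}

// Usage: a send that succeeds on the third attempt
var calls = 0
val result = retry(attempts = 5, backoffMs = 1) { () =>
  calls += 1
  if (calls < 3) throw new RuntimeException("AM unreachable")
  "ack"
}
// result == "ack" after 3 calls
```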
## Integration with Spark Components

### TaskScheduler Integration

The YARN schedulers extend Spark's base TaskSchedulerImpl:

- **Task Submission**: Receive tasks from the DAGScheduler
- **Resource Matching**: Match tasks to available executors
- **Task Launch**: Send tasks to the appropriate executors
- **Result Collection**: Gather task results and metrics

### Driver Integration

Coordination with the Spark driver:

- **Task Graph**: Receive task execution plans
- **Status Updates**: Report task and executor status
- **Metrics Collection**: Aggregate performance metrics
- **Event Handling**: Process Spark events and lifecycle changes

### Storage Integration

Integration with Spark's storage layer:

- **Block Manager**: Coordinate with block managers on executors
- **RDD Caching**: Optimize placement for cached RDDs
- **Shuffle Management**: Coordinate shuffle operations
- **Broadcast Variables**: Distribute broadcast data efficiently

## Monitoring and Metrics

### Scheduler Metrics

Key metrics tracked by the YARN schedulers:

- **Task Completion Rate**: Tasks completed per second
- **Resource Utilization**: CPU and memory usage across the cluster
- **Locality Statistics**: Data locality hit rates
- **Failure Rates**: Task and executor failure frequencies
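As one concrete illustration, a locality hit rate can be derived from per-level task counts. This is a simple, hypothetical computation, not an actual Spark metrics API:

```scala
// Fraction of tasks that ran node-local, given counts per locality level.
def localityHitRate(counts: Map[String, Long]): Double = {
  val total = counts.values.sum
  if (total == 0) 0.0
  else counts.getOrElse("NODE_LOCAL", 0L).toDouble / total
}

val counts = Map("NODE_LOCAL" -> 80L, "RACK_LOCAL" -> 15L, "ANY" -> 5L)
localityHitRate(counts) // 0.8
```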
### Integration with Spark UI

The schedulers integrate with Spark's web UI:

- **Executor Information**: Current executor status and resources
- **Task Details**: Task execution times and locality
- **Resource Usage**: Memory and CPU utilization graphs
- **Error Reporting**: Failed task and executor information