# Scheduler Backends

Scheduler backend implementations for integrating Spark's TaskScheduler with YARN resource management. These components bridge Spark's task scheduling system and YARN's resource allocation, supporting both client and cluster deployment modes.

## Capabilities
### YarnClientSchedulerBackend

Scheduler backend for YARN client mode, where the Spark driver runs on the client machine and the ApplicationMaster manages only executors.
```scala { .api }
/**
 * Scheduler backend for YARN client mode.
 * Manages the executor lifecycle and resource requests through the
 * YARN ApplicationMaster.
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend {

  /**
   * Start the backend and submit the application to YARN.
   * Initiates the ApplicationMaster and begins executor allocation.
   */
  def start(): Unit

  // Application monitoring and lifecycle management
  // Resource request handling through the ApplicationMaster
  // Executor status tracking and failure handling
}
```
**Usage Examples:**

```scala
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.SparkContext

// Typically created by the Spark runtime in client mode
val sc = new SparkContext(sparkConf)
val taskScheduler = new TaskSchedulerImpl(sc)
val backend = new YarnClientSchedulerBackend(taskScheduler, sc)

// The backend lifecycle is managed by the Spark runtime
backend.start()
```
### YarnClusterSchedulerBackend

Scheduler backend for YARN cluster mode, where both the driver and the executors run on the YARN cluster.
```scala { .api }
/**
 * Scheduler backend for YARN cluster mode.
 * Manages executors when the driver runs within the ApplicationMaster.
 */
private[spark] class YarnClusterSchedulerBackend extends YarnSchedulerBackend {
  // Cluster-specific resource management
  // Direct integration with the ApplicationMaster
  // Optimized executor allocation for cluster deployment
}
```
**Usage Examples:**

```scala
import org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

// Created by the Spark runtime in cluster mode;
// the driver runs inside the ApplicationMaster on the YARN cluster
val backend = new YarnClusterSchedulerBackend()
```
### YarnSchedulerBackend (Base Class)

Abstract base class providing common YARN scheduler backend functionality.
```scala { .api }
/**
 * Base class for YARN scheduler backend implementations.
 * Provides the functionality shared by client and cluster modes.
 */
private[spark] abstract class YarnSchedulerBackend
  extends CoarseGrainedSchedulerBackend {

  // Common YARN resource management operations
  // Executor container lifecycle management
  // Resource request and allocation handling
  // Integration with the YARN ApplicationMaster
}
```
### Task Schedulers

Task scheduler implementations specialized for the two YARN deployment modes.
```scala { .api }
/**
 * Task scheduler for YARN client mode.
 * Optimized for a client-side driver with remote executors.
 */
private[spark] class YarnClientClusterScheduler(sc: SparkContext)
  extends TaskSchedulerImpl {

  // Client-specific scheduling optimizations
  // Remote executor communication handling
  // Task distribution strategies for client mode
}

/**
 * Task scheduler for YARN cluster mode.
 * Optimized for the driver and executors running on the same cluster.
 */
private[spark] class YarnClusterScheduler(sc: SparkContext)
  extends TaskSchedulerImpl {

  // Cluster-specific scheduling optimizations
  // Locality-aware task placement
  // Efficient intra-cluster communication
}
```
**Usage Examples:**

```scala
import org.apache.spark.scheduler.cluster.{YarnClientClusterScheduler, YarnClusterScheduler}
import org.apache.spark.SparkContext

// Client mode scheduler
val sc = new SparkContext(sparkConf)
val clientScheduler = new YarnClientClusterScheduler(sc)

// Cluster mode scheduler
val clusterScheduler = new YarnClusterScheduler(sc)
```
## Scheduler Backend Architecture

### Client Mode Architecture

```
SparkContext (client machine)
        ↓
YarnClientSchedulerBackend
        ↓
YarnClientClusterScheduler
        ↓
ApplicationMaster (YARN cluster) - ExecutorLauncher mode
        ↓
Executor containers (YARN nodes)
```
In client mode:

1. The driver runs on the client machine.
2. `YarnClientSchedulerBackend` submits the ApplicationMaster to YARN.
3. The ApplicationMaster runs in ExecutorLauncher mode and manages only executors.
4. Tasks are scheduled from the client to remote executors.
### Cluster Mode Architecture

```
ApplicationMaster (YARN cluster) - Driver mode
        ↓
YarnClusterSchedulerBackend
        ↓
YarnClusterScheduler
        ↓
Executor containers (YARN nodes)
```
In cluster mode:

1. The driver runs inside the ApplicationMaster on the YARN cluster.
2. `YarnClusterSchedulerBackend` manages executor allocation locally.
3. The layout is optimized for locality and reduced network overhead.
## Resource Management Integration

### Executor Allocation
```scala
// Scheduler backends integrate with YARN resource management
abstract class YarnSchedulerBackend {
  // Request executor containers from YARN
  protected def requestExecutors(numExecutors: Int): Unit

  // Handle executor container allocation from the ResourceManager
  protected def onExecutorsAdded(executorIds: Seq[String]): Unit

  // Handle executor failures and cleanup
  protected def onExecutorRemoved(executorId: String, reason: String): Unit
}
```
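
The request/allocate/remove cycle can be modelled with a small, Spark-free bookkeeping class. This is a toy sketch: `ExecutorTracker` and its method names are hypothetical stand-ins for the backend's internal state, not Spark classes.

```scala
// Toy model of the executor bookkeeping a YARN scheduler backend performs.
// ExecutorTracker is hypothetical, not part of Spark's API.
class ExecutorTracker(targetExecutors: Int) {
  private var running = Set.empty[String]
  private var pendingRequests = 0

  // How many new containers must be requested to reach the target
  def containersToRequest: Int =
    math.max(0, targetExecutors - running.size - pendingRequests)

  def requestExecutors(): Unit = pendingRequests += containersToRequest

  // Called when YARN grants a container and the executor registers
  def onExecutorAdded(id: String): Unit = {
    running += id
    pendingRequests = math.max(0, pendingRequests - 1)
  }

  // Called on failure: the slot becomes vacant and will be re-requested
  def onExecutorRemoved(id: String, reason: String): Unit =
    running -= id
}

val tracker = new ExecutorTracker(targetExecutors = 3)
tracker.requestExecutors()
Seq("exec-1", "exec-2", "exec-3").foreach(tracker.onExecutorAdded)
println(tracker.containersToRequest)  // 0: target reached
tracker.onExecutorRemoved("exec-2", "container preempted")
println(tracker.containersToRequest)  // 1: one replacement needed
```

The key invariant is that in-flight requests count against the target, so a burst of failures never causes over-allocation.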
### Dynamic Allocation

Integration with Spark's dynamic allocation feature:
```properties
# Configuration for dynamic executor allocation.
# On YARN, dynamic allocation also requires the external shuffle service
# (spark.shuffle.service.enabled=true) so shuffle files outlive executors.
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.initialExecutors=2

# YarnSchedulerBackend handles the dynamic scaling,
# requesting and releasing executors based on workload
```
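
Spark's dynamic allocation ramps the executor target up exponentially while tasks remain backlogged. The behaviour can be sketched with a toy, Spark-free model; the `nextTarget` helper and its signature are hypothetical, not Spark's `ExecutorAllocationManager`.

```scala
// Toy model of dynamic allocation's ramp-up/ramp-down policy:
// while tasks are backlogged the target doubles each scheduling round,
// clamped to [minExecutors, maxExecutors]; when idle it falls back to
// the minimum. Hypothetical sketch only.
def nextTarget(current: Int, backloggedTasks: Int,
               minExecutors: Int, maxExecutors: Int): Int = {
  val proposed =
    if (backloggedTasks > 0) math.max(current * 2, 1)  // exponential ramp-up
    else minExecutors                                  // idle: scale back down
  math.min(maxExecutors, math.max(minExecutors, proposed))
}

var target = 2  // spark.dynamicAllocation.initialExecutors
for (round <- 1 to 3) {
  target = nextTarget(target, backloggedTasks = 100,
                      minExecutors = 1, maxExecutors = 10)
  println(s"round $round: target = $target")  // 4, then 8, then 10 (capped)
}
```

Clamping to `maxExecutors` is what keeps a long backlog from requesting unbounded containers from YARN.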
## Configuration Options

### Client Mode Configuration
```properties
# Key configuration properties for client mode
spark.yarn.am.memory=512m                  # ApplicationMaster memory
spark.yarn.am.cores=1                      # ApplicationMaster cores
spark.yarn.am.waitTime=100s                # Max wait for the SparkContext
spark.yarn.client.executor.graceTime=5s    # Executor shutdown grace period
```
### Cluster Mode Configuration
```properties
# Key configuration properties for cluster mode
spark.driver.memory=1g                     # Driver memory (runs in the AM)
spark.driver.cores=1                       # Driver cores
spark.yarn.driver.memoryOverhead=384       # Driver memory overhead (MB)
spark.driver.extraJavaOptions              # JVM options for the AM/driver
```
### Common Configuration
```properties
# Configuration affecting both modes
spark.executor.memory=1g                   # Executor memory
spark.executor.cores=1                     # Executor cores
spark.executor.instances=2                 # Number of executors
spark.yarn.queue=default                   # YARN queue
spark.yarn.priority=0                      # Application priority
```
## Fault Tolerance

### Executor Failure Handling
```scala
// Scheduler backends provide fault tolerance mechanisms
class YarnSchedulerBackend {
  // Detect executor failures through heartbeat monitoring
  // Automatically request replacement executors from YARN
  // Reschedule failed tasks on healthy executors
  // Blacklist problematic nodes after repeated failures
}
```
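
The node-blacklisting step can be sketched as a standalone policy class. This is a hypothetical simplification: Spark's real blacklist (now "exclusion") tracking lives in the scheduler and also applies timeouts, which the sketch omits.

```scala
// Toy node-blacklisting policy: after maxFailures executor failures on a
// host, stop requesting containers there. Hypothetical sketch, not Spark code.
class NodeBlacklist(maxFailures: Int) {
  private val failures =
    scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

  def recordFailure(host: String): Unit = failures(host) += 1

  def isBlacklisted(host: String): Boolean = failures(host) >= maxFailures

  // Hosts still eligible for container requests
  def healthyHosts(all: Seq[String]): Seq[String] = all.filterNot(isBlacklisted)
}

val bl = new NodeBlacklist(maxFailures = 2)
bl.recordFailure("node-1")
bl.recordFailure("node-1")
println(bl.healthyHosts(Seq("node-1", "node-2", "node-3")))
// List(node-2, node-3): node-1 is excluded after repeated failures
```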
### ApplicationMaster Failure Recovery

- **Client Mode**: the ApplicationMaster manages only executors, but if it fails the application must be resubmitted
- **Cluster Mode**: a driver failure ends the current attempt; YARN can retry the ApplicationMaster up to `spark.yarn.maxAppAttempts` times
- Both modes support checkpoint-based recovery for stateful applications
## Performance Optimization

### Locality Optimization
```scala
// Scheduler backends optimize for data locality
class YarnClusterScheduler {
  // Prefer scheduling tasks on nodes with cached data
  // Consider HDFS block locations for task placement
  // Balance locality against resource availability
}
```
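
A minimal sketch of the locality trade-off, assuming a toy `pickHost` helper rather than the actual TaskSchedulerImpl logic: prefer hosts that hold the task's data, and break ties by current load.

```scala
// Toy locality-aware placement. preferredHosts are the hosts holding the
// task's data (e.g. HDFS block locations); load maps host -> running tasks.
// Hypothetical sketch, not Spark's delay-scheduling algorithm.
def pickHost(preferredHosts: Set[String],
             load: Map[String, Int]): String = {
  val local = load.keys.filter(preferredHosts.contains)
  val candidates = if (local.nonEmpty) local else load.keys
  candidates.minBy(load)  // least-loaded among the candidates
}

val load = Map("node-1" -> 5, "node-2" -> 2, "node-3" -> 0)
println(pickHost(Set("node-1", "node-2"), load))  // node-2: data-local, lighter
println(pickHost(Set.empty, load))                // node-3: no locality, least loaded
```

Spark's real scheduler refines this with delay scheduling: it briefly waits for a local slot before falling back to a non-local one.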
### Resource Utilization
```scala
// Efficient resource utilization strategies
class YarnSchedulerBackend {
  // Container sharing between multiple executors (when supported)
  // Optimal container size calculation based on workload
  // Preemption handling for shared cluster environments
}
```
## Monitoring and Metrics

The scheduler backends integrate with Spark's metrics system:

- Executor allocation/deallocation events
- Task scheduling latency metrics
- Resource utilization tracking
- YARN application progress reporting
- Integration with the Spark History Server
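
A toy counter class illustrates the kind of allocation events a backend might report. This is hypothetical: Spark's real metrics are registered through its metrics `Source` API and a Codahale `MetricRegistry`, not a plain map.

```scala
// Toy event counters mirroring the allocation metrics a backend might expose.
// Hypothetical sketch; not Spark's metrics API.
class BackendMetrics {
  private var allocated = 0
  private var removed = 0

  def onExecutorAllocated(): Unit = allocated += 1
  def onExecutorRemoved(): Unit = removed += 1

  // A point-in-time snapshot, as a metrics sink would scrape it
  def snapshot: Map[String, Int] =
    Map("executorsAllocated" -> allocated,
        "executorsRemoved"   -> removed,
        "executorsActive"    -> (allocated - removed))
}

val m = new BackendMetrics()
(1 to 3).foreach(_ => m.onExecutorAllocated())
m.onExecutorRemoved()
println(m.snapshot("executorsActive"))  // 2
```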