# Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI. This module handles the lifecycle of YARN-based Spark applications and provides appropriate schedulers and backends for different deployment modes.

## Capabilities

### YarnClusterManager

Main entry point for YARN cluster management, registered as an `ExternalClusterManager` service provider. Automatically activated when `master = "yarn"` is specified in `SparkConf`.

```scala { .api }
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Parameters:**
- `masterURL`: Must be `"yarn"` for this cluster manager to be selected
- `sc`: SparkContext instance for the application
- `scheduler`: TaskScheduler instance to be initialized
- `backend`: SchedulerBackend instance to be initialized

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically selected when master is "yarn"
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn") // This triggers YarnClusterManager selection

val sc = new SparkContext(conf)
```

### Task Schedulers

YARN-specific task schedulers that provide rack awareness and optimal task placement within YARN clusters.

```scala { .api }
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
}

class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
```

**YarnScheduler:**
- Used in client deployment mode
- Provides YARN rack awareness for better locality
- Extends the standard TaskSchedulerImpl with YARN-specific optimizations

**YarnClusterScheduler:**
- Used in cluster deployment mode
- Inherits all YarnScheduler functionality
- Identical behavior to YarnScheduler in the current implementation

**Usage Example:**

```scala
// Schedulers are created automatically based on deploy mode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // Uses YarnScheduler
// .set("spark.submit.deployMode", "cluster") // Uses YarnClusterScheduler
```
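
The rack lookup behind `getRackForHost` can be sketched as below. This is illustrative only: the real scheduler delegates to Hadoop's `RackResolver` (driven by the cluster's topology script), whereas here a plain `Map` stands in for that resolution, and the hostnames and rack paths are invented.

```scala
// Sketch of rack-aware host resolution. The static Map is a stand-in for
// Hadoop's RackResolver; hostnames and rack paths are hypothetical.
object RackAwareSketch {
  private val topology = Map(
    "host1.example.com" -> "/rack1",
    "host2.example.com" -> "/rack2"
  )

  // Mirrors the shape of YarnScheduler.getRackForHost: strip the port,
  // then resolve the bare hostname to a rack, if known.
  def getRackForHost(hostPort: String): Option[String] = {
    val host = hostPort.split(":").head
    topology.get(host)
  }

  def main(args: Array[String]): Unit = {
    assert(getRackForHost("host1.example.com:7337").contains("/rack1"))
    assert(getRackForHost("unknown.example.com:1234").isEmpty)
  }
}
```

Tasks whose preferred hosts resolve to the same rack can then be scheduled with `RACK_LOCAL` rather than `ANY` locality.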

### Scheduler Backends

YARN-specific scheduler backends that manage communication between Spark and the YARN ResourceManager.

```scala { .api }
abstract class YarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
  override def start(): Unit
  override def stop(): Unit
  override def minRegisteredRatio: Double // Returns 0.8
}
```

**Common Methods:**
- `bindToYarn`: Associates the backend with YARN application and attempt IDs
- `start()`: Initializes the backend and begins resource management
- `stop()`: Cleanly shuts down the backend and releases resources
- `minRegisteredRatio`: Returns 0.8 (80%) as the minimum executor registration ratio on YARN, unless `spark.scheduler.minRegisteredResourcesRatio` is set explicitly
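
The ratio acts as a readiness gate: the scheduler holds off launching tasks until enough executors have registered. A minimal sketch of that check follows; the method and parameter names are illustrative, not the actual `CoarseGrainedSchedulerBackend` members.

```scala
// Sketch of the readiness gate implied by minRegisteredRatio.
// `registered` and `expected` are illustrative names for the backend's
// registered-executor count and requested-executor count.
object RegistrationGateSketch {
  val minRegisteredRatio = 0.8 // the YARN default described above

  def sufficientResourcesRegistered(registered: Int, expected: Int): Boolean =
    expected == 0 || registered >= expected * minRegisteredRatio

  def main(args: Array[String]): Unit = {
    assert(!sufficientResourcesRegistered(registered = 7, expected = 10))
    assert(sufficientResourcesRegistered(registered = 8, expected = 10))
  }
}
```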

#### YarnClientSchedulerBackend

```scala { .api }
class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit
  def waitForApplication(): Unit
  override def stop(): Unit
}
```

**Client Mode Specific:**
- `start()`: Creates and submits the YARN application via Client
- `waitForApplication()`: Blocks until the application reaches the RUNNING state
- Used when `spark.submit.deployMode = "client"`

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

// YarnClientSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```
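
The blocking behavior of `waitForApplication()` amounts to polling the application report until the application leaves the submitted/accepted states. The sketch below simulates that loop with an in-memory state sequence standing in for repeated `YarnClient.getApplicationReport` calls; the type names and the failure handling shown are simplified assumptions.

```scala
// Simplified sketch of the waitForApplication polling loop. The Iterator
// stands in for repeated report fetches; real code sleeps between polls
// and reads YarnApplicationState from the application report.
object WaitForAppSketch {
  sealed trait AppState
  case object Accepted extends AppState
  case object Running  extends AppState
  case object Failed   extends AppState

  def waitForApplication(reports: Iterator[AppState]): AppState = {
    var state: AppState = Accepted
    while (reports.hasNext && state == Accepted) {
      state = reports.next() // real code: poll, then Thread.sleep(interval)
      if (state == Failed)
        throw new RuntimeException("Application failed before reaching RUNNING")
    }
    state
  }

  def main(args: Array[String]): Unit = {
    assert(waitForApplication(Iterator(Accepted, Accepted, Running)) == Running)
  }
}
```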

#### YarnClusterSchedulerBackend

```scala { .api }
class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit
  def getDriverLogUrls: Option[Map[String, String]]
  override def stop(): Unit
}
```

**Cluster Mode Specific:**
- `start()`: Binds to the existing YARN application (running inside the ApplicationMaster)
- `getDriverLogUrls`: Returns driver log URLs from YARN for monitoring
- Used when `spark.submit.deployMode = "cluster"`

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

// YarnClusterSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

## Service Registration

The YARN cluster manager is automatically registered through Java's ServiceLoader mechanism:

**META-INF/services/org.apache.spark.scheduler.ExternalClusterManager:**
```
org.apache.spark.scheduler.cluster.YarnClusterManager
```

This enables automatic discovery when `master = "yarn"` without requiring explicit class registration.
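
The selection step can be sketched as follows: SparkContext asks every discovered manager `canCreate(master)` and requires exactly one match. The trait and object below are local stand-ins for `ExternalClusterManager` and `YarnClusterManager`, and the candidate list stands in for what `ServiceLoader.load` would discover from the registration file above.

```scala
// Sketch of ServiceLoader-based cluster manager selection.
// ClusterManagerStub / YarnManagerStub are stand-ins, not Spark classes.
trait ClusterManagerStub {
  def canCreate(masterURL: String): Boolean
}

object YarnManagerStub extends ClusterManagerStub {
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"
}

object ServiceSelectionSketch {
  // In Spark, `loaded` would come from ServiceLoader discovery.
  def getClusterManager(master: String,
                        loaded: Seq[ClusterManagerStub]): Option[ClusterManagerStub] = {
    val matching = loaded.filter(_.canCreate(master))
    if (matching.size > 1)
      throw new RuntimeException(s"Multiple cluster managers registered for '$master'")
    matching.headOption
  }

  def main(args: Array[String]): Unit = {
    assert(getClusterManager("yarn", Seq(YarnManagerStub)).contains(YarnManagerStub))
    assert(getClusterManager("local[*]", Seq(YarnManagerStub)).isEmpty)
  }
}
```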

## Deploy Mode Differences

| Component | Client Mode | Cluster Mode |
|-----------|-------------|--------------|
| **TaskScheduler** | YarnScheduler | YarnClusterScheduler |
| **SchedulerBackend** | YarnClientSchedulerBackend | YarnClusterSchedulerBackend |
| **Driver Location** | Client machine | YARN ApplicationMaster |
| **Application Submission** | Client submits to YARN | Submitted by spark-submit before the driver starts |
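
The scheduler and backend rows of the table map directly onto a deploy-mode match inside `YarnClusterManager`. A sketch of that dispatch, returning class names rather than constructing real instances:

```scala
// Sketch of YarnClusterManager's deploy-mode dispatch. Returns class
// names as strings instead of real scheduler/backend instances.
object DeployModeDispatchSketch {
  def schedulerFor(deployMode: String): String = deployMode match {
    case "client"  => "YarnScheduler"
    case "cluster" => "YarnClusterScheduler"
    case other     => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
  }

  def backendFor(deployMode: String): String = deployMode match {
    case "client"  => "YarnClientSchedulerBackend"
    case "cluster" => "YarnClusterSchedulerBackend"
    case other     => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
  }

  def main(args: Array[String]): Unit = {
    assert(schedulerFor("client") == "YarnScheduler")
    assert(backendFor("cluster") == "YarnClusterSchedulerBackend")
  }
}
```

Any other deploy-mode value falls through to the error case, matching the exception shown under Error Handling.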

## Error Handling

Common exceptions in cluster management:

```scala
// Thrown by YarnClusterManager when spark.submit.deployMode is neither
// "client" nor "cluster"
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")

// Backend initialization failure
throw new SparkException("Failed to initialize YARN scheduler backend")
```
184
185
## Integration Points
186
187
- **SparkContext**: Automatic cluster manager selection based on master URL
188
- **Configuration**: Driven by `spark.submit.deployMode` and `spark.master` settings
189
- **Resource Management**: Integrates with YarnAllocator for container management
190
- **Security**: Coordinates with security managers and credential providers