# Cluster Management

The cluster management layer provides the core Kubernetes integration for Apache Spark, implementing Spark's external cluster manager interface to enable native execution on Kubernetes clusters.

## Core Components

### KubernetesClusterManager { .api }

The main entry point for Kubernetes cluster management that integrates with Spark's cluster manager framework:

```scala
class KubernetesClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Key Responsibilities**:
- Validates Kubernetes master URLs (format: `k8s://https://kubernetes-api:port`)
- Creates Kubernetes-specific task schedulers and scheduler backends
- Manages the initialization lifecycle of cluster components
- Integrates with Spark's cluster manager registry
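
The URL validation in the first responsibility reduces to a prefix check; a minimal sketch (the object name is hypothetical — `canCreate` performs an equivalent check internally):

```scala
// Hypothetical helper mirroring the check canCreate performs:
// a Kubernetes master URL must carry the k8s:// prefix.
object MasterUrlCheck {
  def isKubernetesMaster(masterURL: String): Boolean =
    masterURL.startsWith("k8s://")
}
```

For example, `isKubernetesMaster("k8s://https://my-cluster.example.com:443")` yields `true`, while a `spark://` standalone URL yields `false`.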

**Usage Example**:
```scala
// Automatic registration when using k8s:// master URL
val spark = SparkSession.builder()
  .appName("MyKubernetesApp")
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.container.image", "my-spark:latest")
  .getOrCreate()
```

### KubernetesClusterSchedulerBackend { .api }

Kubernetes-specific implementation of Spark's scheduler backend that manages executor lifecycle:

```scala
class KubernetesClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    kubernetesClient: KubernetesClient,
    executorService: ScheduledExecutorService,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    podAllocator: ExecutorPodsAllocator,
    lifecycleEventHandler: ExecutorPodsLifecycleManager,
    watchEvents: ExecutorPodsWatchSnapshotSource,
    pollEvents: ExecutorPodsPollingSnapshotSource
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
```

**Core Features**:
- **Dynamic Resource Allocation**: Supports adding/removing executors based on workload
- **Pod Lifecycle Management**: Handles creation, monitoring, and cleanup of executor pods
- **State Synchronization**: Maintains consistency between Spark scheduler state and Kubernetes pod state
- **Fault Tolerance**: Automatically handles pod failures and rescheduling
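
State synchronization can be pictured as a reconciliation between the scheduler's registered executors and the executor IDs visible in the latest pod snapshot; a simplified sketch under hypothetical names (not Spark's internal code):

```scala
// Hypothetical sketch of state synchronization: executors Spark still
// considers alive, but whose pods no longer appear in the latest
// snapshot, must be reported lost to the scheduler.
object StateSync {
  def lostExecutors(registered: Set[String], snapshotIds: Set[String]): Set[String] =
    registered -- snapshotIds
}
```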

**Key Methods**:
```scala
override def start(): Unit
override def stop(): Unit
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
```

Note that locality preferences and excluded nodes are passed to the public `requestTotalExecutors` entry point on `CoarseGrainedSchedulerBackend`; the `doRequestTotalExecutors` hook itself receives only the requested total.

## Executor Management Components

### ExecutorPodsAllocator { .api }

Manages the allocation and creation of new executor pods:

```scala
class ExecutorPodsAllocator(
    conf: SparkConf,
    secMgr: SecurityManager,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock
)
```

**Responsibilities**:
- Creates new executor pods when requested by the scheduler
- Applies resource configurations (CPU, memory, storage)
- Handles pod template application and customization
- Manages allocation timeouts and retries
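
At its core the allocator performs a reconciliation computation: compare the expected total against pods that already exist and request the shortfall. A simplified sketch (the real allocator additionally batches pod creation and tracks allocation timeouts):

```scala
// Simplified allocation math: how many new executor pods to request.
// Never negative -- surplus executors go through the kill path instead.
object AllocationMath {
  def podsToCreate(expectedTotal: Int, runningPods: Int, pendingPods: Int): Int =
    math.max(0, expectedTotal - runningPods - pendingPods)
}
```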

**Key Operations**:
```scala
def setTotalExpectedExecutors(newTotal: Int): Unit
def start(applicationId: String): Unit
def stop(): Unit
```

### ExecutorPodsLifecycleManager { .api }

Manages the complete lifecycle of executor pods from creation to termination:

```scala
class ExecutorPodsLifecycleManager(
    conf: SparkConf,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore
)
```

**Lifecycle Operations**:
- **Pod Creation**: Coordinates with the allocator for new executor creation
- **Health Monitoring**: Tracks pod health and readiness
- **Failure Handling**: Detects and responds to pod failures
- **Resource Cleanup**: Ensures proper cleanup of terminated pods
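
Failure handling hinges on classifying a pod's phase, following the standard Kubernetes pod lifecycle; a hedged sketch with hypothetical names:

```scala
// Hypothetical classification of a pod phase into a lifecycle action.
// Phase strings follow the Kubernetes pod lifecycle specification.
object PodPhases {
  sealed trait Action
  case object CleanUp extends Action       // pod terminated normally
  case object ReportFailure extends Action // executor must be marked lost
  case object KeepWatching extends Action  // pod still progressing

  def actionFor(phase: String): Action = phase match {
    case "Succeeded" => CleanUp
    case "Failed"    => ReportFailure
    case _           => KeepWatching // Pending, Running, Unknown
  }
}
```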

**State Management**:
```scala
def start(applicationId: String): Unit
def stop(): Unit
def onFinalNonDeletedState(podState: FinalPodState): Unit
```

## Cluster Lifecycle Management

### Initialization Sequence

The cluster manager follows a specific initialization sequence:

```scala
// 1. Cluster manager creation
val clusterManager = new KubernetesClusterManager()

// 2. Task scheduler creation
val taskScheduler = clusterManager.createTaskScheduler(sparkContext, masterURL)

// 3. Scheduler backend creation
val schedulerBackend = clusterManager.createSchedulerBackend(
  sparkContext, masterURL, taskScheduler
)

// 4. Component initialization
clusterManager.initialize(taskScheduler, schedulerBackend)
```

### Resource Allocation Flow

Dynamic resource allocation follows this pattern:

```scala
// 1. Scheduler requests additional executors
scheduler.requestTotalExecutors(targetNum, localityAwareTasks, hostToLocalTaskCount)

// 2. Backend processes the request (hook receives only the requested total)
schedulerBackend.doRequestTotalExecutors(requestedTotal)

// 3. Allocator creates new pods
executorPodsAllocator.setTotalExpectedExecutors(newTotal)

// 4. Lifecycle manager reacts to terminal pod states
lifecycleManager.onFinalNonDeletedState(podState)
```
165
166
## Configuration Integration
167
168
### Kubernetes-Specific Settings
169
170
The cluster manager integrates with Spark's configuration system:
171
172
```scala
173
import org.apache.spark.deploy.k8s.Config._
174
175
val conf = new SparkConf()
176
// Cluster configuration
177
.set(KUBERNETES_NAMESPACE, "spark-cluster")
178
.set(KUBERNETES_SERVICE_ACCOUNT_NAME, "spark-service-account")
179
180
// Resource limits
181
.set(KUBERNETES_EXECUTOR_LIMIT_CORES, "2")
182
.set(KUBERNETES_DRIVER_LIMIT_CORES, "1")
183
184
// Dynamic allocation
185
.set(DYN_ALLOCATION_ENABLED, "true")
186
.set(DYN_ALLOCATION_MAX_EXECUTORS, "10")
187
.set(DYN_ALLOCATION_MIN_EXECUTORS, "1")
188
```

### Master URL Format

Kubernetes master URLs follow a specific format:

```text
// Standard format
k8s://https://kubernetes-api-server:port

// Examples
k8s://https://my-cluster.example.com:443
k8s://https://kubernetes.default.svc.cluster.local:443
k8s://https://10.0.0.100:6443
```
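
Resolving such a URL to the API server address amounts to stripping the `k8s://` prefix; when no scheme follows the prefix, Spark defaults to `https`. A minimal sketch of that resolution (the helper name is hypothetical):

```scala
// Hypothetical resolver for k8s:// master URLs.
object MasterUrls {
  def apiServer(masterURL: String): String = {
    val rest = masterURL.stripPrefix("k8s://")
    // Default to TLS when no explicit scheme is given (Spark's default)
    if (rest.startsWith("http://") || rest.startsWith("https://")) rest
    else s"https://$rest"
  }
}
```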

## Error Handling and Fault Tolerance

### Pod Failure Recovery

The cluster manager detects failed executor pods, reports them to the scheduler, and lets the allocator replace them. The sketch below is illustrative (`PodFailureHandler`, `shouldReplace`, and `requestNewExecutor` are hypothetical names, not actual Spark classes):

```scala
// Illustrative sketch of pod-failure handling
class PodFailureHandler {
  def handlePodFailure(failedPod: Pod, reason: String): Unit = {
    logWarning(s"Executor pod ${failedPod.getMetadata.getName} failed: $reason")

    // Remove the failed executor from the scheduler
    scheduler.executorLost(executorId, SlaveLost(reason))

    // The allocator will create a replacement if needed
    if (shouldReplace(failedPod)) {
      allocator.requestNewExecutor()
    }
  }
}
```

### Network Resilience

Transient Kubernetes API failures are absorbed through retries with backoff. The snippet below is an illustrative sketch: `RetryConfig` and `clientFactory` are stand-ins, not actual Spark APIs (Spark constructs its client through an internal factory):

```scala
// Illustrative: retry configuration for API operations
val retryConfig = RetryConfig.builder()
  .maxAttempts(3)
  .backoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
  .build()

// Illustrative: client constructed with retry capability
val kubernetesClient = clientFactory.createKubernetesClient(
  ClientType.Driver,
  retryConfig
)
```

## Monitoring and Observability

### Metrics Integration

The cluster manager exposes executor and pod state for monitoring:

```scala
// Executor metrics via the public status tracker
sparkContext.statusTracker.getExecutorInfos.foreach { executor =>
  println(s"Executor at ${executor.host}:${executor.port}: " +
    s"${executor.numRunningTasks} running tasks, cache size ${executor.cacheSize}")
}

// Pod state counts (snapshotsStore is internal to the scheduler backend)
val snapshot = snapshotsStore.currentSnapshot
val podCounts = snapshot.executorPods.values.groupBy(_.getClass.getSimpleName)
podCounts.foreach { case (state, pods) =>
  println(s"$state: ${pods.size} pods")
}
```

### Logging Configuration

Comprehensive logging for troubleshooting:

```scala
import org.apache.log4j.{Level, Logger}

// Keep terminated executor pods around for post-mortem debugging
spark.conf.set("spark.kubernetes.executor.deleteOnTermination", "false")

// Log levels
spark.sparkContext.setLogLevel("INFO") // General logs
// Verbose logging for the Kubernetes scheduler components
Logger.getLogger("org.apache.spark.scheduler.cluster.k8s").setLevel(Level.DEBUG)
```

## Advanced Features

### Custom Pod Templates

Support for custom pod templates:

```scala
// Using pod template files
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")

// Template applied during pod creation (internal API)
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  templateFile,
  containerName
)
```

### Service Account Configuration

RBAC integration for secure cluster access:

```scala
// Service account configuration
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")
spark.conf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark-executor")

// RBAC permissions required:
// - pods: create, delete, get, list, watch
// - services: create, delete, get
// - configmaps: create, delete, get, list
```

### Node Selection and Affinity

Advanced scheduling capabilities:

```scala
// Node selector
spark.conf.set("spark.kubernetes.node.selector.node-type", "compute")

// Affinity and tolerations are not first-class Spark properties; modern
// deployments supply them through a pod template file. The alpha scheduler
// annotations below were honored only by old Kubernetes versions:
spark.conf.set("spark.kubernetes.executor.annotation.scheduler.alpha.kubernetes.io/preferred-anti-affinity",
  "spark-executor")
spark.conf.set("spark.kubernetes.executor.annotation.scheduler.alpha.kubernetes.io/tolerations",
  "[{\"key\":\"dedicated\",\"value\":\"spark\",\"effect\":\"NoSchedule\"}]")
```

## Best Practices

### Resource Management

```scala
// Proper resource configuration
val conf = new SparkConf()
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "1")
  .set(KUBERNETES_DRIVER_REQUEST_CORES, "0.5")
  .set(KUBERNETES_EXECUTOR_LIMIT_CORES, "2")
  .set(KUBERNETES_EXECUTOR_REQUEST_CORES, "1")
  .set("spark.driver.memory", "2g")   // standard Spark memory settings;
  .set("spark.executor.memory", "4g") // there are no spark.kubernetes.*.memory keys
```

### High Availability

```scala
// Point the master URL at a load-balanced API server endpoint so the driver
// tolerates the loss of a single API server instance; Spark does not accept
// a comma-separated endpoint list in the master URL
val spark = SparkSession.builder()
  .master("k8s://https://k8s-api.example.com:6443")
  .getOrCreate()

// Driver pod restart behavior is governed by the pod's restartPolicy,
// which is set through a pod template rather than a Spark property
```
355
356
### Security Configuration
357
358
```scala
359
// TLS configuration
360
spark.conf.set("spark.kubernetes.apiserver.caCertFile", "/path/to/ca.crt")
361
spark.conf.set("spark.kubernetes.apiserver.clientCertFile", "/path/to/client.crt")
362
spark.conf.set("spark.kubernetes.apiserver.clientKeyFile", "/path/to/client.key")
363
364
// OAuth token authentication
365
spark.conf.set("spark.kubernetes.apiserver.oauthToken", "your-oauth-token")
366
```
367
368
The cluster management layer provides a robust foundation for running Spark applications on Kubernetes, with comprehensive integration of Kubernetes-native features and enterprise-grade reliability.