# Cluster Management

The cluster management layer provides the core Kubernetes integration for Apache Spark, implementing Spark's external cluster manager interface to enable native execution on Kubernetes clusters.

## Core Components

### KubernetesClusterManager { .api }

The main entry point for Kubernetes cluster management that integrates with Spark's cluster manager framework:

```scala
class KubernetesClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Key Responsibilities**:
- Validates Kubernetes master URLs (format: `k8s://https://kubernetes-api:port`)
- Creates Kubernetes-specific task schedulers and scheduler backends
- Manages the initialization lifecycle of cluster components
- Integrates with Spark's cluster manager registry

**Usage Example**:
```scala
// Automatic registration when using k8s:// master URL
val spark = SparkSession.builder()
  .appName("MyKubernetesApp")
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.container.image", "my-spark:latest")
  .getOrCreate()
```

### KubernetesClusterSchedulerBackend { .api }

Kubernetes-specific implementation of Spark's scheduler backend that manages executor lifecycle:

```scala
class KubernetesClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  kubernetesClient: KubernetesClient,
  executorService: ScheduledExecutorService,
  snapshotsStore: ExecutorPodsSnapshotsStore,
  podAllocator: ExecutorPodsAllocator,
  lifecycleEventHandler: ExecutorPodsLifecycleManager,
  watchEvents: ExecutorPodsWatchSnapshotSource,
  pollEvents: ExecutorPodsPollingSnapshotSource
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
```

**Core Features**:
- **Dynamic Resource Allocation**: Supports adding/removing executors based on workload
- **Pod Lifecycle Management**: Handles creation, monitoring, and cleanup of executor pods
- **State Synchronization**: Maintains consistency between Spark scheduler state and Kubernetes pod state
- **Fault Tolerance**: Automatically handles pod failures and rescheduling

**Key Methods**:
```scala
override def start(): Unit
override def stop(): Unit
override def doRequestTotalExecutors(
  requestedTotal: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int],
  nodeBlacklist: Set[String]
): Future[Boolean]
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
```
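
The sketch below shows, in rough form, how `start()` and `stop()` could tie together the components injected through the constructor, using only the operations described in this document. It is an illustrative sketch, not the actual Spark source; the real backend differs in ordering, error handling, and helper names.

```scala
// Rough sketch only: one plausible wiring of the injected components.
override def start(): Unit = {
  super.start()
  val appId = applicationId()          // provided by the scheduler backend
  podAllocator.start(appId)            // begin requesting executor pods
  lifecycleEventHandler.start(appId)   // begin reacting to pod state changes
  // watchEvents and pollEvents keep snapshotsStore in sync with the cluster
}

override def stop(): Unit = {
  podAllocator.stop()                  // stop creating new executor pods
  lifecycleEventHandler.stop()         // stop processing lifecycle events
  super.stop()                         // shut down RPC endpoints and executors
}
```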

## Executor Management Components

### ExecutorPodsAllocator { .api }

Manages the allocation and creation of new executor pods:

```scala
class ExecutorPodsAllocator(
  conf: SparkConf,
  secMgr: SecurityManager,
  executorBuilder: KubernetesExecutorBuilder,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore,
  clock: Clock
)
```

**Responsibilities**:
- Creates new executor pods when requested by the scheduler
- Applies resource configurations (CPU, memory, storage)
- Handles pod template application and customization
- Manages allocation timeouts and retries

**Key Operations**:
```scala
def setTotalExpectedExecutors(newTotal: Int): Unit
def start(applicationId: String): Unit
def stop(): Unit
```
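
In practice these calls bracket the allocator's life: it is started once with the application id, told the desired executor count whenever scheduler demand changes, and stopped at shutdown. A minimal illustrative sketch follows; the names `allocator`, `appId`, and the `createAllocator()` helper are placeholders, not Spark's internal identifiers.

```scala
// Illustrative only: the calling pattern a scheduler backend follows.
val allocator: ExecutorPodsAllocator = createAllocator() // assume constructed as shown above
val appId = "spark-app-0001"                             // placeholder application id

allocator.start(appId)                  // begin reconciling desired vs. running executor pods
allocator.setTotalExpectedExecutors(5)  // scale up: aim for five executor pods
allocator.setTotalExpectedExecutors(2)  // scale down: lower the target the allocator works toward
allocator.stop()                        // stop allocation during application shutdown
```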

### ExecutorPodsLifecycleManager { .api }

Manages the complete lifecycle of executor pods from creation to termination:

```scala
class ExecutorPodsLifecycleManager(
  conf: SparkConf,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore
)
```

112

113

**Lifecycle Operations**:
- **Pod Creation**: Coordinates with the allocator for new executor creation
- **Health Monitoring**: Tracks pod health and readiness
- **Failure Handling**: Detects and responds to pod failures
- **Resource Cleanup**: Ensures proper cleanup of terminated pods

**State Management**:
```scala
def start(applicationId: String): Unit
def stop(): Unit
def onFinalNonDeletedState(podState: FinalPodState): Unit
```
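
The `FinalPodState` argument tells the manager how an executor pod ended. The toy model below uses simplified stand-in case classes to show how such states can be dispatched on; the real hierarchy lives in Spark's `org.apache.spark.scheduler.cluster.k8s` package and wraps full Kubernetes `Pod` objects, so the names and fields here are assumptions for illustration only.

```scala
// Toy model: simplified stand-ins for the executor pod state hierarchy.
sealed trait ExecutorPodState { def podName: String }
sealed trait FinalPodState extends ExecutorPodState
case class PodSucceeded(podName: String) extends FinalPodState
case class PodFailed(podName: String, reason: String) extends FinalPodState
case class PodDeleted(podName: String) extends FinalPodState

def onFinalNonDeletedState(state: FinalPodState): Unit = state match {
  case PodSucceeded(name) =>
    println(s"Executor pod $name completed; releasing its executor slot")
  case PodFailed(name, reason) =>
    println(s"Executor pod $name failed ($reason); the scheduler may request a replacement")
  case PodDeleted(name) =>
    println(s"Executor pod $name was deleted externally; cleaning up scheduler state")
}
```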

## Cluster Lifecycle Management

### Initialization Sequence

The cluster manager follows a specific initialization sequence:

```scala
// 1. Cluster Manager Creation
val clusterManager = new KubernetesClusterManager()

// 2. Task Scheduler Creation
val taskScheduler = clusterManager.createTaskScheduler(sparkContext, masterURL)

// 3. Scheduler Backend Creation
val schedulerBackend = clusterManager.createSchedulerBackend(
  sparkContext, masterURL, taskScheduler
)

// 4. Component Initialization
clusterManager.initialize(taskScheduler, schedulerBackend)
```

### Resource Allocation Flow

Dynamic resource allocation follows this pattern:

```scala
// 1. Scheduler requests additional executors
scheduler.requestTotalExecutors(targetNum, localityAwareTasks, hostToLocalTaskCount)

// 2. Backend processes the request
schedulerBackend.doRequestTotalExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)

// 3. Allocator creates new pods
executorPodsAllocator.setTotalExpectedExecutors(newTotal)

// 4. Lifecycle manager monitors pod states
lifecycleManager.onFinalNonDeletedState(podState)
```

## Configuration Integration

### Kubernetes-Specific Settings

The cluster manager integrates with Spark's configuration system:

```scala
import org.apache.spark.deploy.k8s.Config._

val conf = new SparkConf()
  // Cluster configuration
  .set(KUBERNETES_NAMESPACE, "spark-cluster")
  .set(KUBERNETES_SERVICE_ACCOUNT_NAME, "spark-service-account")

  // Resource limits
  .set(KUBERNETES_EXECUTOR_LIMIT_CORES, "2")
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "1")

  // Dynamic allocation
  .set(DYN_ALLOCATION_ENABLED, "true")
  .set(DYN_ALLOCATION_MAX_EXECUTORS, "10")
  .set(DYN_ALLOCATION_MIN_EXECUTORS, "1")
```

### Master URL Format

Kubernetes master URLs follow a specific format:

```
// Standard format
k8s://https://kubernetes-api-server:port

// Examples
k8s://https://my-cluster.example.com:443
k8s://https://kubernetes.default.svc.cluster.local:443
k8s://https://10.0.0.100:6443
```
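
The `k8s://` prefix is what `canCreate` keys off; the remainder is treated as the API server address, with HTTPS assumed when no scheme is given. A minimal sketch of that parsing, written for illustration rather than copied from Spark:

```scala
// Minimal sketch (not the Spark implementation) of k8s:// master URL handling.
object KubernetesMasterUrl {
  private val Prefix = "k8s://"

  // The kind of check a canCreate-style method performs.
  def isKubernetes(masterURL: String): Boolean = masterURL.startsWith(Prefix)

  // Strips the prefix and defaults to https:// when no scheme is present.
  def apiServerUrl(masterURL: String): String = {
    val stripped = masterURL.stripPrefix(Prefix)
    if (stripped.startsWith("http://") || stripped.startsWith("https://")) stripped
    else s"https://$stripped"
  }
}

// KubernetesMasterUrl.isKubernetes("k8s://https://10.0.0.100:6443")    // true
// KubernetesMasterUrl.apiServerUrl("k8s://my-cluster.example.com:443") // "https://my-cluster.example.com:443"
```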

## Error Handling and Fault Tolerance

### Pod Failure Recovery

The cluster manager provides robust failure recovery:

```scala
// Automatic pod restart on failure
class PodFailureHandler {
  def handlePodFailure(failedPod: Pod, reason: String): Unit = {
    logWarning(s"Executor pod ${failedPod.getMetadata.getName} failed: $reason")

    // Remove failed executor from scheduler
    scheduler.executorLost(executorId, SlaveLost(reason))

    // Allocator will create replacement if needed
    if (shouldReplace(failedPod)) {
      allocator.requestNewExecutor()
    }
  }
}
```

### Network Resilience

Kubernetes API connectivity is handled robustly:

```scala
// Retry configuration for API operations
val retryConfig = RetryConfig.builder()
  .maxAttempts(3)
  .backoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
  .build()

// Client with retry capability
val kubernetesClient = clientFactory.createKubernetesClient(
  ClientType.Driver,
  retryConfig
)
```

## Monitoring and Observability

### Metrics Integration

The cluster manager exposes metrics for monitoring:

```scala
// Executor metrics
sparkContext.statusTracker.getExecutorInfos.foreach { executor =>
  println(s"Executor ${executor.executorId}: ${executor.totalCores} cores, ${executor.maxMemory} memory")
}

// Pod state metrics
val snapshot = snapshotsStore.currentSnapshot
val podCounts = snapshot.executorPods.values.groupBy(_.getClass.getSimpleName)
podCounts.foreach { case (state, pods) =>
  println(s"$state: ${pods.size} pods")
}
```

### Logging Configuration

Comprehensive logging for troubleshooting:

```scala
import org.apache.log4j.{Level, Logger}

// Keep executor pods after termination so their logs remain available for debugging
spark.conf.set("spark.kubernetes.executor.deleteOnTermination", "false")

// Log levels
spark.sparkContext.setLogLevel("INFO") // General logs
// Set logger for Kubernetes components
Logger.getLogger("org.apache.spark.scheduler.cluster.k8s").setLevel(Level.DEBUG)
```

## Advanced Features

### Custom Pod Templates

Support for custom pod templates:

```scala
// Using pod template files
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")

// Template applied during pod creation
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  templateFile,
  containerName
)
```

### Service Account Configuration

RBAC integration for secure cluster access:

```scala
// Service account configuration
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")
spark.conf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark-executor")

// RBAC permissions required:
// - pods: create, delete, get, list, watch
// - services: create, delete, get
// - configmaps: create, delete, get, list
```

### Node Selection and Affinity

Advanced scheduling capabilities:

```scala
// Node selector
spark.conf.set("spark.kubernetes.node.selector.node-type", "compute")

// Pod affinity (via annotations)
spark.conf.set("spark.kubernetes.executor.annotation.scheduler.alpha.kubernetes.io/preferred-anti-affinity",
  "spark-executor")

// Tolerations for tainted nodes
spark.conf.set("spark.kubernetes.executor.annotation.scheduler.alpha.kubernetes.io/tolerations",
  "[{\"key\":\"dedicated\",\"value\":\"spark\",\"effect\":\"NoSchedule\"}]")
```

## Best Practices

### Resource Management

```scala
// Proper resource configuration
val conf = new SparkConf()
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "1")
  .set(KUBERNETES_DRIVER_REQUEST_CORES, "0.5")
  .set(KUBERNETES_EXECUTOR_LIMIT_CORES, "2")
  .set(KUBERNETES_EXECUTOR_REQUEST_CORES, "1")
  .set("spark.driver.memory", "2g")
  .set("spark.executor.memory", "4g")
```

### High Availability

```scala
// Multiple API server endpoints
spark.conf.set("spark.kubernetes.apiserver.host", "https://k8s-api-1:6443,https://k8s-api-2:6443")

// Enable driver restart policy
spark.conf.set("spark.kubernetes.driver.restartPolicy", "OnFailure")
```

### Security Configuration

```scala
// TLS configuration
spark.conf.set("spark.kubernetes.apiserver.caCertFile", "/path/to/ca.crt")
spark.conf.set("spark.kubernetes.apiserver.clientCertFile", "/path/to/client.crt")
spark.conf.set("spark.kubernetes.apiserver.clientKeyFile", "/path/to/client.key")

// OAuth token authentication
spark.conf.set("spark.kubernetes.apiserver.oauthToken", "your-oauth-token")
```

The cluster management layer provides a robust foundation for running Spark applications on Kubernetes, with comprehensive integration of Kubernetes-native features and enterprise-grade reliability.