# Scheduler Integration

Integration components that connect Spark's task scheduling system with YARN's resource management. These classes provide the cluster manager implementation and scheduler backends for both client and cluster deployment modes.

## Capabilities

### YarnClusterManager

Cluster manager implementation that integrates Spark with YARN. Registered as an external cluster manager for "yarn" master URLs.

```scala { .api }
/**
 * Cluster manager implementation for YARN integration
 * Implements ExternalClusterManager interface
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  /**
   * Check if this manager can create components for the given master URL
   * @param masterURL Master URL (should be "yarn")
   * @return true if masterURL is "yarn"
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create YARN task scheduler
   * @param sc SparkContext
   * @param masterURL Master URL
   * @return TaskScheduler instance
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create YARN scheduler backend
   * @param sc SparkContext
   * @param masterURL Master URL
   * @param scheduler TaskScheduler instance
   * @return SchedulerBackend instance
   */
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend

  /**
   * Initialize scheduler components
   * @param scheduler TaskScheduler to initialize
   * @param backend SchedulerBackend to initialize
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically used when master is "yarn"
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("YarnIntegrationExample")

val sc = new SparkContext(conf)
// YarnClusterManager creates appropriate scheduler and backend automatically
```
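
Under the hood, Spark discovers external cluster managers through Java's `ServiceLoader` mechanism and selects the one whose `canCreate` accepts the master URL. The sketch below illustrates that selection pattern; it is simplified (the real lookup happens inside `SparkContext`, and `ExternalClusterManager` is `private[spark]`, so this is conceptual rather than user-facing code):

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._ // Scala 2.13+

import org.apache.spark.scheduler.ExternalClusterManager

// Iterate the ExternalClusterManager implementations registered via
// META-INF/services and pick the first that claims the "yarn" master URL.
val managers = ServiceLoader.load(classOf[ExternalClusterManager]).asScala
val manager = managers.find(_.canCreate("yarn"))
  .getOrElse(throw new IllegalStateException("No cluster manager for 'yarn'"))
```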

### YarnSchedulerBackend (Abstract)

Base class for YARN scheduler backends that handle resource requests and executor management.

```scala { .api }
/**
 * Abstract base class for YARN scheduler backends
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  /**
   * Bind scheduler backend to YARN application
   * @param appId YARN application ID
   * @param attemptId Optional application attempt ID
   */
  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit

  /** Get YARN application ID as string */
  def applicationId(): String

  /** Get application attempt ID as string */
  def applicationAttemptId(): Option[String]

  /** Request total executors from YARN */
  def doRequestTotalExecutors(): Future[Boolean]

  /**
   * Kill specific executors
   * @param executorIds Sequence of executor IDs to kill
   * @return Future indicating success/failure
   */
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]

  /** Get shuffle service merger locations */
  def getShufflePushMergerLocations(): Seq[BlockManagerId]
}
```
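
For application code, the same identifier is exposed through the public `SparkContext.applicationId` field, which avoids reaching into `private[spark]` internals:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("yarn").setAppName("AppIdExample"))
// On YARN this is the YARN application ID, e.g.
// "application_1700000000000_0001" (illustrative value)
println(sc.applicationId)
```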

### YarnClientSchedulerBackend

Scheduler backend for yarn-client mode, where the driver runs locally and executors run on YARN.

```scala { .api }
/**
 * Scheduler backend for yarn-client mode
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /** Start client mode scheduler backend */
  def start(): Unit

  /**
   * Stop scheduler backend
   * @param exitCode Exit code for cleanup
   */
  def stop(exitCode: Int): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Client mode - driver runs locally, executors on YARN
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // client is the default deploy mode
  .setAppName("ClientModeApp")

val sc = new SparkContext(conf) // Uses YarnClientSchedulerBackend automatically
```

### YarnClusterSchedulerBackend

Scheduler backend for yarn-cluster mode, where both the driver and executors run on YARN.

```scala { .api }
/**
 * Scheduler backend for yarn-cluster mode
 * @param scheduler TaskScheduler implementation
 * @param sc SparkContext
 */
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /** Start cluster mode scheduler backend */
  def start(): Unit

  /**
   * Stop scheduler backend
   * @param exitCode Exit code for cleanup
   */
  def stop(exitCode: Int): Unit

  /** Get driver container log URLs */
  def getDriverLogUrls(): Option[Map[String, String]]

  /** Get driver container attributes */
  def getDriverAttributes(): Option[Map[String, String]]
}
```

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Cluster mode - both driver and executors on YARN.
// In practice cluster-mode applications are launched via spark-submit,
// which runs the driver inside the YARN ApplicationMaster.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster") // SparkConf has no setDeployMode helper
  .setAppName("ClusterModeApp")

val sc = new SparkContext(conf) // Uses YarnClusterSchedulerBackend automatically
```

### YarnScheduler

Task scheduler with YARN-specific enhancements for rack awareness and locality optimization.

```scala { .api }
/**
 * Task scheduler with YARN-specific enhancements
 * @param sc SparkContext
 */
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Get rack information for hosts
   * @param hostPorts Sequence of host:port strings
   * @return Sequence of optional rack names
   */
  def getRacksForHosts(hostPorts: Seq[String]): Seq[Option[String]]
}
```

### YarnClusterScheduler

Task scheduler specifically for yarn-cluster mode with additional initialization hooks.

```scala { .api }
/**
 * Task scheduler for yarn-cluster mode
 * @param sc SparkContext
 */
class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  /** Post-start initialization hook */
  def postStartHook(): Unit
}
```

## Integration Patterns

### Automatic Mode Selection

The YARN integration automatically selects the appropriate scheduler and scheduler backend based on the deploy mode:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Client mode (default)
val clientConf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AutoClientMode")
// Creates: YarnScheduler + YarnClientSchedulerBackend

// Cluster mode
val clusterConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("AutoClusterMode")
// Creates: YarnClusterScheduler + YarnClusterSchedulerBackend
```
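
The selection itself happens inside `YarnClusterManager.createTaskScheduler` / `createSchedulerBackend`. A simplified sketch of the deploy-mode dispatch (condensed from the behavior described above; error handling is abbreviated):

```scala
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.scheduler.cluster.{YarnClusterScheduler, YarnScheduler}

def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
  sc.deployMode match {
    case "cluster" => new YarnClusterScheduler(sc) // yarn-cluster mode
    case "client"  => new YarnScheduler(sc)        // yarn-client mode
    case mode      => throw new SparkException(s"Unknown deploy mode '$mode' for YARN")
  }
```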

### Custom Scheduler Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CustomSchedulerConfig")
  // Scheduler-specific configurations
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
```
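
With FAIR scheduling enabled, jobs submitted from a thread can be routed to a named pool from the allocation file using Spark's standard local-property API (the pool name below is illustrative):

```scala
// Route subsequent jobs from this thread to the "production" pool
// defined in fairscheduler.xml
sc.setLocalProperty("spark.scheduler.pool", "production")

// ... submit jobs ...

// Reset to the default pool
sc.setLocalProperty("spark.scheduler.pool", null)
```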

### Resource Request Patterns

```scala
import org.apache.spark.SparkConf

// Configure executor resources
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "512m")
  // Dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "5")
```
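
Dynamic allocation on YARN additionally needs shuffle data to outlive removed executors. This traditionally means enabling the external shuffle service on the NodeManagers; Spark 3.0+ can instead track shuffle state. A sketch of the extra settings:

```scala
// Classic approach: external shuffle service (must also be deployed on the
// YARN NodeManagers)
conf.set("spark.shuffle.service.enabled", "true")

// Alternative on Spark 3.0+: shuffle tracking, no external service required
// conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
```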

### Locality and Rack Awareness

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.YarnScheduler

// YarnScheduler automatically provides rack awareness
val scheduler = new YarnScheduler(sparkContext)

// Get rack information for data locality optimization
val hosts = Seq("worker1:7337", "worker2:7337", "worker3:7337")
val racks = scheduler.getRacksForHosts(hosts)
// Returns rack information if available from YARN

// Configure locality preferences
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
```

### Scheduler Backend Lifecycle

```scala
import scala.concurrent.Future

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend

// The lifecycle is typically managed automatically, but understanding the flow:

// 1. YarnClusterManager.createSchedulerBackend() creates the appropriate backend
// 2. Backend.start() initializes communication with YARN
// 3. Backend.bindToYarn() connects to the YARN application
// 4. Backend handles executor requests via doRequestTotalExecutors()
// 5. Backend.stop() cleans up resources

// For custom integration:
class CustomYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit = {
    super.start()
    // Custom initialization logic
  }

  override def doRequestTotalExecutors(): Future[Boolean] = {
    // Custom executor request logic
    super.doRequestTotalExecutors()
  }
}
```

## Error Handling and Monitoring

### Backend Status Monitoring

```scala
import org.apache.spark.scheduler.cluster.{YarnClusterSchedulerBackend, YarnSchedulerBackend}

// Monitor scheduler backend status
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]

// Get YARN application information
val appId = backend.applicationId()
val attemptId = backend.applicationAttemptId()

println(s"Application ID: $appId")
println(s"Attempt ID: $attemptId")

// For cluster mode, get driver information
backend match {
  case clusterBackend: YarnClusterSchedulerBackend =>
    val driverLogs = clusterBackend.getDriverLogUrls()
    val driverAttrs = clusterBackend.getDriverAttributes()
    println(s"Driver logs: $driverLogs")
    println(s"Driver attributes: $driverAttrs")
  case _ => // Client mode
}
```

### Executor Management

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Request additional executors
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]
val requestFuture: Future[Boolean] = backend.doRequestTotalExecutors()

requestFuture.onComplete {
  case Success(true)      => println("Executor request successful")
  case Success(false)     => println("Executor request failed")
  case Failure(exception) => println(s"Request failed with exception: $exception")
}

// Kill specific executors
val executorsToKill = Seq("executor-1", "executor-2")
val killFuture: Future[Boolean] = backend.doKillExecutors(executorsToKill)
```