# Scheduler Integration

The YARN module provides specialized scheduler implementations that integrate Spark's task scheduling with YARN's resource management. These schedulers handle resource allocation, task placement, and coordination between Spark's execution engine and YARN's cluster management.

## Imports

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.hadoop.yarn.api.records.ApplicationId
```

## Capabilities

### YARN Cluster Scheduler

Scheduler implementation for YARN cluster mode where the driver runs within the YARN cluster as part of the Application Master.

```scala { .api }
/**
 * Task scheduler for YARN cluster mode
 * Integrates with Application Master for resource management
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]

  /**
   * Post-start initialization hook
   * Notifies Application Master that SparkContext is ready
   */
  override def postStartHook(): Unit

  /**
   * Stops the scheduler and notifies Application Master
   */
  override def stop(): Unit
}
```

**Usage Example:**

```scala
// Automatically created when using yarn-cluster mode
val conf = new SparkConf().setMaster("yarn-cluster")
val sc = new SparkContext(conf)
// YarnClusterScheduler is automatically instantiated
```

### YARN Client Scheduler

Scheduler implementation for YARN client mode where the driver runs outside the YARN cluster on the client machine.

```scala { .api }
/**
 * Task scheduler for YARN client mode
 * Handles scheduling when driver runs outside YARN cluster
 */
private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]
}
```

**Usage Example:**

```scala
// Automatically created when using yarn-client mode
val conf = new SparkConf().setMaster("yarn-client")
val sc = new SparkContext(conf)
// YarnClientClusterScheduler is automatically instantiated
```

### YARN Scheduler Backend Base

Base class for YARN scheduler backends providing common functionality.

```scala { .api }
/**
 * Base scheduler backend for YARN implementations
 * Provides common functionality for cluster and client modes
 */
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  /**
   * Gets the YARN application ID
   * @return Application ID string
   */
  def applicationId(): String

  /**
   * Gets the application attempt ID
   * @return Application attempt ID
   */
  def applicationAttemptId(): Option[String]
}
```

### YARN Cluster Scheduler Backend

Scheduler backend that manages communication between the Spark scheduler and YARN resources in cluster mode.

```scala { .api }
/**
 * Scheduler backend for YARN cluster mode
 * Manages executor lifecycle and communication
 */
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend
   */
  override def start(): Unit

  /**
   * Gets the YARN application ID from SparkConf
   * @return Application ID string
   */
  override def applicationId(): String
}
```

### YARN Client Scheduler Backend

Scheduler backend for client mode that coordinates between the external driver and YARN-managed executors.

```scala { .api }
/**
 * Scheduler backend for YARN client mode
 * Coordinates between external driver and YARN executors
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend and submits application to YARN
   */
  override def start(): Unit

  /**
   * Stops the scheduler backend and cleans up resources
   */
  override def stop(): Unit

  /**
   * Gets the YARN application ID
   * @return Application ID string
   */
  override def applicationId(): String
}
```

## Resource Management Integration

### Rack Awareness

Both scheduler implementations provide rack awareness for optimal task placement:

```scala
// Rack lookup using YARN's topology information
override def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
```

**Benefits:**

- **Data Locality**: Place tasks close to data when possible
- **Network Efficiency**: Minimize cross-rack network traffic
- **Fault Tolerance**: Distribute tasks across racks for resilience

### Resource Allocation Coordination

The schedulers coordinate with YARN for dynamic resource management:

- **Executor Requests**: Communicate resource needs to Application Master
- **Container Allocation**: Handle YARN container assignments
- **Dynamic Scaling**: Support for adding/removing executors based on workload
- **Resource Constraints**: Respect YARN queue limits and cluster capacity
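
As a rough sketch of the request/grant arithmetic involved, consider a toy tracker that clamps executor requests to a queue limit. The `ExecutorRequestTracker` type and its fields are hypothetical illustrations, not Spark or YARN classes:

```scala
// Hypothetical sketch of executor-request bookkeeping under a queue cap.
// ExecutorRequestTracker and its fields are illustrative, not Spark API.
final case class ExecutorRequestTracker(queueLimit: Int, running: Int, pending: Int) {
  // How many additional executors may still be requested from YARN.
  def headroom: Int = math.max(0, queueLimit - running - pending)

  // Request up to `wanted` executors, clamped to the queue limit.
  def request(wanted: Int): ExecutorRequestTracker =
    copy(pending = pending + math.min(wanted, headroom))

  // A granted container moves one executor from pending to running.
  def granted: ExecutorRequestTracker =
    if (pending > 0) copy(running = running + 1, pending = pending - 1) else this
}
```

The headroom calculation keeps `running + pending` at or below the cap, mirroring how a backend must respect queue limits even when the workload wants more executors.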

## Scheduler Lifecycle

### Initialization Sequence

1. **Scheduler Creation**: Automatically instantiated based on master URL
2. **Backend Setup**: Create appropriate scheduler backend for the mode
3. **Resource Discovery**: Initialize rack topology and cluster information
4. **Registration**: Register with Application Master (cluster mode) or start AM (client mode)

### Cluster Mode Lifecycle

```scala
// 1. Scheduler initialization
val scheduler = new YarnClusterScheduler(sparkContext)

// 2. Post-start hook execution
scheduler.postStartHook()
// Calls: ApplicationMaster.sparkContextInitialized(sc)

// 3. Task scheduling and execution
// Normal Spark task scheduling operations

// 4. Shutdown
scheduler.stop()
// Calls: ApplicationMaster.sparkContextStopped(sc)
```

### Client Mode Lifecycle

```scala
// 1. Scheduler initialization
val scheduler = new YarnClientClusterScheduler(sparkContext)

// 2. Application Master communication setup
// Backend establishes communication with separate AM process

// 3. Task scheduling and execution
// Tasks executed on YARN-managed executors

// 4. Shutdown
scheduler.stop()
// Cleanup and AM notification
```

## Configuration Integration

### Scheduler Configuration

Key configuration properties affecting scheduler behavior:

```scala
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")                 // FAIR or FIFO scheduling
  .set("spark.scheduler.allocation.file", "pools.xml") // Fair scheduler pools
  .set("spark.locality.wait", "3s")                    // Data locality wait time
  .set("spark.locality.wait.rack", "0")                // Rack locality wait time
  .set("spark.task.maxFailures", "3")                  // Task failure retry limit
```

### YARN-Specific Configuration

Configuration properties specific to YARN scheduler integration:

```scala
val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")        // Heartbeat interval
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms") // Initial allocation interval
  .set("spark.yarn.max.executor.failures", "6")                     // Max executor failures
  .set("spark.yarn.am.waitTime", "100s")                            // AM wait time for SparkContext
```

## Task Placement and Locality

### Data Locality Optimization

The schedulers optimize task placement for data locality:

1. **Node Local**: Task runs on same node as data
2. **Rack Local**: Task runs on same rack as data
3. **Any**: Task can run anywhere in cluster

```scala
// Locality preference handling
def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  // Use YARN's rack resolution
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
```

### Resource Preference

Task placement considers:

- **Memory Requirements**: Match tasks to appropriately sized executors
- **CPU Requirements**: Consider CPU core availability
- **Network Topology**: Minimize data movement across network
- **Load Balancing**: Distribute work evenly across cluster
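
A minimal first-fit matcher illustrates the memory/CPU side of this decision. The `ExecutorOffer` and `TaskReq` types here are hypothetical, not Spark's internal offer types:

```scala
// Illustrative first-fit matching: pick the first executor with enough
// free memory and cores. These case classes are hypothetical sketches.
final case class ExecutorOffer(id: String, freeMemoryMb: Int, freeCores: Int)
final case class TaskReq(memoryMb: Int, cores: Int)

// Returns the id of the first executor that can host the task, if any.
def firstFit(task: TaskReq, offers: Seq[ExecutorOffer]): Option[String] =
  offers.find(o => o.freeMemoryMb >= task.memoryMb && o.freeCores >= task.cores)
    .map(_.id)
```

Real schedulers weigh locality and load balance on top of this basic feasibility check.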

## Error Handling and Recovery

### Executor Failure Handling

The schedulers handle various executor failure scenarios:

- Automatic task retry on remaining executors
- Executor replacement through Application Master
- Blacklisting of problematic nodes
- Application failure if too many executors fail
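
The blacklisting and application-failure decisions amount to simple bookkeeping, sketched here with a hypothetical `FailureTracker` (not the actual ApplicationMaster logic):

```scala
// Hypothetical failure bookkeeping: count failures per host, blacklist
// hosts past a per-node threshold, fail the app past a global threshold.
final case class FailureTracker(perHost: Map[String, Int] = Map.empty,
                                total: Int = 0) {
  def recordFailure(host: String): FailureTracker =
    FailureTracker(perHost.updated(host, perHost.getOrElse(host, 0) + 1), total + 1)

  def blacklisted(host: String, perHostLimit: Int): Boolean =
    perHost.getOrElse(host, 0) >= perHostLimit

  // Mirrors the spirit of spark.yarn.max.executor.failures.
  def applicationShouldFail(maxExecutorFailures: Int): Boolean =
    total > maxExecutorFailures
}
```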

### Resource Unavailability

Handling resource constraints:

- **Queue Full**: Wait for resources to become available
- **Insufficient Memory**: Graceful degradation or failure
- **Network Partitions**: Timeout and retry mechanisms
- **Node Failures**: Task redistribution to healthy nodes

### Application Master Communication

In client mode, the backend maintains robust communication with the Application Master:

- **Heartbeat Monitoring**: Regular health checks
- **Message Retry**: Retry failed communications
- **Connection Recovery**: Re-establish lost connections
- **Failover**: Handle Application Master restarts
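
The message-retry behavior can be sketched as a bounded retry loop; `withRetries` is an illustrative helper, not Spark's RPC layer:

```scala
// Illustrative bounded retry: run `op` up to maxAttempts times and return
// the first success, or the last failure. Hypothetical helper only.
@annotation.tailrec
def withRetries[A](maxAttempts: Int)(op: () => Either[String, A]): Either[String, A] =
  op() match {
    case Right(v)                   => Right(v)
    case Left(_) if maxAttempts > 1 => withRetries(maxAttempts - 1)(op)
    case left                       => left
  }
```

A production version would add backoff between attempts and distinguish transient from fatal errors.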

## Integration with Spark Components

### TaskScheduler Integration

The YARN schedulers extend Spark's base TaskSchedulerImpl:

- **Task Submission**: Receive tasks from DAGScheduler
- **Resource Matching**: Match tasks to available executors
- **Task Launch**: Send tasks to appropriate executors
- **Result Collection**: Gather task results and metrics

### Driver Integration

Coordination with the Spark driver:

- **Task Graph**: Receive task execution plans
- **Status Updates**: Report task and executor status
- **Metrics Collection**: Aggregate performance metrics
- **Event Handling**: Process Spark events and lifecycle changes

### Storage Integration

Integration with Spark's storage layer:

- **Block Manager**: Coordinate with block managers on executors
- **RDD Caching**: Optimize placement for cached RDDs
- **Shuffle Management**: Coordinate shuffle operations
- **Broadcast Variables**: Efficient distribution of broadcast data

## Monitoring and Metrics

### Scheduler Metrics

Key metrics tracked by YARN schedulers:

- **Task Completion Rate**: Tasks completed per second
- **Resource Utilization**: CPU and memory usage across cluster
- **Locality Statistics**: Data locality hit rates
- **Failure Rates**: Task and executor failure frequencies
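
For example, a locality hit rate is just the fraction of tasks that ran at node locality or better. This helper is illustrative, with locality levels modeled as plain strings:

```scala
// Illustrative locality statistic: fraction of tasks that ran at
// PROCESS_LOCAL or NODE_LOCAL. Levels are modeled as plain strings here.
def localityHitRate(levels: Seq[String]): Double = {
  val local = levels.count(l => l == "PROCESS_LOCAL" || l == "NODE_LOCAL")
  if (levels.isEmpty) 0.0 else local.toDouble / levels.size
}
```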

### Integration with Spark UI

The schedulers integrate with Spark's web UI:

- **Executor Information**: Current executor status and resources
- **Task Details**: Task execution times and locality
- **Resource Usage**: Memory and CPU utilization graphs
- **Error Reporting**: Failed task and executor information