
# Apache Spark YARN Integration

Apache Spark's YARN integration module enables Spark applications to run on Hadoop YARN (Yet Another Resource Negotiator) clusters alongside other distributed computing frameworks. This module provides an Application Master implementation that manages Spark driver and executor processes within YARN containers, handles resource allocation and deallocation through YARN's ResourceManager, and supports both client and cluster deployment modes.

## Package Information

- **Package Name**: yarn-parent_2.11
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Version**: 1.2.2
- **Installation**: Include in Maven/SBT project dependencies

## Core Imports

```scala
import org.apache.spark.deploy.yarn._
import org.apache.spark.scheduler.cluster._
```

For basic usage:

```scala
import org.apache.spark.{SparkConf, SparkContext}
```

## Basic Usage

The YARN integration is typically used by setting the Spark master URL and submitting applications through the YARN client:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Set master to YARN mode
val sparkConf = new SparkConf()
  .setMaster("yarn-client") // or "yarn-cluster"
  .setAppName("MySparkApp")

val sparkContext = new SparkContext(sparkConf)
```

For command-line submission:

```bash
# Client mode - driver runs on client machine
spark-submit --master yarn-client --num-executors 4 myapp.jar

# Cluster mode - driver runs in YARN container
spark-submit --master yarn-cluster --num-executors 4 myapp.jar
```

## Architecture

Apache Spark YARN integration is built around several key components:

- **Application Master**: Coordinates with YARN ResourceManager and manages executor allocation
- **YARN Client**: Submits applications to YARN and handles deployment preparation
- **Scheduler Backends**: Interface between Spark's scheduler and YARN's resource management
- **Resource Allocation**: Dynamic executor allocation based on workload demands
- **Security Integration**: Kerberos authentication and delegation token management
- **Deployment Modes**: Support for both client and cluster deployment patterns

## Capabilities

### YARN Client Operations

Core functionality for submitting Spark applications to YARN clusters, handling resource allocation requests, and managing application lifecycle.

```scala { .api }
// Main entry points for YARN submission
object Client {
  def main(argStrings: Array[String]): Unit
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
}
```

[YARN Client](./client.md)

### Application Master Management

Application Master implementation that manages Spark applications within YARN containers, including driver execution and executor coordination.

```scala { .api }
private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    client: YarnRMClient)
  extends Logging {
  def run(): Int
}

object ApplicationMaster {
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}
```

[Application Master](./application-master.md)

### Scheduler Integration

Scheduler implementations that integrate Spark's task scheduling with YARN's resource management, supporting both client and cluster deployment modes.

```scala { .api }
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
  override def postStartHook(): Unit
  override def stop(): Unit
}

private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def stop(): Unit
  override def applicationId(): String
}

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def applicationId(): String
}
```

[Scheduler Integration](./schedulers.md)

### Hadoop Utilities

YARN-specific utilities for Hadoop integration, security, and environment management that extend Spark's base Hadoop utilities.

```scala { .api }
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
  override def isYarnMode(): Boolean
  override def newConfiguration(conf: SparkConf): Configuration
  override def addCredentials(conf: JobConf): Unit
  override def getCurrentUserCredentials(): Credentials
  override def addCurrentUserCredentials(creds: Credentials): Unit
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
  def escapeForShell(arg: String): String
  def lookupRack(conf: Configuration, host: String): String
  def populateRackInfo(conf: Configuration, hostname: String): Unit
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}
```

[Hadoop Utilities](./hadoop-utils.md)
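The `escapeForShell` helper quotes an argument so it survives intact when embedded in a YARN container launch command. A simplified, self-contained sketch of that kind of POSIX single-quote escaping (an illustration only, not Spark's actual implementation; `ShellEscape` is a hypothetical name):

```scala
// Illustrative sketch of single-quote shell escaping, the technique a
// helper like YarnSparkHadoopUtil.escapeForShell applies to arguments
// placed in container launch commands. Not Spark's actual code.
object ShellEscape {
  def escapeForShell(arg: String): String =
    if (arg == null) arg
    // Wrap in single quotes; for each embedded single quote, close the
    // quoted region, emit an escaped literal ', then reopen it.
    else "'" + arg.replace("'", "'\\''") + "'"
}
```

For example, `ShellEscape.escapeForShell("my app")` yields `'my app'`, so the embedded space is not subject to the shell's word splitting.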

## Types

```scala { .api }
// Core YARN types
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ApplicationAttemptId, ApplicationId, LocalResource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

// Collection types for utilities
import scala.collection.mutable.HashMap
import scala.collection.Map

// YARN-specific classes
private[spark] class ApplicationMasterArguments(args: Array[String]) {
  var userClass: String = null
  var userJar: String = null
  var userArgs: Array[String] = Array.empty
  var numExecutors: Int = 2
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var amMemory: Int = 512
  var amCores: Int = 1
}

private[spark] trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration, sparkConf: SparkConf): Int
  def register(
      driverUrl: String,
      driverRef: RpcEndpointRef,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: String,
      uiHistoryAddress: String,
      securityMgr: SecurityManager,
      localResources: Map[String, LocalResource]): YarnAllocator
  def shutdown(): Unit
}

private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String = null
  var files: String = null
  var archives: String = null
  var userJar: String = null
  var userClass: String = null
  var userArgs: Array[String] = Array.empty
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var numExecutors: Int = 2
  var amQueue: String = "default"
  var amMemory: Int = 512
  var amCores: Int = 1
  var appName: String = "Spark"
  val amMemoryOverhead: Int = 384
  val executorMemoryOverhead: Int = 384
}
```
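The `amMemoryOverhead` and `executorMemoryOverhead` constants in `ClientArguments` are added to the requested heap sizes when container resources are computed, since a JVM needs more than its heap. A minimal sketch of that arithmetic, using hypothetical helper names and the defaults shown above (all values in MB):

```scala
// Hypothetical helpers illustrating how YARN container memory requests
// are derived from the ClientArguments defaults above. Names are
// illustrative; this is not part of the Spark API.
object ContainerSizing {
  val AmMemoryOverhead = 384       // matches ClientArguments.amMemoryOverhead
  val ExecutorMemoryOverhead = 384 // matches ClientArguments.executorMemoryOverhead

  // Total memory YARN must grant for the Application Master container.
  def amContainerMemory(amMemory: Int): Int =
    amMemory + AmMemoryOverhead

  // Total memory YARN must grant for each executor container.
  def executorContainerMemory(executorMemory: Int): Int =
    executorMemory + ExecutorMemoryOverhead
}
```

With the defaults (`amMemory = 512`, `executorMemory = 1024`), the requests come out to 896 MB for the Application Master and 1408 MB per executor container.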

## Configuration

Key configuration properties for YARN integration:

- `spark.yarn.max.executor.failures` - Maximum number of executor failures before failing the application
- `spark.yarn.max.worker.failures` - (Deprecated) Same as above, kept for backward compatibility
- `spark.yarn.app.id` - Application ID set by the Application Master
- `spark.ui.port` - Set to "0" for ephemeral port allocation in YARN mode
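User-settable properties like the failure limit are normally supplied via `spark-defaults.conf` or `spark-submit --conf` rather than in application code. A hypothetical fragment (the value is illustrative):

```properties
# Fail the application after six executor failures.
spark.yarn.max.executor.failures   6
```

The command-line equivalent is `--conf spark.yarn.max.executor.failures=6`. By contrast, `spark.yarn.app.id` and the YARN-mode `spark.ui.port` value are set by the YARN integration itself and should not normally be set by hand.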

## Deployment Modes

### YARN Client Mode (`yarn-client`)

- Driver runs on the client machine, outside the YARN cluster
- Direct communication between driver and executors
- Suited to interactive applications and development use cases

### YARN Cluster Mode (`yarn-cluster`)

- Driver runs inside the YARN cluster as part of the Application Master
- Better suited to production batch jobs
- Automatic cleanup and resource management

## Version Support

The YARN module supports multiple Hadoop versions through separate implementations:

- **Alpha**: Hadoop 0.23 and 2.0.x (deprecated in Spark 1.3+)
- **Stable**: Hadoop 2.2+ (recommended)

Both implementations provide the same API surface but use different versions of the underlying Hadoop YARN API.