
tessl/maven-org-apache-spark--spark-yarn_2-11

Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters.

- Workspace: tessl
- Visibility: Public
- Describes: mavenpkg:maven/org.apache.spark/spark-yarn_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-yarn_2-11@2.4.0

# Apache Spark YARN

Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters. This module provides cluster managers, schedulers, and backends specifically designed for YARN environments, enabling seamless integration between Spark's distributed computing capabilities and YARN's resource management.

## Package Information

- **Package Name**: spark-yarn_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAllocator}
import org.apache.spark.scheduler.cluster.{YarnClusterManager, SchedulerExtensionService, SchedulerExtensionServiceBinding}
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationAttemptId, Container}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.util.Clock
```

**For Java shuffle service integration:**

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
```

## Basic Usage

### Client Mode Deployment

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark for YARN client mode: the driver runs locally
// while executors are launched in YARN containers
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```

### Cluster Mode Deployment

In cluster mode the driver itself runs inside a YARN container, so applications are normally launched with `spark-submit --master yarn --deploy-mode cluster`; Spark rejects a `SparkContext` constructed directly with these settings outside a YARN container.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configuration equivalent to what spark-submit applies for
// YARN cluster mode; the context below is created by the driver
// once it is running inside the Application Master
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```

### Programmatic Application Submission

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

val applicationId = client.submitApplication()
println(s"Application submitted with ID: $applicationId")
```
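After submission, the application can be tracked through the standard YARN client API. A minimal polling sketch, assuming the `client` and `applicationId` values from the example above:

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

// Poll the application report until YARN reports a terminal state
var state = client.getApplicationReport(applicationId).getYarnApplicationState
while (state != YarnApplicationState.FINISHED &&
       state != YarnApplicationState.FAILED &&
       state != YarnApplicationState.KILLED) {
  Thread.sleep(1000)
  state = client.getApplicationReport(applicationId).getYarnApplicationState
}
println(s"Final state: $state")
```

`Client.monitorApplication` wraps a similar loop with report logging built in.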

## Architecture

The YARN integration follows Spark's pluggable cluster manager architecture:

- **External Cluster Manager**: `YarnClusterManager`, registered as a service provider
- **Scheduler Backends**: separate implementations for client (`YarnClientSchedulerBackend`) and cluster (`YarnClusterSchedulerBackend`) modes
- **Application Master**: handles both cluster mode driver execution and client mode coordination
- **Resource Management**: `YarnAllocator` manages container allocation and the executor lifecycle
- **Security Integration**: credential providers and delegation token management for secure clusters

## Capabilities

### Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI.

```scala { .api }
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

[Cluster Management](./cluster-management.md)

### Application Deployment

Client API for submitting and managing YARN applications programmatically, supporting both client and cluster deployment modes.

```scala { .api }
class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
}
```

[Application Deployment](./application-deployment.md)

### Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies.

```scala { .api }
class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: YarnConfiguration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource],
  resolver: SparkRackResolver,
  clock: Clock = new SystemClock()
) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
```

[Resource Management](./resource-management.md)
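`YarnAllocator` is constructed inside the Application Master, so applications typically steer its behavior through configuration rather than instantiating it. A sketch of the standard Spark 2.4 settings that drive container requests and dynamic allocation (values are illustrative):

```scala
import org.apache.spark.SparkConf

// Executor sizing and dynamic allocation settings consumed by the
// allocation machinery on YARN
val conf = new SparkConf()
  .set("spark.executor.instances", "4")         // initial executor count
  .set("spark.executor.memory", "4g")           // per-container heap
  .set("spark.executor.cores", "2")             // vcores per container
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true") // required for dynamic allocation
```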

### YARN Shuffle Service

External shuffle service that runs on YARN NodeManagers to provide shuffle data management for Spark applications, improving executor stability and enabling dynamic allocation.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStart() throws Exception;
  protected void serviceStop() throws Exception;
  public void initializeApplication(ApplicationInitializationContext context) throws Exception;
  public void stopApplication(ApplicationTerminationContext context) throws Exception;
}
```

[YARN Shuffle Service](./yarn-shuffle-service.md)
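The service is enabled by registering it as a NodeManager auxiliary service in `yarn-site.xml` (the shuffle jar shipped with Spark must also be on the NodeManager classpath):

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

Applications then opt in with `spark.shuffle.service.enabled=true`.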

### Security Integration

Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication.

```scala { .api }
trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
```

[Security Integration](./security-integration.md)

### Extension Points

Pluggable extension system for custom scheduler services and functionality in YARN deployments.

```scala { .api }
trait SchedulerExtensionService {
  def start(binding: SchedulerExtensionServiceBinding): Unit
  def stop(): Unit
}
```

[Extension Points](./extension-points.md)

### Configuration System

YARN-specific configuration options for controlling resource allocation, security, and deployment behavior.

```scala { .api }
// Key configuration entries
val APPLICATION_TAGS: ConfigEntry[Seq[String]]
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
val QUEUE_NAME: ConfigEntry[String]
val SPARK_ARCHIVE: OptionalConfigEntry[String]
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]
```

[Configuration System](./configuration-system.md)
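These entries are set through their string keys on `SparkConf`. A sketch using the stock `spark.yarn.*` property names (the archive path is illustrative):

```scala
import org.apache.spark.SparkConf

// Map of config entries above to their SparkConf keys
val conf = new SparkConf()
  .set("spark.yarn.queue", "production")    // QUEUE_NAME
  .set("spark.yarn.maxAppAttempts", "2")    // MAX_APP_ATTEMPTS
  .set("spark.yarn.tags", "etl,nightly")    // APPLICATION_TAGS
  .set("spark.yarn.archive", "hdfs:///spark/spark-libs.zip") // SPARK_ARCHIVE
```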

### Command Building Utilities

YARN-specific utilities for building container launch commands and managing the Spark distribution.

```scala { .api }
object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}

object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
```

[Command Building Utilities](./command-building.md)
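The overhead constants feed the container sizing formula Spark applies when `spark.yarn.executor.memoryOverhead` is not set explicitly. A standalone sketch of that calculation, with the constants inlined under the assumption of their 2.4-era values (factor 0.10, floor 384 MiB):

```scala
// Container memory = executor heap + overhead, where overhead is the
// larger of a fixed fraction of the heap and a floor value.
val memoryOverheadFactor = 0.10 // cf. YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR
val memoryOverheadMin = 384L    // cf. YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN (MiB)

def overheadMiB(executorMemoryMiB: Long): Long =
  math.max((memoryOverheadFactor * executorMemoryMiB).toLong, memoryOverheadMin)

println(overheadMiB(8192)) // 819: 10% of an 8 GiB heap
println(overheadMiB(1024)) // 384: the floor applies for small heaps
```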

## Common Integration Patterns

### Custom Credential Provider

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "my-service"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check if credentials are needed
    hadoopConf.get("my.service.enabled", "false").toBoolean
  }

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain credentials and add them to `creds`
    // Return the renewal time in milliseconds, or None if no renewal is needed
    None
  }
}
```
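Providers are discovered via `java.util.ServiceLoader`, so the implementation must be registered in a provider-configuration file on the application classpath (`com.example.MyCredentialProvider` continues the hypothetical example above):

```
# src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.MyCredentialProvider
```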

### Custom Scheduler Extension

```scala
import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}

class MySchedulerExtension extends SchedulerExtensionService {
  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
    // Initialize the extension with access to scheduler components
  }

  override def stop(): Unit = {
    // Clean up extension resources
  }
}
```

## Error Handling

Common exceptions thrown by the YARN integration:

- `SparkException`: thrown for unsupported deploy modes or configuration errors
- `IOException`: file system operations during staging and cleanup
- `YarnException`: YARN-specific errors during application submission or management
- `SecurityException`: credential or authentication failures in secure clusters
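A submission path can guard against these explicitly. A minimal sketch, reusing the `clientArgs` and `sparkConf` values from the programmatic submission example:

```scala
import java.io.IOException
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.spark.SparkException
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

try {
  val client = new Client(clientArgs, sparkConf)
  val appId = client.submitApplication()
  println(s"Submitted: $appId")
} catch {
  case e: SparkException => // unsupported deploy mode or bad configuration
    System.err.println(s"Configuration error: ${e.getMessage}")
  case e: YarnException =>  // rejected by the ResourceManager
    System.err.println(s"YARN error: ${e.getMessage}")
  case e: IOException =>    // staging directory / HDFS failures
    System.err.println(s"I/O error during staging: ${e.getMessage}")
}
```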

## Integration Requirements

- **Hadoop/YARN**: compatible Hadoop YARN cluster (2.6+)
- **Scala Version**: built for Scala 2.11 binary compatibility
- **Spark Core**: requires a matching spark-core_2.11 dependency
- **Security**: optional Kerberos configuration for secure clusters