# Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI. This module handles the lifecycle of YARN-based Spark applications and provides appropriate schedulers and backends for different deployment modes.

## Capabilities

### YarnClusterManager

Main entry point for YARN cluster management, registered as an `ExternalClusterManager` service provider. Automatically activated when `master = "yarn"` is specified in SparkConf.

```scala { .api }
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

**Parameters:**

- `masterURL`: Must be `"yarn"` for this cluster manager to be selected
- `sc`: SparkContext instance for the application
- `scheduler`: TaskScheduler instance to be initialized
- `backend`: SchedulerBackend instance to be initialized

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically selected when master is "yarn"
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn") // This triggers YarnClusterManager selection

val sc = new SparkContext(conf)
```

### Task Schedulers

YARN-specific task schedulers that provide rack awareness and optimal task placement within YARN clusters.

```scala { .api }
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
}

class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
```

**YarnScheduler:**

- Used in client deployment mode
- Provides YARN rack awareness for better locality
- Extends the standard TaskSchedulerImpl with YARN-specific optimizations
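The rack awareness mentioned above boils down to mapping a host (with an optional port) to a rack identifier. A minimal sketch, assuming a static topology map purely for illustration (Spark actually resolves racks through Hadoop's configured topology plugin):

```scala
// Illustrative sketch only: Spark resolves racks via Hadoop's topology plugin,
// not a static map. The `topology` parameter here is an assumption for the demo.
def getRackForHost(hostPort: String, topology: Map[String, String]): Option[String] = {
  val host = hostPort.split(":").head // strip the port, if present
  topology.get(host)
}
```

Returning `Option[String]` lets the scheduler fall back to node- or any-locality when no rack is known for a host.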

**YarnClusterScheduler:**

- Used in cluster deployment mode
- Inherits all YarnScheduler functionality
- Identical behavior to YarnScheduler in the current implementation

**Usage Example:**

```scala
// Schedulers are created automatically based on deploy mode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // Uses YarnScheduler
// .set("spark.submit.deployMode", "cluster") // Uses YarnClusterScheduler
```

### Scheduler Backends

YARN-specific scheduler backends that manage communication between Spark and the YARN ResourceManager.

```scala { .api }
abstract class YarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
  override def start(): Unit
  override def stop(): Unit
  override def minRegisteredRatio: Double // Returns 0.8
}
```

**Common Methods:**

- `bindToYarn`: Associates the backend with YARN application and attempt IDs
- `start()`: Initializes the backend and begins resource management
- `stop()`: Cleanly shuts down the backend and releases resources
- `minRegisteredRatio`: Returns 0.8 (80%), the minimum executor registration ratio for YARN
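The 0.8 ratio gates when task scheduling may begin: the backend waits until at least 80% of the requested executors have registered. A minimal sketch of that rule (the function name and signature are illustrative, not Spark's API):

```scala
// Illustrative: the gating rule implied by minRegisteredRatio = 0.8.
// Scheduling may start once registered executors reach the threshold.
def sufficientExecutorsRegistered(registered: Int, target: Int, minRatio: Double = 0.8): Boolean =
  registered >= target * minRatio
```

For example, with 10 requested executors, task scheduling can begin once 8 have registered.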

#### YarnClientSchedulerBackend

```scala { .api }
class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit
  def waitForApplication(): Unit
  override def stop(): Unit
}
```

**Client Mode Specific:**

- `start()`: Creates and submits the YARN application via Client
- `waitForApplication()`: Blocks until the application reaches the RUNNING state
- Used when `spark.submit.deployMode = "client"`
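The blocking behavior of `waitForApplication()` can be sketched as a poll loop over YARN application states (simplified: the real implementation queries the ResourceManager through Client, and `poll` below is a stand-in for that status call):

```scala
// Illustrative poll loop: wait until the application is RUNNING, bail out on
// terminal states. `poll` stands in for a ResourceManager status query.
def waitUntilRunning(poll: () => String, maxPolls: Int = 1000): Boolean = {
  var attempts = 0
  while (attempts < maxPolls) {
    poll() match {
      case "RUNNING" => return true
      case "FAILED" | "KILLED" | "FINISHED" => return false // terminal states
      case _ => attempts += 1 // e.g. NEW or ACCEPTED: keep polling
    }
  }
  false // gave up after maxPolls attempts
}
```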

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

// YarnClientSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

#### YarnClusterSchedulerBackend

```scala { .api }
class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start(): Unit
  def getDriverLogUrls: Option[Map[String, String]]
  override def stop(): Unit
}
```

**Cluster Mode Specific:**

- `start()`: Binds to the existing YARN application (running inside the ApplicationMaster)
- `getDriverLogUrls`: Returns driver log URLs from YARN for monitoring
- Used when `spark.submit.deployMode = "cluster"`
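The shape of the map behind `getDriverLogUrls` can be sketched as follows. This helper and its URL pattern are assumptions modeled on YARN's NodeManager container-log endpoint; the real values are derived from the container and node HTTP addresses that YARN reports.

```scala
// Hypothetical helper (not Spark's API): builds stdout/stderr log URLs in the
// NodeManager containerlogs style. All inputs and the URL shape are assumptions.
def containerLogUrls(nodeHttpAddress: String, containerId: String, user: String): Map[String, String] = {
  val base = s"http://$nodeHttpAddress/node/containerlogs/$containerId/$user"
  Map(
    "stdout" -> s"$base/stdout?start=-4096",
    "stderr" -> s"$base/stderr?start=-4096"
  )
}
```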

**Usage Example:**

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

// YarnClusterSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

## Service Registration

The YARN cluster manager is automatically registered through Java's ServiceLoader mechanism:

**META-INF/services/org.apache.spark.scheduler.ExternalClusterManager:**

```
org.apache.spark.scheduler.cluster.YarnClusterManager
```

This enables automatic discovery when `master = "yarn"` without requiring explicit class registration.
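Conceptually, SparkContext iterates over all service-loaded managers and picks the one whose `canCreate` accepts the master URL. A simplified stand-alone sketch (the trait, object, and registry below are stand-ins for illustration, not Spark's internal types):

```scala
// Simplified model of ServiceLoader-based selection. In Spark, the candidate
// list comes from loading all registered ExternalClusterManager implementations.
trait ClusterManagerLike { def canCreate(masterURL: String): Boolean }

object YarnManagerLike extends ClusterManagerLike {
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"
}

val registered: Seq[ClusterManagerLike] = Seq(YarnManagerLike) // service-loaded in Spark

def selectManager(masterURL: String): Option[ClusterManagerLike] =
  registered.filter(_.canCreate(masterURL)) match {
    case Seq(single) => Some(single)
    case Seq()       => None // caller falls back to built-in masters
    case _           => sys.error(s"Multiple external cluster managers registered for $masterURL")
  }
```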

## Deploy Mode Differences

| Component | Client Mode | Cluster Mode |
|-----------|-------------|--------------|
| **TaskScheduler** | YarnScheduler | YarnClusterScheduler |
| **SchedulerBackend** | YarnClientSchedulerBackend | YarnClusterSchedulerBackend |
| **Driver Location** | Client machine | YARN ApplicationMaster |
| **Application Submission** | Client submits to YARN | Pre-submitted by spark-submit |
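The component selection in the table reduces to a match on the deploy mode. A sketch mirroring the branching in YarnClusterManager's factory methods (the string results stand in for the actual classes):

```scala
// Illustrative: which scheduler/backend pair each deploy mode selects.
def componentsFor(deployMode: String): (String, String) = deployMode match {
  case "client"  => ("YarnScheduler", "YarnClientSchedulerBackend")
  case "cluster" => ("YarnClusterScheduler", "YarnClusterSchedulerBackend")
  case other     => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
}
```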

## Error Handling

Common exceptions in cluster management:

```scala
// Unsupported deploy mode
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")

// Backend initialization failure
throw new SparkException("Failed to initialize YARN scheduler backend")
```

## Integration Points

- **SparkContext**: Automatic cluster manager selection based on the master URL
- **Configuration**: Driven by the `spark.submit.deployMode` and `spark.master` settings
- **Resource Management**: Integrates with YarnAllocator for container management
- **Security**: Coordinates with security managers and credential providers