tessl/maven-org-apache-spark--spark-network-yarn-2-12

YARN Shuffle Service for Apache Spark that provides external shuffle service functionality running as a long-running auxiliary service in the NodeManager process

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-network-yarn_2.12@3.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn-2-12@3.0.0

# Spark Network YARN

Spark Network YARN provides the YARN Shuffle Service for Apache Spark, enabling external shuffle management in YARN-managed clusters. The service runs as a long-running auxiliary service within the NodeManager process and lets Spark applications offload shuffle data storage and retrieval. Because shuffle data remains available when executors fail or are deallocated, applications gain both performance and reliability.

## Package Information

- **Package Name**: spark-network-yarn_2.12
- **Package Type**: maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.spark:spark-network-yarn_2.12:3.0.1`
- **Installation**: Include as a dependency in a Maven or Gradle project

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;
```

## Basic Usage

```java
// Configure and start the YARN Shuffle Service (typically done by the YARN NodeManager)
YarnShuffleService shuffleService = new YarnShuffleService();

// Initialize with a Hadoop configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.authenticate", false);
conf.setInt("spark.shuffle.service.port", 7337);

// serviceInit(conf) is protected; the public init(conf) inherited from
// Hadoop's AbstractService invokes it
shuffleService.init(conf);

// The service handles the application lifecycle automatically
// through YARN callbacks: initializeApplication, stopApplication, etc.
```

## Architecture

The Spark Network YARN module consists of three main components:

- **YarnShuffleService**: Main service implementation that extends Hadoop's AuxiliaryService, managing shuffle server lifecycle and application registration
- **YarnShuffleServiceMetrics**: Metrics forwarding system that integrates shuffle service metrics with Hadoop's metrics framework
- **HadoopConfigProvider**: Configuration adapter that bridges Hadoop Configuration to Spark's network configuration system

The service operates as an auxiliary service within YARN's NodeManager process, automatically starting and stopping with the NodeManager and handling Spark application lifecycle events.
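
The lifecycle handling described above can be pictured with a small, dependency-free sketch. `AppRegistrySketch` and its methods are hypothetical stand-ins, not the real `YarnShuffleService`, and the assumption that the service data carries a UTF-8 shuffle secret is made only for illustration. The sketch shows one way an auxiliary service might track per-application state between `initializeApplication` and `stopApplication` callbacks.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an auxiliary service; uses no Hadoop or Spark classes.
class AppRegistrySketch {
    private final boolean authEnabled;
    // appId -> shuffle secret; only populated when authentication is enabled
    private final Map<String, String> secrets = new ConcurrentHashMap<>();

    AppRegistrySketch(boolean authEnabled) {
        this.authEnabled = authEnabled;
    }

    // Mirrors the shape of initializeApplication: an app ID plus opaque service data.
    void initializeApplication(String appId, ByteBuffer serviceData) {
        if (authEnabled) {
            // duplicate() leaves the caller's buffer position untouched
            String secret = StandardCharsets.UTF_8.decode(serviceData.duplicate()).toString();
            secrets.put(appId, secret);
        }
    }

    // Mirrors the shape of stopApplication: forget everything tied to the app.
    void stopApplication(String appId) {
        secrets.remove(appId);
    }

    boolean hasSecret(String appId) {
        return secrets.containsKey(appId);
    }
}
```

With authentication disabled, the sketch keeps no per-application secret at all, which matches the idea that registration state is only needed when requests must be verified.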

## Capabilities

### YARN Shuffle Service

Core external shuffle service implementation for YARN clusters.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
    // Constructors
    public YarnShuffleService();

    // Lifecycle methods
    public void initializeApplication(ApplicationInitializationContext context);
    public void stopApplication(ApplicationTerminationContext context);
    public void initializeContainer(ContainerInitializationContext context);
    public void stopContainer(ContainerTerminationContext context);
    public ByteBuffer getMetaData();
    public void setRecoveryPath(Path recoveryPath);

    // Protected service lifecycle methods
    protected void serviceInit(Configuration conf) throws Exception;
    protected void serviceStop();
    protected Path getRecoveryPath(String fileName);
    protected File initRecoveryDb(String dbName);
}
```

**Key Configuration Properties:**

- `spark.shuffle.service.port` (default: 7337) - Port for shuffle server
- `spark.authenticate` (default: false) - Enable authentication
- `spark.yarn.shuffle.stopOnFailure` (default: false) - Stop NodeManager on service failure

### Application ID Encoding

Utility class for encoding application IDs in shuffle service context.

```java { .api }
public static class YarnShuffleService.AppId {
    public final String appId;

    // Constructors
    public AppId(String appId);

    // Standard object methods
    public boolean equals(Object o);
    public int hashCode();
    public String toString();
}
```
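
A class that overrides `equals` and `hashCode` as listed above can serve as a lookup key. The following is a minimal sketch of that pattern; `AppKey` and `AppKeyDemo` are hypothetical names modeled on the listing, not the real Spark class, and the `toString` format is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value class modeled on the AppId listing; not the real Spark class.
final class AppKey {
    final String appId;

    AppKey(String appId) {
        this.appId = Objects.requireNonNull(appId, "appId");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AppKey)) return false;
        return appId.equals(((AppKey) o).appId);
    }

    @Override
    public int hashCode() {
        return appId.hashCode();
    }

    @Override
    public String toString() {
        return "AppKey[appId=" + appId + "]";
    }
}

class AppKeyDemo {
    // Stores a value under one key instance and reads it back with a second, equal instance.
    static String roundTrip(String appId, String value) {
        Map<AppKey, String> store = new HashMap<>();
        store.put(new AppKey(appId), value);
        return store.get(new AppKey(appId));
    }
}
```

Because equality is defined on the wrapped string, two separately constructed keys for the same application resolve to the same map entry.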

### Metrics Integration

Forwards shuffle service metrics to Hadoop's metrics system for monitoring and observability.

```java { .api }
class YarnShuffleServiceMetrics implements MetricsSource {
    // Constructor
    YarnShuffleServiceMetrics(MetricSet metricSet);

    // MetricsSource implementation
    public void getMetrics(MetricsCollector collector, boolean all);

    // Static utility methods
    public static void collectMetric(
        MetricsRecordBuilder metricsRecordBuilder,
        String name,
        Metric metric);
}
```
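
Forwarding of this kind usually comes down to dispatching on the metric's type and flattening it into named values. The sketch below illustrates that idea with stand-in interfaces only; every `Sketch*` name, the `Map`-based record, and the `_count`/`_rate1`/`_rateMean` suffixes are assumptions made for this example rather than the library's actual behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-ins for the metric types listed under Types; all names are hypothetical.
interface SketchMetric {}
interface SketchCounter extends SketchMetric { long getCount(); }
interface SketchGauge extends SketchMetric { Object getValue(); }
interface SketchMeter extends SketchMetric {
    long getCount();
    double getOneMinuteRate();
    double getMeanRate();
}

class MetricForwardingSketch {
    // Flattens one metric into name -> value pairs, loosely like a metrics record builder.
    static void collectMetric(Map<String, Number> record, String name, SketchMetric metric) {
        if (metric instanceof SketchMeter) {
            SketchMeter m = (SketchMeter) metric;
            record.put(name + "_count", m.getCount());
            record.put(name + "_rate1", m.getOneMinuteRate());
            record.put(name + "_rateMean", m.getMeanRate());
        } else if (metric instanceof SketchCounter) {
            record.put(name, ((SketchCounter) metric).getCount());
        } else if (metric instanceof SketchGauge) {
            Object value = ((SketchGauge) metric).getValue();
            // Only numeric gauge values are forwarded in this sketch; others are skipped.
            if (value instanceof Number) {
                record.put(name, (Number) value);
            }
        }
    }

    // Applies collectMetric to every entry of a metric set, preserving insertion order.
    static Map<String, Number> collectAll(Map<String, SketchMetric> metrics) {
        Map<String, Number> record = new LinkedHashMap<>();
        for (Map.Entry<String, SketchMetric> e : metrics.entrySet()) {
            collectMetric(record, e.getKey(), e.getValue());
        }
        return record;
    }
}
```

A meter expands into several entries while a counter or numeric gauge contributes a single one, so the record can hold more entries than the metric set.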

### Configuration Provider

Hadoop Configuration adapter for Spark network configuration system.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {
    // Constructor
    public HadoopConfigProvider(Configuration conf);

    // ConfigProvider implementation
    public String get(String name);
    public String get(String name, String defaultValue);
    public Iterable<Map.Entry<String, String>> getAll();
}
```
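
The adapter idea can be sketched without Hadoop on the classpath. In the example below a plain `Map` stands in for Hadoop's `Configuration`; `SketchConfigProvider`, `MapBackedConfigProvider`, and the `getInt` helper are hypothetical names chosen for illustration. The sketch assumes that a lookup of a missing key throws `NoSuchElementException` and that the defaulting overload builds on that.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical base class shaped like the ConfigProvider listing above.
abstract class SketchConfigProvider {
    abstract String get(String name);

    // Falls back to the default when the strict lookup reports a missing key.
    String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }

    int getInt(String name, int defaultValue) {
        return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    abstract Iterable<Map.Entry<String, String>> getAll();
}

// A plain Map stands in for Hadoop's Configuration.
class MapBackedConfigProvider extends SketchConfigProvider {
    private final Map<String, String> conf;

    MapBackedConfigProvider(Map<String, String> conf) {
        this.conf = new HashMap<>(conf); // defensive copy
    }

    @Override
    String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    @Override
    Iterable<Map.Entry<String, String>> getAll() {
        return Collections.unmodifiableMap(conf).entrySet();
    }
}
```

Keeping the strict `get(name)` as the single abstract lookup means the defaulting and typed helpers need no knowledge of the backing store.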

## Types

```java { .api }
// From Hadoop YARN APIs
interface ApplicationInitializationContext {
    ApplicationId getApplicationId();
    ByteBuffer getApplicationDataForService();
}

interface ApplicationTerminationContext {
    ApplicationId getApplicationId();
}

interface ContainerInitializationContext {
    ContainerId getContainerId();
}

interface ContainerTerminationContext {
    ContainerId getContainerId();
}

// From Hadoop Configuration
class Configuration implements Iterable<Map.Entry<String, String>> {
    boolean getBoolean(String name, boolean defaultValue);
    int getInt(String name, int defaultValue);
    String get(String name);
    String[] getTrimmedStrings(String name);
}

// From Hadoop Metrics
interface MetricsSource {
    void getMetrics(MetricsCollector collector, boolean all);
}

interface MetricsCollector {
    MetricsRecordBuilder addRecord(String name);
}

interface MetricsRecordBuilder {
    MetricsRecordBuilder addCounter(MetricsInfo info, long value);
    MetricsRecordBuilder addGauge(MetricsInfo info, Number value);
}

// From Codahale Metrics
interface MetricSet {
    Map<String, Metric> getMetrics();
}

interface Metric {
    // Base interface for all metrics
}

interface Timer extends Metric {
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getOneMinuteRate();
    double getMeanRate();
}

interface Meter extends Metric {
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getOneMinuteRate();
    double getMeanRate();
}

interface Counter extends Metric {
    long getCount();
}

interface Gauge<T> extends Metric {
    T getValue();
}
```

## Error Handling

The service handles the following common failure scenarios:

- **Configuration errors**: Missing or invalid configuration values are handled with appropriate defaults
- **Authentication failures**: When authentication is enabled, unauthorized requests are rejected
- **Recovery failures**: Database corruption or missing recovery files are handled gracefully
- **Network errors**: Port binding failures and network issues are logged and can optionally stop the NodeManager
- **Application lifecycle errors**: Errors in application initialization/termination are logged but don't stop the service

Common exceptions:

- `NoSuchElementException` - Thrown by HadoopConfigProvider when a required configuration key is missing
- `IOException` - Thrown by I/O operations during service initialization, recovery, and database access
- `Exception` - General service lifecycle exceptions that are caught and logged
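
The "optionally stop the NodeManager" behavior amounts to a choice between rethrowing a startup failure and recording it. The following is a minimal sketch of that choice under stated assumptions: `StartupPolicySketch` and its nested `Startup` interface are hypothetical, and the `stopOnFailure` flag is meant to play the role of `spark.yarn.shuffle.stopOnFailure` without claiming to reproduce the real implementation.

```java
// Hypothetical sketch of "fail fast or keep running" startup handling; no Hadoop classes involved.
class StartupPolicySketch {
    // A startup action that may fail, for example binding a port or opening a recovery database.
    interface Startup {
        void run() throws Exception;
    }

    private Exception lastFailure; // remembered when a failure is tolerated

    // Runs the startup action; rethrows on failure only when stopOnFailure is true.
    void init(boolean stopOnFailure, Startup startup) throws Exception {
        try {
            startup.run();
        } catch (Exception e) {
            if (stopOnFailure) {
                throw e; // propagate so the hosting process can shut down
            }
            lastFailure = e; // tolerate: record the failure and keep the host running
        }
    }

    Exception lastFailure() {
        return lastFailure;
    }
}
```

With the flag left at `false`, a failed startup is recorded and the host keeps running; with `true`, the exception reaches the caller.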

## Integration Notes

- **YARN Integration**: Automatically registered and managed by YARN NodeManager as an auxiliary service
- **Spark Integration**: Spark applications connect by setting `spark.shuffle.service.enabled=true`
- **Authentication**: Optional SASL authentication using shared secrets prevents cross-application data access
- **Metrics Integration**: Automatically registers with Hadoop's DefaultMetricsSystem for JMX export
- **Recovery Support**: Supports NodeManager recovery by persisting application state and secrets to LevelDB
- **Shaded Dependencies**: Uses shaded Netty and Jackson dependencies to avoid classpath conflicts with YARN