tessl/maven-org-apache-spark--spark-network-yarn_2-11

External shuffle service for Spark on YARN that runs as a long-running auxiliary service in the NodeManager process

Describes: `mavenpkg:maven/org.apache.spark/spark-network-yarn_2.11@2.4.x`

To install, run `npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn_2-11@2.4.0`

# Spark Network YARN

An external shuffle service implementation for Apache Spark applications running on YARN clusters. This library provides a long-running auxiliary service that operates within the YARN NodeManager process, enabling Spark executors to fetch shuffle data remotely even after the original executor containers have been terminated.

## Package Information

- **Package Name**: spark-network-yarn_2.11
- **Package Type**: Maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-yarn_2.11</artifactId>
    <version>2.4.8</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
```

## Basic Usage

`YarnShuffleService` is normally deployed as a YARN auxiliary service and instantiated by the NodeManager, not used directly from application code. For integration tests or custom deployments, it can be driven through the standard Hadoop service lifecycle. Because `serviceInit` is `protected`, callers outside the class use the public `init` method, which delegates to it:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.yarn.YarnShuffleService;

// Create the service (normally done by the YARN NodeManager)
YarnShuffleService shuffleService = new YarnShuffleService();

// Configure the Hadoop settings the service reads
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", false);

// init() calls the protected serviceInit(conf), which creates the shuffle server
shuffleService.init(conf);
shuffleService.start();

// While running, applications are registered and removed through YARN
// callbacks (initializeApplication / stopApplication)

// stop() calls the protected serviceStop(), which releases server resources
shuffleService.stop();
```

## Architecture

The Spark Network YARN service is built around several key components:

- **YarnShuffleService**: Main auxiliary service; extends YARN's `AuxiliaryService` base class
- **HadoopConfigProvider**: Configuration adapter that exposes a Hadoop `Configuration` to Spark's network layer
- **Authentication Layer**: Optional per-application shuffle secret management for multi-tenant security
- **Recovery System**: LevelDB-based persistence of registered executors and application secrets across NodeManager restarts
- **Transport Layer**: Netty-based network server for shuffle data retrieval

The service runs inside the YARN NodeManager process and serves shuffle data to Spark executors across the cluster. Because shuffle files are served by the NodeManager rather than by the executor that wrote them, executors can be released (for example, under dynamic resource allocation) without making their shuffle output unavailable.

## Capabilities

### YARN Auxiliary Service Implementation

Core service that extends YARN's `AuxiliaryService` to provide external shuffle functionality for Spark applications.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
    /**
     * Creates a new YarnShuffleService with the default service name "spark_shuffle".
     * This constructor is typically called by the YARN NodeManager during service initialization.
     */
    public YarnShuffleService();

    /**
     * Initialize an application with the shuffle service. When authentication is enabled,
     * registers the application's shuffle secret (and persists it if recovery is configured).
     * Called by YARN when a new application starts.
     *
     * @param context Application initialization context containing the app ID and shuffle secret
     */
    @Override
    public void initializeApplication(ApplicationInitializationContext context);

    /**
     * Stop an application and clean up its resources in the shuffle service.
     * Called by YARN when an application terminates.
     *
     * @param context Application termination context containing the app ID
     */
    @Override
    public void stopApplication(ApplicationTerminationContext context);

    /**
     * Initialize a container (logs the container ID for debugging).
     * Called by YARN when a new container starts.
     *
     * @param context Container initialization context
     */
    @Override
    public void initializeContainer(ContainerInitializationContext context);

    /**
     * Stop a container (logs the container ID for debugging).
     * Called by YARN when a container terminates.
     *
     * @param context Container termination context
     */
    @Override
    public void stopContainer(ContainerTerminationContext context);

    /**
     * Get metadata for the auxiliary service (currently returns an empty buffer).
     *
     * @return Empty ByteBuffer, as metadata is not currently used
     */
    @Override
    public ByteBuffer getMetaData();

    /**
     * Set the recovery path used to restore shuffle service state after a NodeManager restart.
     * Called by the YARN NodeManager if recovery is enabled.
     *
     * @param recoveryPath Path where recovery data should be stored
     */
    @Override
    public void setRecoveryPath(Path recoveryPath);
}
```
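The application callbacks above touch shuffle secrets only when authentication is enabled. The JDK-only sketch below models that bookkeeping under stated assumptions: a `ConcurrentHashMap` stands in for Spark's `ShuffleSecretManager`, secrets are treated as UTF-8 strings, and `SecretRegistrySketch` with its methods is hypothetical, not part of the library. The real service also cleans up the application's other resources, which this sketch omits.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, JDK-only model of how the application callbacks treat
// shuffle secrets. Not Spark's API.
class SecretRegistrySketch {
    private final boolean authEnabled;
    // Stand-in for ShuffleSecretManager: appId -> secret
    private final Map<String, String> secrets = new ConcurrentHashMap<>();

    SecretRegistrySketch(boolean authEnabled) {
        this.authEnabled = authEnabled;
    }

    // Mirrors initializeApplication: register the secret only when auth is on
    void initializeApplication(String appId, ByteBuffer shuffleSecret) {
        if (authEnabled) {
            // Decode a duplicate so the caller's buffer position is untouched
            String secret = StandardCharsets.UTF_8.decode(shuffleSecret.duplicate()).toString();
            secrets.put(appId, secret);
        }
    }

    // Mirrors stopApplication: forget the secret when the app ends
    void stopApplication(String appId) {
        if (authEnabled) {
            secrets.remove(appId);
        }
    }

    String secretFor(String appId) {
        return secrets.get(appId);
    }

    public static void main(String[] args) {
        SecretRegistrySketch svc = new SecretRegistrySketch(true);
        svc.initializeApplication("application_1_0001",
            ByteBuffer.wrap("s3cret".getBytes(StandardCharsets.UTF_8)));
        System.out.println(svc.secretFor("application_1_0001")); // s3cret
        svc.stopApplication("application_1_0001");
        System.out.println(svc.secretFor("application_1_0001")); // null
    }
}
```

The application ID and secret in `main` are made-up values. Using a concurrent map reflects that YARN may deliver callbacks for different applications from different threads.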

### Service Configuration Keys

Configuration property names read by the shuffle service. The constants that hold them are internal, but their values are the keys you set in the NodeManager's configuration.

```java { .api }
// Configuration keys (internal constants; the string values are what you configure)
// "spark.shuffle.service.port" - Port the shuffle service listens on (default: 7337)
// "spark.authenticate" - Enable authentication of shuffle requests (default: false)
// "spark.yarn.shuffle.stopOnFailure" - Stop the NodeManager if service initialization fails (default: false)
```

### Service Lifecycle Management

Protected and private methods that handle service initialization, shutdown, and recovery. The private helpers are internal and are listed for reference only.

```java { .api }
/**
 * Initialize the shuffle server with the given Hadoop configuration.
 * Sets up the transport server, authentication, and recovery systems.
 *
 * @param conf Hadoop configuration containing service settings
 * @throws Exception If initialization fails and spark.yarn.shuffle.stopOnFailure is true;
 *                   otherwise the failure is recorded rather than rethrown
 */
@Override
protected void serviceInit(Configuration conf) throws Exception;

/**
 * Stop the shuffle server and clean up all associated resources.
 * Closes the transport server, block handler, and recovery database.
 */
@Override
protected void serviceStop();

/**
 * Get the recovery path specific to this auxiliary service for the given file name.
 *
 * @param fileName Name of the recovery file
 * @return Path where the recovery file should be located
 */
protected Path getRecoveryPath(String fileName);

/**
 * Initialize the recovery database, migrating it from old NodeManager local directories if needed.
 *
 * @param dbName Name of the database file
 * @return File object pointing to the recovery database location
 */
protected File initRecoveryDb(String dbName);

/**
 * Load shuffle secrets from the recovery database during service initialization.
 * Called when authentication is enabled and recovery is configured.
 *
 * @throws IOException If database access fails while loading secrets
 */
private void loadSecretsFromDb() throws IOException;

/**
 * Parse a database key to extract the application ID.
 * Used internally for database key management.
 *
 * @param s Database key string starting with APP_CREDS_KEY_PREFIX
 * @return Application ID extracted from the key
 * @throws IOException If key parsing fails
 */
private static String parseDbAppKey(String s) throws IOException;

/**
 * Generate a database key for storing application credentials.
 * Used internally for database key management.
 *
 * @param appExecId Application identity object
 * @return Byte array representing the database key
 * @throws IOException If key generation fails
 */
private static byte[] dbAppKey(AppId appExecId) throws IOException;

/**
 * Check whether authentication is enabled for the shuffle service.
 * Authentication is enabled when a secret manager is configured.
 *
 * @return true if authentication is enabled, false otherwise
 */
private boolean isAuthenticationEnabled();
```
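`initRecoveryDb` decides where a recovery database lives, migrating it out of old NodeManager local directories when needed. The sketch below is a simplified, JDK-only reading of that behavior under assumed rules: prefer a database already under the recovery path, otherwise move one found in an old local dir (the `yarn.nodemanager.local-dirs` entries) into the recovery path, and otherwise return a fresh location there. `RecoveryDbLocator` and `locate` are hypothetical names; the real method works with Hadoop's `Path` and file system classes rather than `java.nio.file`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical JDK-only sketch of recovery-DB placement with migration
// from old NodeManager local dirs. Not Spark's implementation.
class RecoveryDbLocator {

    static Path locate(Path recoveryDir, List<Path> oldLocalDirs, String dbName) {
        Path target = recoveryDir.resolve(dbName);
        // 1. Already under the recovery path: use it as-is
        if (Files.exists(target)) {
            return target;
        }
        // 2. Left behind in an old local dir: try to move it to the recovery path
        for (Path dir : oldLocalDirs) {
            Path old = dir.resolve(dbName);
            if (Files.exists(old)) {
                try {
                    Files.move(old, target);
                } catch (IOException e) {
                    // Assumed behavior: report and continue with the new location
                    System.err.println("Failed to move " + old + " to " + target + ": " + e);
                }
                return target;
            }
        }
        // 3. Nothing found: a new database will be created under the recovery path
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path recovery = Files.createTempDirectory("recovery");
        Path oldDir = Files.createTempDirectory("nm-local");
        // A LevelDB database is a directory, so model it as one
        Files.createDirectory(oldDir.resolve("registeredExecutors.ldb"));
        Path db = locate(recovery, List.of(oldDir), "registeredExecutors.ldb");
        System.out.println(db.equals(recovery.resolve("registeredExecutors.ldb"))); // true
        System.out.println(Files.isDirectory(db)); // true
    }
}
```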

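### Application Identity Management

`AppId` is a static nested helper class of `YarnShuffleService` that identifies an application in the shuffle service. Its JSON form is used to build the recovery-database keys for application secrets.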

```java { .api }
public static class AppId {
    /** The application ID string */
    public final String appId;

    /**
     * Create a new AppId with JSON deserialization support.
     *
     * @param appId The application ID string
     */
    @JsonCreator
    public AppId(@JsonProperty("appId") String appId);

    /**
     * Check equality based on application ID.
     *
     * @param o Object to compare with
     * @return true if the objects represent the same application ID
     */
    @Override
    public boolean equals(Object o);

    /**
     * Generate hash code based on application ID.
     *
     * @return Hash code for this AppId
     */
    @Override
    public int hashCode();

    /**
     * String representation of the AppId.
     *
     * @return Formatted string showing the application ID
     */
    @Override
    public String toString();
}
```
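`dbAppKey` and `parseDbAppKey` turn an `AppId` into a LevelDB key and back. The dependency-free sketch below assumes the key is a fixed prefix, a `;` separator, and the JSON form of `AppId` (for example `AppCreds;{"appId":"..."}`). The prefix value `"AppCreds"`, the separator, and the hand-built JSON are assumptions for illustration; the documented Jackson annotations suggest the real helpers serialize `AppId` with Jackson instead. `AppKeySketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, dependency-free sketch of a prefixed DB key for an app ID.
// Assumed format: PREFIX + ";" + {"appId":"<id>"}
class AppKeySketch {
    static final String PREFIX = "AppCreds"; // assumed prefix value

    static byte[] dbAppKey(String appId) {
        // Minimal JSON; assumes the ID contains no characters that need escaping
        String json = "{\"appId\":\"" + appId + "\"}";
        return (PREFIX + ";" + json).getBytes(StandardCharsets.UTF_8);
    }

    static String parseDbAppKey(String key) {
        if (!key.startsWith(PREFIX + ";")) {
            throw new IllegalArgumentException("expected a key starting with " + PREFIX);
        }
        String json = key.substring(PREFIX.length() + 1);
        String open = "{\"appId\":\"";
        if (!json.startsWith(open) || !json.endsWith("\"}")) {
            throw new IllegalArgumentException("unexpected key body: " + json);
        }
        return json.substring(open.length(), json.length() - 2);
    }

    public static void main(String[] args) {
        String text = new String(dbAppKey("application_1_0001"), StandardCharsets.UTF_8);
        System.out.println(text); // AppCreds;{"appId":"application_1_0001"}
        System.out.println(parseDbAppKey(text)); // application_1_0001
    }
}
```

A shared prefix means all application-credential keys sort together in LevelDB, so a loader can seek to the prefix and stop at the first key that does not match.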

### Hadoop Configuration Integration

Configuration provider that adapts a Hadoop `Configuration` for use by Spark's network layer.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {
    /**
     * Create a new configuration provider wrapping the given Hadoop configuration.
     *
     * @param conf Hadoop Configuration instance to wrap
     */
    public HadoopConfigProvider(Configuration conf);

    /**
     * Get a configuration value by name.
     *
     * @param name Configuration property name
     * @return Configuration value as String
     * @throws NoSuchElementException If the property is not found
     */
    @Override
    public String get(String name);

    /**
     * Get a configuration value by name with a default fallback.
     *
     * @param name Configuration property name
     * @param defaultValue Default value if property not found
     * @return Configuration value or default if not found
     */
    @Override
    public String get(String name, String defaultValue);

    /**
     * Get all configuration entries as an iterable.
     *
     * @return Iterable of all configuration key-value pairs
     */
    @Override
    public Iterable<Map.Entry<String, String>> getAll();
}
```
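`HadoopConfigProvider` gives Spark's network layer a two-tier lookup: `get(name)` fails on a missing key, while `get(name, defaultValue)` falls back. The sketch below reproduces that documented contract over a plain `Map` so it runs without Hadoop on the classpath. `MapConfigProvider` is a hypothetical stand-in, not a Spark class, and it does not extend Spark's `ConfigProvider`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for HadoopConfigProvider, backed by a Map instead of
// org.apache.hadoop.conf.Configuration.
class MapConfigProvider {
    private final Map<String, String> conf;

    MapConfigProvider(Map<String, String> conf) {
        this.conf = conf;
    }

    // Missing keys are an error, matching the documented NoSuchElementException
    String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Missing keys fall back to the supplied default
    String get(String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }

    Iterable<Map.Entry<String, String>> getAll() {
        return conf.entrySet();
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.shuffle.service.port", "7337");
        MapConfigProvider provider = new MapConfigProvider(settings);
        System.out.println(provider.get("spark.shuffle.service.port")); // 7337
        System.out.println(provider.get("spark.authenticate", "false")); // false
    }
}
```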

## Types

### Required Hadoop/YARN Types

The service depends on the following Hadoop, YARN, JDK, Jackson, and Guava types:

```java { .api }
// From Hadoop YARN
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.api.records.ContainerId;

// From Hadoop Core
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// From Java Standard Library
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// From Jackson JSON Processing
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// From Google Guava
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
```

### Spark Network Types

Integration with Spark's network layer components:

```java { .api }
// These types are used internally but applications typically don't interact with them directly
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.crypto.AuthServerBootstrap;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.LevelDBProvider;

// From LevelDB
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

// From SLF4J Logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
```

## Configuration

### Core Configuration Properties

- `spark.shuffle.service.port` (default: 7337): Port for the shuffle service to listen on
- `spark.authenticate` (default: false): Enable authentication for shuffle requests
- `yarn.nodemanager.local-dirs`: NodeManager local directories; searched for recovery databases left there by older setups so they can be migrated to the recovery path

### Optional Configuration Properties

- `spark.yarn.shuffle.stopOnFailure` (default: false): Whether a failure during service initialization should stop the NodeManager

### Configuration Constants

The service uses the following internal configuration constants:

```java { .api }
// Configuration key constants (from source code)
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
private static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Recovery file names
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
```
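Each key above is paired with a default that applies when the key is unset. The sketch below shows that resolution with a plain `Map` in place of Hadoop's `Configuration`; the boolean parsing is loosely modeled on Hadoop's `getBoolean(key, default)` ("true"/"false" in any case, otherwise the default). `ShuffleSettings` and its `from` method are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch: resolve the shuffle service settings from a key/value
// map, falling back to the documented defaults when a key is unset.
final class ShuffleSettings {
    static final String PORT_KEY = "spark.shuffle.service.port";
    static final int DEFAULT_PORT = 7337;
    static final String AUTH_KEY = "spark.authenticate";
    static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";

    final int port;
    final boolean authenticate;
    final boolean stopOnFailure;

    private ShuffleSettings(int port, boolean authenticate, boolean stopOnFailure) {
        this.port = port;
        this.authenticate = authenticate;
        this.stopOnFailure = stopOnFailure;
    }

    static ShuffleSettings from(Map<String, String> conf) {
        String rawPort = conf.get(PORT_KEY);
        // An unparsable port throws NumberFormatException rather than silently defaulting
        int port = rawPort == null ? DEFAULT_PORT : Integer.parseInt(rawPort.trim());
        return new ShuffleSettings(
            port,
            bool(conf.get(AUTH_KEY), false),
            bool(conf.get(STOP_ON_FAILURE_KEY), false));
    }

    // "true"/"false" in any case; anything else yields the default
    private static boolean bool(String raw, boolean dflt) {
        if (raw == null) return dflt;
        String v = raw.trim();
        if (v.equalsIgnoreCase("true")) return true;
        if (v.equalsIgnoreCase("false")) return false;
        return dflt;
    }

    public static void main(String[] args) {
        ShuffleSettings s = from(Map.of("spark.authenticate", "true"));
        System.out.println(s.port + " " + s.authenticate + " " + s.stopOnFailure); // 7337 true false
    }
}
```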

### YARN Integration Configuration

The service is typically configured in YARN's `yarn-site.xml` on each NodeManager:

```xml
<configuration>
    <property>
        <!-- Add spark_shuffle to any existing services, e.g. mapreduce_shuffle,spark_shuffle -->
        <name>yarn.nodemanager.aux-services</name>
        <value>spark_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
        <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
        <name>spark.shuffle.service.port</name>
        <value>7337</value>
    </property>
</configuration>
```

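## Error Handling

The service handles errors as follows:

- **Service Initialization Failures**: If `spark.yarn.shuffle.stopOnFailure` is true, the exception is rethrown and the NodeManager fails to start; otherwise the failure is recorded and the NodeManager keeps running without a working shuffle service
- **Database Errors**: Logged; they do not stop the running service
- **Application/Container Lifecycle Errors**: Caught and logged so that a failing callback does not disrupt the service
- **Configuration Errors**: `HadoopConfigProvider.get(name)` throws `NoSuchElementException` for a missing key, while `get(name, defaultValue)` returns the default instead
- **Recovery Failures**: File system errors during recovery (for example, while migrating a recovery database from an old local directory) are logged and handled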
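The initialization-failure rule can be sketched as a guard around the setup step: rethrow when `spark.yarn.shuffle.stopOnFailure` is true, otherwise record the failure and let the host keep running. This is a plain-Java illustration of the documented policy only; `GuardedInit`, `Step`, and the recorded-failure field are hypothetical stand-ins for `serviceInit` and Hadoop's own failure bookkeeping.

```java
// Hypothetical sketch of the stopOnFailure policy around service setup.
class GuardedInit {
    // A setup step that may throw, standing in for the body of serviceInit
    interface Step {
        void run() throws Exception;
    }

    private Exception recordedFailure; // stand-in for the service's recorded failure

    void init(Step setup, boolean stopOnFailure) throws Exception {
        try {
            setup.run();
        } catch (Exception e) {
            if (stopOnFailure) {
                throw e; // propagate: the host process fails to start
            }
            recordedFailure = e; // remember the failure but keep the host running
        }
    }

    Exception failure() {
        return recordedFailure;
    }

    public static void main(String[] args) throws Exception {
        GuardedInit svc = new GuardedInit();
        svc.init(() -> { throw new java.net.BindException("port 7337 in use"); }, false);
        System.out.println(svc.failure().getMessage()); // port 7337 in use
    }
}
```

Defaulting to "record and continue" keeps a misconfigured shuffle service from taking down every other auxiliary service on the node, at the cost of Spark jobs failing later when they try to reach it.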

## Thread Safety

`YarnShuffleService` is designed to handle concurrent operations safely:

- Service lifecycle methods are synchronized by the YARN NodeManager
- Database operations use LevelDB's thread-safe implementation
- Network transport operations are handled by Netty's thread-safe components
- Multiple applications and containers can be managed concurrently