# Spark Network YARN

Apache Spark YARN Shuffle Service provides shuffle service functionality for YARN-managed clusters. This library implements a shuffle service that runs as an auxiliary service on YARN NodeManagers, allowing Spark executors to fetch shuffle data even after the original executor that wrote the data has terminated.

## Package Information

- **Package Name**: spark-network-yarn_2.13
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-network-yarn_2.13
- **Installation**: Add as Maven dependency

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-yarn_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```
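
For Gradle builds, the same coordinates can be declared with the Kotlin DSL (the dependency configuration name may vary by project):

```kotlin
// Gradle Kotlin DSL equivalent of the Maven dependency above
implementation("org.apache.spark:spark-network-yarn_2.13:4.0.0")
```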

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.YarnShuffleServiceMetrics;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

// YARN integration imports
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
```

## Basic Usage

The YarnShuffleService is typically deployed as a YARN auxiliary service and managed by the NodeManager automatically:

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;

// Create the shuffle service
YarnShuffleService shuffleService = new YarnShuffleService();

// Initialize with YARN configuration; init() is the public lifecycle
// entry point inherited from AbstractService and invokes serviceInit()
Configuration conf = new Configuration();
shuffleService.init(conf);

// The service lifecycle (start/stop) is managed by the YARN NodeManager
// Applications connect by setting spark.shuffle.service.enabled=true
```
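
Before the NodeManager will load the service, it must be registered as an auxiliary service in `yarn-site.xml` and the shuffle service jar placed on the NodeManager classpath. A typical registration, following the standard Spark-on-YARN deployment docs:

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```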

## Architecture

The Spark YARN Shuffle Service is built around several key components:

- **YarnShuffleService**: Main auxiliary service class that integrates with the YARN NodeManager
- **External Block Handler**: Manages shuffle block registration, retrieval, and cleanup
- **Merged Shuffle File Manager**: Handles merged shuffle files for improved performance
- **Authentication System**: Optional security layer using shuffle secrets
- **Metrics Integration**: Forwards shuffle metrics to Hadoop's metrics2 system
- **Recovery System**: Persists executor and secret information for NodeManager restart scenarios

## Capabilities

### YARN Auxiliary Service Integration

Main shuffle service class that extends Hadoop's AuxiliaryService for integration with the YARN NodeManager.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {

  /**
   * Default constructor - initializes the service with the name "spark_shuffle"
   */
  public YarnShuffleService();

  /**
   * Return whether authentication is enabled as specified by the configuration.
   * If so, fetch requests will fail unless the appropriate authentication secret
   * for the application is provided.
   * @return true if authentication is enabled, false otherwise
   */
  private boolean isAuthenticationEnabled();

  /**
   * Initialize an application with the shuffle service
   * @param context Application initialization context from YARN
   */
  public void initializeApplication(ApplicationInitializationContext context);

  /**
   * Stop and clean up application resources
   * @param context Application termination context from YARN
   */
  public void stopApplication(ApplicationTerminationContext context);

  /**
   * Initialize a container with the shuffle service
   * @param context Container initialization context from YARN
   */
  public void initializeContainer(ContainerInitializationContext context);

  /**
   * Stop a container and clean up its resources
   * @param context Container termination context from YARN
   */
  public void stopContainer(ContainerTerminationContext context);

  /**
   * Get service metadata (returns an empty buffer)
   * @return Empty ByteBuffer for metadata
   */
  public ByteBuffer getMetaData();

  /**
   * Set the recovery path for NodeManager restart scenarios
   * @param recoveryPath Path where recovery data should be stored
   */
  public void setRecoveryPath(Path recoveryPath);

  /**
   * Initialize the service with external configuration
   * @param externalConf Hadoop configuration from the NodeManager
   * @throws Exception if initialization fails
   */
  protected void serviceInit(Configuration externalConf) throws Exception;

  /**
   * Stop the service and clean up all resources
   */
  protected void serviceStop() throws Exception;

  /**
   * Get the recovery path for a specific file
   * @param fileName Name of the recovery file
   * @return Path for recovery file storage
   */
  protected Path getRecoveryPath(String fileName);

  /**
   * Initialize a recovery database file
   * @param dbName Database file name
   * @return File object for the recovery database
   */
  protected File initRecoveryDb(String dbName);

  /**
   * Set a customized MergedShuffleFileManager for unit testing
   * @param mergeManager Custom merge manager implementation
   */
  @VisibleForTesting
  void setShuffleMergeManager(MergedShuffleFileManager mergeManager);

  /**
   * Create a new MergedShuffleFileManager instance for testing
   * @param conf Transport configuration
   * @param mergeManagerFile Recovery file for the merge manager
   * @return New MergedShuffleFileManager instance
   */
  @VisibleForTesting
  static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
      TransportConf conf, File mergeManagerFile);

  /**
   * Load application secrets from the recovery database
   * @throws IOException if database operations fail
   */
  private void loadSecretsFromDb() throws IOException;

  /**
   * Parse a database key to extract the application ID
   * @param s Database key string
   * @return Application ID
   * @throws IOException if parsing fails
   */
  private static String parseDbAppKey(String s) throws IOException;

  /**
   * Create a database key for an application ID
   * @param appExecId Application ID wrapper
   * @return Database key as byte array
   * @throws IOException if serialization fails
   */
  private static byte[] dbAppKey(AppId appExecId) throws IOException;
}
```

### Application ID Management

Simple container class for encoding application identifiers with JSON serialization support.

```java { .api }
public static class AppId {

  /** The application identifier string */
  public final String appId;

  /**
   * Constructor for application ID
   * @param appId Application identifier string
   */
  public AppId(String appId);

  /**
   * Check equality with another AppId
   * @param o Object to compare
   * @return true if equal, false otherwise
   */
  public boolean equals(Object o);

  /**
   * Compute hash code for the application ID
   * @return Hash code value
   */
  public int hashCode();

  /**
   * String representation of the application ID
   * @return Formatted string representation
   */
  public String toString();
}
```
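
The API above implies standard value semantics: two `AppId` instances are equal when they wrap the same identifier string. A rough, stdlib-only illustration of that contract (this class is a stand-in, not Spark's implementation):

```java
import java.util.Objects;

// Stand-in mirroring the value semantics the AppId API above implies
final class AppIdSketch {
    public final String appId;

    public AppIdSketch(String appId) {
        this.appId = appId;
    }

    @Override
    public boolean equals(Object o) {
        // Equal when the wrapped identifier strings are equal
        return o instanceof AppIdSketch && Objects.equals(appId, ((AppIdSketch) o).appId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(appId);
    }

    @Override
    public String toString() {
        return "AppId[appId=" + appId + "]";
    }
}
```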

### Metrics Integration

Forwards shuffle service metrics to Hadoop's metrics2 system for JMX exposure and monitoring.

```java { .api }
class YarnShuffleServiceMetrics implements MetricsSource {

  /**
   * Package-private constructor for metrics integration
   * @param metricsNamespace Namespace for metrics collection
   * @param metricSet Set of metrics to forward
   */
  YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet);

  /**
   * Collect metrics from the shuffle service
   * @param collector Metrics collector to receive metrics
   * @param all Whether to return all metrics even if unchanged
   */
  public void getMetrics(MetricsCollector collector, boolean all);

  /**
   * Collect an individual metric and add it to the metrics record
   * @param metricsRecordBuilder Builder for the metrics record
   * @param name Name of the metric
   * @param metric The metric object to collect
   */
  public static void collectMetric(
      MetricsRecordBuilder metricsRecordBuilder,
      String name,
      Metric metric
  );

  /**
   * Internal record class for metrics information
   */
  private record ShuffleServiceMetricsInfo(String name, String description)
      implements MetricsInfo;
}
```
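
The forwarding pattern here is simple: on each collection cycle, every metric in the wrapped set is read and written into the Hadoop record builder. A stdlib-only sketch of that loop, with hypothetical names standing in for the dropwizard and metrics2 types:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative forwarder: snapshots named gauges into a sink on each cycle,
// the way getMetrics() walks the wrapped MetricSet (names are not Spark's API)
class MetricsForwarderSketch {
    private final Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();

    void register(String name, Supplier<Long> gauge) {
        gauges.put(name, gauge);
    }

    // Equivalent of one getMetrics(collector, all) pass
    void collect(Map<String, Long> sink) {
        gauges.forEach((name, gauge) -> sink.put(name, gauge.get()));
    }
}
```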

### Configuration Provider

Configuration provider that bridges Hadoop Configuration with Spark's network configuration system.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {

  /**
   * Constructor that wraps a Hadoop configuration
   * @param conf Hadoop Configuration instance
   */
  public HadoopConfigProvider(Configuration conf);

  /**
   * Get a configuration value by name
   * @param name Configuration property name
   * @return Configuration value
   * @throws NoSuchElementException if the property is not found
   */
  public String get(String name);

  /**
   * Get a configuration value with a default fallback
   * @param name Configuration property name
   * @param defaultValue Default value if the property is not found
   * @return Configuration value or default
   */
  public String get(String name, String defaultValue);

  /**
   * Get all configuration entries as an iterable
   * @return Iterable of all configuration key-value pairs
   */
  public Iterable<Map.Entry<String, String>> getAll();
}
```
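
The get/get-with-default contract above can be illustrated with a plain map (a stand-in for the Hadoop Configuration lookup, not the real class):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Stand-in showing the HadoopConfigProvider lookup contract:
// get(name) throws NoSuchElementException when the key is absent,
// get(name, defaultValue) falls back instead of throwing
class MapConfigProviderSketch {
    private final Map<String, String> conf = new HashMap<>();

    void set(String name, String value) {
        conf.put(name, value);
    }

    public String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    public String get(String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }
}
```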

## Configuration

The shuffle service supports extensive configuration through Hadoop Configuration properties:

### Core Configuration Keys

```java { .api }
// Service port configuration
public static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
public static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

// Authentication configuration
public static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
public static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

// Metrics configuration
public static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
    "spark.yarn.shuffle.service.metrics.namespace";
public static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";

// Logging configuration
public static final String SPARK_SHUFFLE_SERVICE_LOGS_NAMESPACE_KEY =
    "spark.yarn.shuffle.service.logs.namespace";

// Failure handling configuration
public static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
public static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Recovery configuration
public static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
    "spark.yarn.shuffle.server.recovery.disabled";

// Recovery file names
public static final String RECOVERY_FILE_NAME = "registeredExecutors";
public static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery";

// Configuration overlay resource
public static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME =
    "spark-shuffle-site.xml";

// Testing and integration constants
@VisibleForTesting
public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";

@VisibleForTesting
public static final String SECRET_KEY = "secret";

@VisibleForTesting
public static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME =
    "sparkShuffleMergeRecovery";

// Internal database constants
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final ObjectMapper mapper = new ObjectMapper();
```

### Configuration Usage

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

// Create configuration with shuffle service settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", true);
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");

// Use with the configuration provider
HadoopConfigProvider configProvider = new HadoopConfigProvider(conf);
String port = configProvider.get("spark.shuffle.service.port", "7337");
```

## Error Handling

The shuffle service provides comprehensive error handling and recovery capabilities:

- **Graceful Failure Modes**: Can be configured to continue with degraded functionality on initialization failures
- **Recovery Support**: Persists executor information and application secrets for NodeManager restart scenarios
- **Authentication Errors**: Proper handling of unauthenticated requests when security is enabled
- **Configuration Errors**: Validates configuration and provides meaningful error messages

```java
// Example error handling configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.yarn.shuffle.stopOnFailure", false); // Continue on failure
conf.setBoolean("spark.yarn.shuffle.server.recovery.disabled", false); // Enable recovery
```

## Dependencies

The shuffle service depends on several key libraries:

- **Hadoop YARN**: Core YARN APIs for auxiliary service integration (provided scope)
- **Spark Network Shuffle**: Core shuffle networking components and block management
- **Jackson**: JSON serialization for configuration and recovery data (shaded)
- **Netty**: High-performance networking library (shaded, with native library renaming)
- **Guava**: Google's core Java libraries for utilities (provided scope)
- **SLF4J**: Simple Logging Facade for Java (provided scope)

Dependencies with provided scope are expected to be available in the YARN environment. External dependencies like Jackson and Netty are relocated into shaded packages to avoid classpath conflicts with the copies Hadoop itself ships.
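
Relocation of this kind is typically done with the maven-shade-plugin. The stanza below is illustrative only; the actual relocation targets are defined by the Spark build and are not reproduced here:

```xml
<relocations>
  <relocation>
    <pattern>com.fasterxml.jackson</pattern>
    <!-- Hypothetical target package, for illustration -->
    <shadedPattern>org.sparkproject.jackson</shadedPattern>
  </relocation>
</relocations>
```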

## Types

```java { .api }
// Core YARN context types (from hadoop-yarn-server-api)
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

// Configuration and filesystem types (from hadoop-common)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Network and transport types (from spark-network-common)
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;

// Metrics types (from hadoop-common and dropwizard-metrics)
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;

// Standard Java types
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NoSuchElementException;

// Jackson types (shaded)
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
```