# Spark Network YARN

The Apache Spark YARN Shuffle Service provides an external shuffle service for YARN-managed clusters. This library implements a shuffle service that runs as an auxiliary service on YARN NodeManagers, allowing Spark executors to fetch shuffle data even after the executor that wrote the data has terminated.

## Package Information

- **Package Name**: spark-network-yarn_2.13
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-network-yarn_2.13
- **Installation**: Add as Maven dependency

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-yarn_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.YarnShuffleServiceMetrics;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

// YARN integration imports
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
```

## Basic Usage

The YarnShuffleService is typically deployed as a YARN auxiliary service and managed automatically by the NodeManager:

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;

// Create the shuffle service
YarnShuffleService shuffleService = new YarnShuffleService();

// Initialize with YARN configuration; the public init() inherited from
// Hadoop's service framework invokes the protected serviceInit()
Configuration conf = new Configuration();
shuffleService.init(conf);

// Service lifecycle is managed by the YARN NodeManager
// Applications connect by setting spark.shuffle.service.enabled=true
```
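
In a real deployment the NodeManager instantiates the service itself; it is registered in `yarn-site.xml`. The sketch below uses the standard YARN auxiliary-service property convention with `spark_shuffle` as the service name (the name used by the default constructor); adapt the value list to the auxiliary services your cluster already runs:

```xml
<!-- yarn-site.xml: register the shuffle service as a NodeManager auxiliary service -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

The shuffle service JAR must also be on the NodeManager classpath for the class above to load.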

## Architecture

The Spark YARN Shuffle Service is built around several key components:

- **YarnShuffleService**: Main auxiliary service class that integrates with the YARN NodeManager
- **External Block Handler**: Manages shuffle block registration, retrieval, and cleanup
- **Merged Shuffle File Manager**: Handles merged shuffle files for improved performance
- **Authentication System**: Optional security layer using shuffle secrets
- **Metrics Integration**: Forwards shuffle metrics to Hadoop's metrics2 system
- **Recovery System**: Persists executor and secret information for NodeManager restart scenarios

## Capabilities

### YARN Auxiliary Service Integration

Main shuffle service class that extends Hadoop's AuxiliaryService for integration with the YARN NodeManager.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {

  /**
   * Default constructor - initializes the service with the name "spark_shuffle"
   */
  public YarnShuffleService();

  /**
   * Return whether authentication is enabled as specified by the configuration.
   * If so, fetch requests will fail unless the appropriate authentication secret
   * for the application is provided.
   * @return true if authentication is enabled, false otherwise
   */
  private boolean isAuthenticationEnabled();

  /**
   * Initialize application with the shuffle service
   * @param context Application initialization context from YARN
   */
  public void initializeApplication(ApplicationInitializationContext context);

  /**
   * Stop and clean up application resources
   * @param context Application termination context from YARN
   */
  public void stopApplication(ApplicationTerminationContext context);

  /**
   * Initialize container with the shuffle service
   * @param context Container initialization context from YARN
   */
  public void initializeContainer(ContainerInitializationContext context);

  /**
   * Stop container and clean up resources
   * @param context Container termination context from YARN
   */
  public void stopContainer(ContainerTerminationContext context);

  /**
   * Get service metadata (returns an empty buffer)
   * @return Empty ByteBuffer for metadata
   */
  public ByteBuffer getMetaData();

  /**
   * Set recovery path for NodeManager restart scenarios
   * @param recoveryPath Path where recovery data should be stored
   */
  public void setRecoveryPath(Path recoveryPath);

  /**
   * Initialize service with external configuration
   * @param externalConf Hadoop configuration from the NodeManager
   * @throws Exception if initialization fails
   */
  protected void serviceInit(Configuration externalConf) throws Exception;

  /**
   * Stop service and clean up all resources
   */
  protected void serviceStop() throws Exception;

  /**
   * Get recovery path for a specific file
   * @param fileName Name of the recovery file
   * @return Path for recovery file storage
   */
  protected Path getRecoveryPath(String fileName);

  /**
   * Initialize recovery database file
   * @param dbName Database file name
   * @return File object for the recovery database
   */
  protected File initRecoveryDb(String dbName);

  /**
   * Set customized MergedShuffleFileManager for unit testing
   * @param mergeManager Custom merge manager implementation
   */
  @VisibleForTesting
  void setShuffleMergeManager(MergedShuffleFileManager mergeManager);

  /**
   * Create new MergedShuffleFileManager instance for testing
   * @param conf Transport configuration
   * @param mergeManagerFile Recovery file for the merge manager
   * @return New MergedShuffleFileManager instance
   */
  @VisibleForTesting
  static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
      TransportConf conf, File mergeManagerFile);

  /**
   * Load application secrets from the recovery database
   * @throws IOException if database operations fail
   */
  private void loadSecretsFromDb() throws IOException;

  /**
   * Parse database key to extract application ID
   * @param s Database key string
   * @return Application ID
   * @throws IOException if parsing fails
   */
  private static String parseDbAppKey(String s) throws IOException;

  /**
   * Create database key for application ID
   * @param appExecId Application ID wrapper
   * @return Database key as byte array
   * @throws IOException if serialization fails
   */
  private static byte[] dbAppKey(AppId appExecId) throws IOException;
}
```

### Application ID Management

Simple container class for encoding application identifiers with JSON serialization support.

```java { .api }
public static class AppId {

  /** The application identifier string */
  public final String appId;

  /**
   * Constructor for application ID
   * @param appId Application identifier string
   */
  public AppId(String appId);

  /**
   * Check equality with another AppId
   * @param o Object to compare
   * @return true if equal, false otherwise
   */
  public boolean equals(Object o);

  /**
   * Compute hash code for the application ID
   * @return Hash code value
   */
  public int hashCode();

  /**
   * String representation of the application ID
   * @return Formatted string representation
   */
  public String toString();
}
```
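
The value-class contract above can be illustrated with a plain-Java sketch. `AppIdSketch` is a hypothetical stand-in written for this document, not the actual class (which additionally carries Jackson annotations for the JSON support mentioned above):

```java
// Illustrative value class mirroring the AppId contract described above.
class AppIdSketch {
    public final String appId;

    AppIdSketch(String appId) {
        this.appId = appId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        // Two wrappers are equal exactly when their identifier strings are equal
        return appId.equals(((AppIdSketch) o).appId);
    }

    @Override
    public int hashCode() {
        return appId.hashCode();
    }

    @Override
    public String toString() {
        return "AppIdSketch{appId=" + appId + "}";
    }

    public static void main(String[] args) {
        AppIdSketch a = new AppIdSketch("application_1700000000000_0001");
        AppIdSketch b = new AppIdSketch("application_1700000000000_0001");
        // Equal identifiers compare equal and share a hash code,
        // which is what lets the service key maps and DB entries by AppId
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```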

### Metrics Integration

Forwards shuffle service metrics to Hadoop's metrics2 system for JMX exposure and monitoring.

```java { .api }
class YarnShuffleServiceMetrics implements MetricsSource {

  /**
   * Package-private constructor for metrics integration
   * @param metricsNamespace Namespace for metrics collection
   * @param metricSet Set of metrics to forward
   */
  YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet);

  /**
   * Collect metrics from the shuffle service
   * @param collector Metrics collector to receive metrics
   * @param all Whether to return all metrics even if unchanged
   */
  public void getMetrics(MetricsCollector collector, boolean all);

  /**
   * Collect individual metric and add to metrics record
   * @param metricsRecordBuilder Builder for metrics record
   * @param name Name of the metric
   * @param metric The metric object to collect
   */
  public static void collectMetric(
      MetricsRecordBuilder metricsRecordBuilder,
      String name,
      Metric metric);

  /**
   * Internal record class for metrics information
   */
  private record ShuffleServiceMetricsInfo(String name, String description)
      implements MetricsInfo;
}
```

### Configuration Provider

Configuration provider that bridges Hadoop Configuration with Spark's network configuration system.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {

  /**
   * Constructor that wraps a Hadoop configuration
   * @param conf Hadoop Configuration instance
   */
  public HadoopConfigProvider(Configuration conf);

  /**
   * Get configuration value by name
   * @param name Configuration property name
   * @return Configuration value
   * @throws NoSuchElementException if the property is not found
   */
  public String get(String name);

  /**
   * Get configuration value with default fallback
   * @param name Configuration property name
   * @param defaultValue Default value if the property is not found
   * @return Configuration value or default
   */
  public String get(String name, String defaultValue);

  /**
   * Get all configuration entries as an iterable
   * @return Iterable of all configuration key-value pairs
   */
  public Iterable<Map.Entry<String, String>> getAll();
}
```
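
The provider contract can be sketched with a map-backed stand-in. `MapConfigProvider` below is hypothetical, written for this document only, and uses a plain `Map` where the real class delegates to a Hadoop `Configuration`; it illustrates the get / get-with-default / getAll semantics, including the `NoSuchElementException` on a missing key:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical map-backed provider illustrating the ConfigProvider semantics
class MapConfigProvider {
    private final Map<String, String> conf;

    MapConfigProvider(Map<String, String> conf) {
        this.conf = new HashMap<>(conf);
    }

    // Throws if the property is absent, mirroring get(name) above
    public String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Falls back to the default when the property is absent
    public String get(String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }

    public Iterable<Map.Entry<String, String>> getAll() {
        return conf.entrySet();
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.shuffle.service.port", "7337");
        MapConfigProvider provider = new MapConfigProvider(settings);
        System.out.println(provider.get("spark.shuffle.service.port"));  // 7337
        System.out.println(provider.get("spark.authenticate", "false")); // false
    }
}
```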

## Configuration

The shuffle service supports extensive configuration through Hadoop Configuration properties:

### Core Configuration Keys

```java { .api }
// Service port configuration
public static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
public static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

// Authentication configuration
public static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
public static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

// Metrics configuration
public static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
    "spark.yarn.shuffle.service.metrics.namespace";
public static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";

// Logging configuration
public static final String SPARK_SHUFFLE_SERVICE_LOGS_NAMESPACE_KEY =
    "spark.yarn.shuffle.service.logs.namespace";

// Failure handling configuration
public static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
public static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Recovery configuration
public static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
    "spark.yarn.shuffle.server.recovery.disabled";

// Recovery file names
public static final String RECOVERY_FILE_NAME = "registeredExecutors";
public static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery";

// Configuration overlay resource
public static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME =
    "spark-shuffle-site.xml";

// Testing and integration constants
@VisibleForTesting
public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";

@VisibleForTesting
public static final String SECRET_KEY = "secret";

@VisibleForTesting
public static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME =
    "sparkShuffleMergeRecovery";

// Internal database constants
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final ObjectMapper mapper = new ObjectMapper();
```

### Configuration Usage

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

// Create configuration with shuffle service settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", true);
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");

// Use with configuration provider
HadoopConfigProvider configProvider = new HadoopConfigProvider(conf);
String port = configProvider.get("spark.shuffle.service.port", "7337");
```

## Error Handling

The shuffle service provides comprehensive error handling and recovery capabilities:

- **Graceful Failure Modes**: Can be configured to continue with degraded functionality on initialization failures
- **Recovery Support**: Persists executor information and application secrets for NodeManager restart scenarios
- **Authentication Errors**: Proper handling of unauthenticated requests when security is enabled
- **Configuration Errors**: Validates configuration and provides meaningful error messages

```java
// Example error handling configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.yarn.shuffle.stopOnFailure", false); // Continue on failure
conf.setBoolean("spark.yarn.shuffle.server.recovery.disabled", false); // Enable recovery
```
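
On the application side, executors are pointed at the external service through standard Spark client settings. The sketch below shows a typical `spark-defaults.conf`; the port value is only needed if it differs from the default 7337:

```properties
# spark-defaults.conf: have executors use the external shuffle service
spark.shuffle.service.enabled   true
spark.shuffle.service.port      7337
# Commonly paired with dynamic allocation, which depends on the external service
spark.dynamicAllocation.enabled true
```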

## Dependencies

The shuffle service depends on several key libraries:

- **Hadoop YARN**: Core YARN APIs for auxiliary service integration (provided scope)
- **Spark Network Shuffle**: Core shuffle networking components and block management
- **Jackson**: JSON serialization for configuration and recovery data (shaded as `com.fasterxml.jackson`)
- **Netty**: High-performance networking library (shaded as `io.netty` with native library renaming)
- **Guava**: Google's core Java libraries for utilities (provided scope)
- **SLF4J**: Simple Logging Facade for Java (provided scope)

Dependencies with provided scope are expected to be available in the YARN environment. External dependencies like Jackson and Netty are carefully shaded to avoid classpath conflicts in YARN environments.

## Types

```java { .api }
// Core YARN context types (from hadoop-yarn-server-api)
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

// Configuration and filesystem types (from hadoop-common)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Network and transport types (from spark-network-common)
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;

// Metrics types (from hadoop-common and dropwizard-metrics)
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;

// Standard Java types
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NoSuchElementException;

// Jackson types (shaded)
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
```