External shuffle service for Spark on YARN that runs as a long-running auxiliary service in the NodeManager process
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn_2-11@2.4.00
# Spark Network YARN

An external shuffle service implementation for Apache Spark applications running on YARN clusters. This library provides a long-running auxiliary service that operates within the YARN NodeManager process, enabling Spark executors to fetch shuffle data remotely even after the original executor containers have been terminated.

## Package Information

- **Package Name**: spark-network-yarn_2.11
- **Package Type**: Maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-yarn_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
```


## Basic Usage

The YarnShuffleService is typically deployed as a YARN auxiliary service rather than used directly in application code. However, for integration testing or custom deployments:

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;

// Create the service instance
YarnShuffleService shuffleService = new YarnShuffleService();

// Configure Hadoop settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", false);

// Initialize and start the service (normally done by the YARN NodeManager).
// The public init() method drives the protected serviceInit(conf) lifecycle hook.
shuffleService.init(conf);
shuffleService.start();

// The service integrates with the YARN application lifecycle:
// applications are initialized/stopped via YARN callbacks
```

## Architecture

The Spark Network YARN service is built around several key components:

- **YarnShuffleService**: Main auxiliary service that extends YARN's AuxiliaryService base class
- **HadoopConfigProvider**: Configuration adapter that bridges Hadoop Configuration to Spark's network layer
- **Authentication Layer**: Optional shuffle secret management for multi-tenant security
- **Recovery System**: LevelDB-based persistence for handling NodeManager restarts
- **Transport Layer**: Netty-based network server for shuffle data retrieval

The service operates as part of the YARN NodeManager process and provides shuffle data access to Spark executors across the cluster, enabling dynamic resource allocation and fault tolerance.

## Capabilities

### YARN Auxiliary Service Implementation

Core service that extends YARN's AuxiliaryService to provide external shuffle functionality for Spark applications.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
  /**
   * Creates a new YarnShuffleService with the default service name "spark_shuffle".
   * This constructor is typically called by the YARN NodeManager during service initialization.
   */
  public YarnShuffleService();

  /**
   * Initialize an application with the shuffle service, registering its shuffle secret.
   * Called by YARN when a new application starts.
   *
   * @param context Application initialization context containing the app ID and shuffle secret
   */
  @Override
  public void initializeApplication(ApplicationInitializationContext context);

  /**
   * Stop an application and clean up its resources in the shuffle service.
   * Called by YARN when an application terminates.
   *
   * @param context Application termination context containing the app ID
   */
  @Override
  public void stopApplication(ApplicationTerminationContext context);

  /**
   * Initialize a container (logs the container ID for debugging).
   * Called by YARN when a new container starts.
   *
   * @param context Container initialization context
   */
  @Override
  public void initializeContainer(ContainerInitializationContext context);

  /**
   * Stop a container (logs the container ID for debugging).
   * Called by YARN when a container terminates.
   *
   * @param context Container termination context
   */
  @Override
  public void stopContainer(ContainerTerminationContext context);

  /**
   * Get metadata for the auxiliary service (currently returns an empty buffer).
   *
   * @return Empty ByteBuffer, as metadata is not currently used
   */
  @Override
  public ByteBuffer getMetaData();

  /**
   * Set the recovery path for shuffle service recovery when the NodeManager restarts.
   * Called by the YARN NodeManager if recovery is enabled.
   *
   * @param recoveryPath Path where recovery data should be stored
   */
  @Override
  public void setRecoveryPath(Path recoveryPath);
}
```

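
The secret exchange behind `initializeApplication` can be illustrated without any YARN dependencies: the Spark client ships the application's shuffle secret to YARN as a `ByteBuffer` inside the `ApplicationInitializationContext`, and the service decodes it on arrival. A minimal, self-contained sketch of that round-trip (the class and helper names are hypothetical; the real code uses string/byte utilities from Spark's network-common module):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SecretBufferSketch {
    // Encode a shuffle secret the way a client might before shipping it to YARN
    static ByteBuffer encodeSecret(String secret) {
        return ByteBuffer.wrap(secret.getBytes(StandardCharsets.UTF_8));
    }

    // Decode the secret on the NodeManager side, as initializeApplication would
    static String decodeSecret(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);    // duplicate() leaves the caller's position untouched
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer shipped = encodeSecret("app-secret-123");
        // The shuffle service registers the (appId, secret) pair with its secret manager
        assert decodeSecret(shipped).equals("app-secret-123");
    }
}
```
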
### Service Configuration Keys

Configuration property names used by the shuffle service. These constants are internal, but the keys themselves matter for configuration.

```java { .api }
// Configuration keys (internal constants whose values matter for configuration)
// "spark.shuffle.service.port" - Port for the shuffle service (default: 7337)
// "spark.authenticate" - Enable authentication (default: false)
// "spark.yarn.shuffle.stopOnFailure" - Stop the NodeManager on failure (default: false)
```

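
The defaulting behavior these keys rely on can be sketched without Hadoop on the classpath. Here a plain `Map` stands in for Hadoop's `Configuration`, whose `getInt`/`getBoolean` accessors apply the same fall-back-to-default semantics (class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class ShuffleConfigSketch {
    // Resolve an int key, falling back to the default when unset,
    // mirroring Configuration.getInt(key, default)
    static int getInt(Map<String, String> conf, String key, int dflt) {
        return conf.containsKey(key) ? Integer.parseInt(conf.get(key)) : dflt;
    }

    // Resolve a boolean key, mirroring Configuration.getBoolean(key, default)
    static boolean getBoolean(Map<String, String> conf, String key, boolean dflt) {
        return conf.containsKey(key) ? Boolean.parseBoolean(conf.get(key)) : dflt;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.authenticate", "true");

        // Unset keys fall back to the documented defaults
        assert getInt(conf, "spark.shuffle.service.port", 7337) == 7337;
        assert getBoolean(conf, "spark.authenticate", false);               // explicitly enabled
        assert !getBoolean(conf, "spark.yarn.shuffle.stopOnFailure", false); // default applies
    }
}
```
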
### Service Lifecycle Management

Protected lifecycle hooks and private helpers that handle service initialization, shutdown, and recovery.

```java { .api }
/**
 * Initialize the shuffle server with the given Hadoop configuration.
 * Sets up the transport server, authentication, and recovery systems.
 *
 * @param conf Hadoop configuration containing the service settings
 * @throws Exception If service initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception;

/**
 * Stop the shuffle server and clean up all associated resources.
 * Closes the transport server, block handler, and recovery database.
 */
@Override
protected void serviceStop();

/**
 * Get the recovery path specific to this auxiliary service for the given file name.
 *
 * @param fileName Name of the recovery file
 * @return Path where the recovery file should be located
 */
protected Path getRecoveryPath(String fileName);

/**
 * Initialize the recovery database, handling migration from old NodeManager local directories.
 *
 * @param dbName Name of the database file
 * @return File object pointing to the recovery database location
 */
protected File initRecoveryDb(String dbName);

/**
 * Load shuffle secrets from the recovery database during service initialization.
 * Called when authentication is enabled and recovery is configured.
 *
 * @throws IOException If database access fails during secret loading
 */
private void loadSecretsFromDb() throws IOException;

/**
 * Parse a database key to extract the application ID.
 * Used internally for database key management.
 *
 * @param s Database key string with the APP_CREDS_KEY_PREFIX
 * @return Application ID extracted from the key
 * @throws IOException If key parsing fails
 */
private static String parseDbAppKey(String s) throws IOException;

/**
 * Generate a database key for storing application credentials.
 * Used internally for database key management.
 *
 * @param appExecId Application identity object
 * @return Byte array representing the database key
 * @throws IOException If key generation fails
 */
private static byte[] dbAppKey(AppId appExecId) throws IOException;

/**
 * Check whether authentication is enabled for the shuffle service.
 * Authentication is enabled when a secret manager is configured.
 *
 * @return true if authentication is enabled, false otherwise
 */
private boolean isAuthenticationEnabled();
```

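
The `dbAppKey`/`parseDbAppKey` pair maps application IDs to LevelDB keys. A self-contained sketch of the scheme, assuming an `AppCreds` prefix and JSON-serialized `AppId` values as in the Spark source (the literal JSON handling below is a simplification; the real code uses Jackson's `ObjectMapper`):

```java
import java.nio.charset.StandardCharsets;

public class DbKeySketch {
    // Stand-in for the service's internal APP_CREDS_KEY_PREFIX constant
    static final String APP_CREDS_KEY_PREFIX = "AppCreds";

    // Build a DB key: prefix + ';' + JSON-serialized AppId
    static byte[] dbAppKey(String appId) {
        String json = "{\"appId\":\"" + appId + "\"}";
        return (APP_CREDS_KEY_PREFIX + ";" + json).getBytes(StandardCharsets.UTF_8);
    }

    // Recover the application ID from a stored key
    static String parseDbAppKey(String key) {
        if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
            throw new IllegalArgumentException("expected an AppCreds key: " + key);
        }
        String json = key.substring(APP_CREDS_KEY_PREFIX.length() + 1);
        // Crude extraction for the sketch; the real code deserializes with Jackson
        return json.replaceAll(".*\"appId\":\"([^\"]+)\".*", "$1");
    }

    public static void main(String[] args) {
        String key = new String(dbAppKey("application_1700000000000_0001"), StandardCharsets.UTF_8);
        // Round-trip: the key encodes exactly the application ID it was built from
        assert parseDbAppKey(key).equals("application_1700000000000_0001");
    }
}
```
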
### Application Identity Management

Helper class for managing application identities in the shuffle service.

```java { .api }
public static class AppId {
  /** The application ID string */
  public final String appId;

  /**
   * Create a new AppId with JSON deserialization support.
   *
   * @param appId The application ID string
   */
  @JsonCreator
  public AppId(@JsonProperty("appId") String appId);

  /**
   * Check equality based on the application ID.
   *
   * @param o Object to compare with
   * @return true if the objects represent the same application ID
   */
  @Override
  public boolean equals(Object o);

  /**
   * Generate a hash code based on the application ID.
   *
   * @return Hash code for this AppId
   */
  @Override
  public int hashCode();

  /**
   * String representation of the AppId.
   *
   * @return Formatted string showing the application ID
   */
  @Override
  public String toString();
}
```

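
Because `equals` and `hashCode` are derived solely from the ID string, `AppId` instances behave as value objects and can safely key set or map entries such as recovery-database state. A minimal stand-in demonstrating that contract (hypothetical names, no Jackson annotations):

```java
import java.util.HashSet;
import java.util.Set;

public class AppIdSketch {
    // Minimal stand-in for YarnShuffleService.AppId: equality and hashing
    // are based solely on the appId string
    static final class AppId {
        final String appId;
        AppId(String appId) { this.appId = appId; }

        @Override public boolean equals(Object o) {
            return o instanceof AppId && ((AppId) o).appId.equals(appId);
        }
        @Override public int hashCode() { return appId.hashCode(); }
        @Override public String toString() { return "AppId[appId=" + appId + "]"; }
    }

    public static void main(String[] args) {
        Set<AppId> live = new HashSet<>();
        live.add(new AppId("application_1700000000000_0001"));

        // Two instances with the same ID are interchangeable as set/map keys
        assert live.contains(new AppId("application_1700000000000_0001"));
        assert !live.contains(new AppId("application_1700000000000_0002"));
    }
}
```
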
### Hadoop Configuration Integration

Configuration provider that adapts a Hadoop Configuration for use with Spark's network layer.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {
  /**
   * Create a new configuration provider wrapping the given Hadoop configuration.
   *
   * @param conf Hadoop Configuration instance to wrap
   */
  public HadoopConfigProvider(Configuration conf);

  /**
   * Get a configuration value by name.
   *
   * @param name Configuration property name
   * @return Configuration value as a String
   * @throws NoSuchElementException If the property is not found
   */
  @Override
  public String get(String name);

  /**
   * Get a configuration value by name with a default fallback.
   *
   * @param name Configuration property name
   * @param defaultValue Default value if the property is not found
   * @return Configuration value, or the default if not found
   */
  @Override
  public String get(String name, String defaultValue);

  /**
   * Get all configuration entries as an iterable.
   *
   * @return Iterable of all configuration key-value pairs
   */
  @Override
  public Iterable<Map.Entry<String, String>> getAll();
}
```

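
The provider's contract — throw `NoSuchElementException` for a missing key unless a default is supplied — can be sketched with a plain `Map` in place of a Hadoop `Configuration` (the class and field names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class MapConfigProviderSketch {
    // Same get/get-with-default contract as HadoopConfigProvider,
    // but backed by a plain Map instead of a Hadoop Configuration
    final Map<String, String> conf;

    MapConfigProviderSketch(Map<String, String> conf) { this.conf = conf; }

    // Missing keys are an error when no default is given
    String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // With a default, missing keys fall back silently
    String get(String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.shuffle.service.port", "7337");
        MapConfigProviderSketch provider = new MapConfigProviderSketch(settings);

        assert provider.get("spark.shuffle.service.port").equals("7337");
        assert provider.get("spark.authenticate", "false").equals("false");
        try {
            provider.get("spark.authenticate");   // missing key without default
            assert false : "expected NoSuchElementException";
        } catch (NoSuchElementException expected) { /* contract satisfied */ }
    }
}
```
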
## Types

### Required Hadoop/YARN Types

The service depends on several Hadoop and YARN types that applications must have available:

```java { .api }
// From Hadoop YARN
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.api.records.ContainerId;

// From Hadoop Core
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// From the Java Standard Library
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// From Jackson JSON Processing
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// From Google Guava
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
```

### Spark Network Types

Integration with Spark's network layer components:

```java { .api }
// These types are used internally; applications typically don't interact with them directly
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.crypto.AuthServerBootstrap;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.LevelDBProvider;

// From LevelDB
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

// From SLF4J Logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
```

## Configuration

### Core Configuration Properties

- `spark.shuffle.service.port` (default: 7337): Port for the shuffle service to listen on
- `spark.authenticate` (default: false): Enable authentication for shuffle requests
- `yarn.nodemanager.local-dirs`: Local directories for the NodeManager (used for recovery file placement)

### Optional Configuration Properties

- `spark.yarn.shuffle.stopOnFailure` (default: false): Whether a service initialization failure should stop the NodeManager

### Configuration Constants

The service uses the following internal configuration constants:

```java { .api }
// Configuration key constants (from the source code)
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
private static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Recovery file names
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
```

### YARN Integration Configuration

The service is typically configured in YARN's yarn-site.xml:

```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
  </property>
</configuration>
```

## Error Handling

The service handles several types of errors gracefully:

- **Service initialization failures**: Logged; optionally stop the NodeManager, depending on configuration
- **Database errors**: Logged but do not prevent service operation
- **Application/container lifecycle errors**: Logged and contained so they cannot disrupt the service
- **Configuration errors**: `NoSuchElementException` is thrown for missing required properties
- **Recovery failures**: File system errors during recovery are logged and handled

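
The first bullet's behavior is governed by `spark.yarn.shuffle.stopOnFailure`. A self-contained sketch of the policy (class and method names are hypothetical; the real logic lives inside `serviceInit`):

```java
public class StopOnFailureSketch {
    // Mirrors the serviceInit error policy: rethrow (taking the NodeManager down)
    // only when stopOnFailure is set; otherwise log the failure and continue
    static boolean initWithPolicy(Runnable init, boolean stopOnFailure) {
        try {
            init.run();
            return true;                       // service came up
        } catch (RuntimeException e) {
            if (stopOnFailure) {
                // Strict mode: propagate so the NodeManager itself stops
                throw new IllegalStateException("shuffle service failed to start", e);
            }
            System.err.println("shuffle service init failed, continuing: " + e);
            return false;                      // NodeManager keeps running
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new RuntimeException("port already in use"); };

        // Default policy: the failure is logged and the NodeManager survives
        assert !initWithPolicy(failing, false);

        // Strict policy: the failure propagates to the NodeManager
        boolean propagated = false;
        try {
            initWithPolicy(failing, true);
        } catch (IllegalStateException e) {
            propagated = true;
        }
        assert propagated;
    }
}
```
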
## Thread Safety

The YarnShuffleService is designed to handle concurrent operations safely:

- Service lifecycle methods are synchronized by the YARN NodeManager
- Database operations use LevelDB's thread-safe implementation
- Network transport operations are handled by Netty's thread-safe components
- Multiple applications and containers can be managed concurrently