YARN Shuffle Service for Apache Spark that runs as a long-running auxiliary service in the NodeManager process to enable efficient shuffle operations.
npx @tessl/cli install tessl/maven-spark-network-yarn_2.11@1.6.00
# YARN Shuffle Service for Apache Spark

## Overview

The `spark-network-yarn_2.11` package provides an external shuffle service for Apache Spark applications running on YARN clusters. The service runs as a long-running auxiliary service inside the YARN NodeManager process and stores and serves shuffle data independently of individual Spark executors. Because shuffle files are served by the NodeManager rather than by the executor that wrote them, an executor can exit or be removed without its shuffle output becoming unavailable, which improves resource utilization and fault tolerance.

## Package Information

- **Name**: `spark-network-yarn_2.11`
- **Type**: Java Library
- **Language**: Java 7+
- **Version**: 1.6.3
- **Maven Coordinates**: `org.apache.spark:spark-network-yarn_2.11:1.6.3`
- **License**: Apache-2.0

### Installation

Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-yarn_2.11</artifactId>
  <version>1.6.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.*;
```

## Basic Usage

The YARN Shuffle Service is normally deployed and managed by YARN cluster administrators; application code does not instantiate it. For testing or custom deployments it can be created directly:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.yarn.YarnShuffleService;

// Create the shuffle service
YarnShuffleService shuffleService = new YarnShuffleService();

// Configure it through a Hadoop Configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.setInt("spark.shuffle.service.port", 7337);
hadoopConfig.setBoolean("spark.authenticate", false);

// init() is inherited from Hadoop's service base class and calls serviceInit(conf)
shuffleService.init(hadoopConfig);

// In a real cluster the service lifecycle is managed by the YARN framework.
// Applications connect by setting spark.shuffle.service.enabled=true
```

## Architecture

The service integrates with YARN's auxiliary service framework and Spark's network transport layer:

- **YarnShuffleService**: Main service class extending YARN's `AuxiliaryService`
- **HadoopConfigProvider**: Configuration bridge between Hadoop and Spark's network layer
- **ExternalShuffleBlockHandler**: Handles shuffle block requests (from the `spark-network-shuffle` dependency)
- **SASL Authentication**: Optional security layer for multi-tenant clusters

## Capabilities

### YARN Auxiliary Service Integration

The primary interface for YARN integration is a subclass of YARN's `AuxiliaryService`:

```java
public class YarnShuffleService extends AuxiliaryService
```

**Constructor:**

```java
public YarnShuffleService()
```

Creates a new shuffle service instance with the service name `spark_shuffle`.

**Application Lifecycle Management:**

```java
public void initializeApplication(ApplicationInitializationContext context)
public void stopApplication(ApplicationTerminationContext context)
```

**Container Lifecycle Management:**

```java
public void initializeContainer(ContainerInitializationContext context)
public void stopContainer(ContainerTerminationContext context)
```

**Service Lifecycle:**

```java
protected void serviceInit(Configuration conf)
protected void serviceStop()
public ByteBuffer getMetaData()
```

### Configuration Integration

`HadoopConfigProvider` exposes a Hadoop `Configuration` to Spark's network layer:

```java
public class HadoopConfigProvider extends ConfigProvider
```

**Constructor:**

```java
public HadoopConfigProvider(Configuration conf)
```

Creates a configuration provider that uses the given Hadoop `Configuration` as its backing store.

**Configuration Access:**

```java
public String get(String name) throws NoSuchElementException
```

Returns the configuration value for `name`, or throws `NoSuchElementException` if the key is not set.

### Service Configuration

**Key Configuration Properties:**

- `spark.shuffle.service.port` (default: 7337): Port the shuffle service listens on
- `spark.authenticate` (default: false): Enables SASL authentication
- `yarn.nodemanager.local-dirs`: NodeManager local directories, used to store the persisted executor state

**Example Configuration:**

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", true);

// For multi-directory setups, list the directories comma-separated
conf.set("yarn.nodemanager.local-dirs", "/tmp/yarn-local-1,/tmp/yarn-local-2");
```

### Authentication and Security

When authentication is enabled, the service uses Spark's SASL authentication:

```java
// Authentication is configured via the YARN (Hadoop) configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.authenticate", true);

// Applications must also set spark.authenticate independently
// and provide shuffle secrets during application initialization
```

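Since secrets are provided per application at initialization time, the service needs some per-application bookkeeping. The following is a minimal sketch of that idea using only the JDK; `AppSecretRegistry` and its method names are hypothetical, and the sketch does not perform a SASL handshake, it only tracks which secret belongs to which application ID.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-application secret store; not a class from this package. */
final class AppSecretRegistry {
    // Concurrent map because applications may start and stop from different threads.
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    /** Intended to be called when an application is initialized on this node. */
    void register(String appId, byte[] secretBytes) {
        secrets.put(appId, new String(secretBytes, StandardCharsets.UTF_8));
    }

    /** Intended to be called when the application stops, so stale secrets do not linger. */
    void unregister(String appId) {
        secrets.remove(appId);
    }

    /** Returns the secret for the application, or null if none is registered. */
    String secretFor(String appId) {
        return secrets.get(appId);
    }
}
```

In this sketch an authentication layer would call `secretFor(appId)` during its handshake and reject the connection when the result is `null`.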
### Executor State Persistence

The service persists executor registration information so that it survives NodeManager restarts:

```java
// State is persisted to registeredExecutors.ldb in local directories
// Recovery happens automatically during service initialization
// No direct API for state management - handled internally
```

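One plausible way to pick the state location from several local directories is to reuse an existing `registeredExecutors.ldb` if any directory already holds one, and otherwise fall back to the first directory. The sketch below illustrates that strategy with the JDK only; `StateFileLocator` is a hypothetical name, and the selection rule is an assumption made for illustration rather than a statement of what the service does internally.

```java
import java.io.File;

/** Hypothetical helper that chooses where the executor state lives. */
final class StateFileLocator {
    static final String STATE_FILE_NAME = "registeredExecutors.ldb";

    /** Splits a comma-separated yarn.nodemanager.local-dirs value and trims each entry. */
    static String[] parseLocalDirs(String value) {
        String[] parts = value.split(",");
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    /** Returns an existing state location if one is found, else one under the first dir. */
    static File locate(String[] localDirs) {
        if (localDirs == null || localDirs.length == 0) {
            throw new IllegalArgumentException("at least one local dir is required");
        }
        for (String dir : localDirs) {
            File candidate = new File(dir, STATE_FILE_NAME);
            if (candidate.exists()) {
                return candidate; // reuse state left behind by a previous run
            }
        }
        return new File(localDirs[0], STATE_FILE_NAME);
    }
}
```

For example, `StateFileLocator.locate(StateFileLocator.parseLocalDirs("/tmp/yarn-local-1, /tmp/yarn-local-2"))` checks both directories before defaulting to the first.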
### Testing and Debugging

**Testing Support:**

```java
// Static fields available for testing (marked @VisibleForTesting)
public static int boundPort;                // Actual bound port
public static YarnShuffleService instance;  // Service instance

// Package-visible testing access
ExternalShuffleBlockHandler blockHandler;   // Block handler instance
File registeredExecutorFile;                // State persistence file
```

## Error Handling

The service is designed to tolerate the following error conditions:

- **Initialization Errors**: Service startup continues even if shuffle handler initialization fails
- **Application Errors**: Application lifecycle errors are logged and do not affect other applications
- **State Corruption**: Corrupt executor state files are detected and recovered from automatically
- **Network Errors**: Handled by the underlying Spark transport layer

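The "log and continue" behavior for application lifecycle errors follows a common isolation pattern: run each per-application callback inside its own `try`/`catch` and record the failure instead of propagating it. The sketch below shows the pattern in isolation; `IsolatedLifecycle` and `AppCallback` are hypothetical names, and an in-memory list stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper that keeps one application's failure from affecting others. */
final class IsolatedLifecycle {
    /** A per-application action that may fail. */
    interface AppCallback {
        void run(String appId) throws Exception;
    }

    // Stand-in for a logger so the example stays self-contained.
    private final List<String> errors = new ArrayList<>();

    /** Runs the callback; on failure records the error and returns false. */
    boolean runIsolated(String appId, AppCallback callback) {
        try {
            callback.run(appId);
            return true;
        } catch (Exception e) {
            errors.add(appId + ": " + e.getMessage());
            return false;
        }
    }

    /** Returns a copy of the recorded error messages. */
    List<String> loggedErrors() {
        return new ArrayList<>(errors);
    }
}
```

The trade-off is that a failed application is only visible through the log, so the recorded messages need to carry enough context (here, the application ID) to diagnose it later.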
## Deployment Integration

### YARN Configuration

Add to `yarn-site.xml`:

```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
  </property>
</configuration>
```

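A misspelled service name or class in `yarn-site.xml` is easy to miss, so a small pre-deployment check can help. The sketch below parses a configuration fragment with the JDK's XML parser and verifies that `spark_shuffle` is listed and mapped to the expected class. `YarnSiteCheck` is a hypothetical helper, not part of Spark or Hadoop, and it assumes `yarn.nodemanager.aux-services` may hold a comma-separated list.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical checker for the two aux-service properties shown above. */
final class YarnSiteCheck {
    /** Reads every <property> element into a name -> value map. */
    static Map<String, String> parseProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new HashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element p = (Element) nodes.item(i);
                String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("could not parse yarn-site XML", e);
        }
    }

    /** True when spark_shuffle is listed and mapped to YarnShuffleService. */
    static boolean shuffleServiceConfigured(Map<String, String> props) {
        String services = props.get("yarn.nodemanager.aux-services");
        if (services == null) {
            return false;
        }
        boolean listed = false;
        for (String s : services.split(",")) {
            if (s.trim().equals("spark_shuffle")) {
                listed = true;
            }
        }
        String cls = props.get("yarn.nodemanager.aux-services.spark_shuffle.class");
        return listed && "org.apache.spark.network.yarn.YarnShuffleService".equals(cls);
    }
}
```

A deployment script could read the file into a string and fail fast when `shuffleServiceConfigured(parseProperties(xml))` returns `false`.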
### Spark Application Configuration

```java
import org.apache.spark.SparkConf;

// In Spark application configuration
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.service.enabled", "true");
// Port is automatically discovered from YARN configuration
```

## Compatibility

- **Spark Version**: 1.6.3
- **Hadoop/YARN**: Compatible with Hadoop 2.x YARN clusters
- **Java**: Requires Java 7 or later
- **Scala**: This artifact is built for Scala 2.11; Spark 1.6.3 also publishes a `_2.10` variant