# YARN Shuffle Service

External shuffle service for YARN NodeManagers that provides shuffle data management and retrieval for Spark applications. This auxiliary service runs inside each YARN NodeManager process and improves executor stability by externalizing shuffle data management: shuffle files remain readable even after the executor that wrote them exits.

## Capabilities

### YarnShuffleService

Main YARN auxiliary service class that integrates Spark's external shuffle service with the YARN NodeManager lifecycle.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
  // Service lifecycle methods
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStart() throws Exception;
  protected void serviceStop() throws Exception;

  // Application lifecycle callbacks
  public void initializeApplication(ApplicationInitializationContext context);
  public void stopApplication(ApplicationTerminationContext context);

  // Container lifecycle callbacks
  public void initializeContainer(ContainerInitializationContext context);
  public void stopContainer(ContainerTerminationContext context);

  // Recovery and metadata
  public void setRecoveryPath(Path recoveryPath);
  public ByteBuffer getMetaData();
}
```

**Service Lifecycle Methods:**

**`serviceInit(Configuration conf): void`**
- Initializes the shuffle service with YARN and Hadoop configuration
- Sets up the network transport server and storage directories
- Configures authentication and security settings
- Called by the YARN NodeManager during service initialization

**`serviceStart(): void`**
- Starts the shuffle service network server
- Begins accepting shuffle data requests from executors
- Initializes recovery mechanisms if enabled
- Called after `serviceInit` during NodeManager startup

**`serviceStop(): void`**
- Stops the shuffle service network server
- Cleans up temporary resources and connections
- Saves recovery state if persistence is enabled
- Called during NodeManager shutdown

**Application Lifecycle:**

**`initializeApplication(ApplicationInitializationContext context): void`**
- Called when a new Spark application starts on the node
- Creates application-specific directories and metadata
- Sets up the security context for the application
- Initializes shuffle storage for the application

**`stopApplication(ApplicationTerminationContext context): void`**
- Called when a Spark application completes or terminates
- Cleans up application-specific shuffle data and directories
- Releases resources allocated to the application
- Removes security credentials for the application

**Container Lifecycle:**

**`initializeContainer(ContainerInitializationContext context): void`**
- Called when an executor container starts on the node
- Registers the executor with the shuffle service
- Sets up container-specific shuffle data structures
- Configures executor authentication credentials

**`stopContainer(ContainerTerminationContext context): void`**
- Called when an executor container terminates
- Cleans up container-specific shuffle data
- Releases executor-specific resources
- Updates application metadata
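The ordering of these hooks can be made concrete with a small, self-contained sketch. This is a toy stand-in, not Hadoop's `AuxiliaryService`: it only records the order in which a NodeManager would drive the callbacks for one application with one executor container.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {
    // Toy stand-in for the AuxiliaryService hooks described above.
    static class ToyShuffleService {
        final List<String> events = new ArrayList<>();
        void serviceInit() { events.add("serviceInit"); }
        void serviceStart() { events.add("serviceStart"); }
        void initializeApplication(String appId) { events.add("initApp:" + appId); }
        void initializeContainer(String id) { events.add("initContainer:" + id); }
        void stopContainer(String id) { events.add("stopContainer:" + id); }
        void stopApplication(String appId) { events.add("stopApp:" + appId); }
        void serviceStop() { events.add("serviceStop"); }
    }

    public static void main(String[] args) {
        ToyShuffleService s = new ToyShuffleService();
        // Order in which the NodeManager drives the hooks for one
        // application that runs a single executor container.
        s.serviceInit();
        s.serviceStart();
        s.initializeApplication("app_001");
        s.initializeContainer("container_001");
        s.stopContainer("container_001");
        s.stopApplication("app_001");
        s.serviceStop();
        System.out.println(String.join(" -> ", s.events));
    }
}
```

The service-level hooks bracket everything; application hooks bracket the container hooks of that application's executors.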

### Recovery and Persistence

**`setRecoveryPath(Path recoveryPath): void`**
- Sets the path for persisting shuffle service recovery data
- Enables recovery of executor registrations across NodeManager restarts
- Used in conjunction with YARN NodeManager recovery features

**`getMetaData(): ByteBuffer`**
- Returns serialized metadata about the shuffle service
- Forwarded by YARN to application containers for service discovery
- May include details such as the port the service listens on
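As a sketch of the `ByteBuffer` contract, assume for illustration that the payload is just the service port (the real payload is an implementation detail of the service):

```java
import java.nio.ByteBuffer;

public class MetaDataDemo {
    // Hypothetical encoder: pack the shuffle port into a metadata buffer,
    // mirroring the shape of getMetaData()'s return value.
    static ByteBuffer encode(int port) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
        buf.putInt(port);
        buf.flip(); // rewind so the consumer reads from the start
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer meta = encode(7337);
        // A consumer on the application side decodes the advertised port.
        System.out.println("advertised port: " + meta.getInt());
    }
}
```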
## Configuration

### YARN NodeManager Configuration

```xml
<!-- yarn-site.xml configuration -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
  <value>/path/to/spark-*-yarn-shuffle.jar</value>
</property>
```

### Spark Application Configuration

```scala
val sparkConf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.port", "7337")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
```

**Key Configuration Options:**

**`spark.shuffle.service.enabled`**
- Enables external shuffle service usage in Spark applications
- Default: false
- Must be true to use YarnShuffleService

**`spark.shuffle.service.port`**
- Port for shuffle service network communication
- Default: 7337
- Must match the port configured on the NodeManagers

**`spark.dynamicAllocation.enabled`**
- Enables dynamic executor allocation
- On YARN, pairs naturally with the external shuffle service
- Allows safe executor removal without losing shuffle data
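The same settings can be passed at submit time instead of in code; a sketch (the master URL and `app.jar` are placeholders):

```bash
spark-submit \
  --master yarn \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.service.port=7337 \
  --conf spark.dynamicAllocation.enabled=true \
  app.jar
```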
## Integration Patterns

### Secure Cluster Integration

The shuffle service picks up `spark.`-prefixed settings from the NodeManager's Hadoop configuration, so SASL authentication is enabled in yarn-site.xml:

```xml
<!-- SASL authentication for the shuffle service -->
<property>
  <name>spark.authenticate</name>
  <value>true</value>
</property>

<property>
  <name>spark.network.sasl.serverAlwaysEncrypt</name>
  <value>true</value>
</property>
```

### Recovery Configuration

```xml
<!-- Enable recovery of shuffle state across NodeManager restarts -->
<property>
  <name>spark.shuffle.service.db.enabled</name>
  <value>true</value>
</property>

<property>
  <!-- LEVELDB or, in newer Spark releases, ROCKSDB -->
  <name>spark.shuffle.service.db.backend</name>
  <value>LEVELDB</value>
</property>
```

### Performance Tuning

```xml
<!-- Network and I/O optimization -->
<property>
  <name>spark.shuffle.io.serverThreads</name>
  <value>8</value>
</property>

<property>
  <name>spark.shuffle.io.clientThreads</name>
  <value>8</value>
</property>

<property>
  <name>spark.shuffle.service.index.cache.size</name>
  <value>2048m</value>
</property>
```

## Deployment

### Installation Steps

1. **Copy Shuffle Service JAR**:
   ```bash
   cp spark-*-yarn-shuffle.jar $YARN_HOME/share/hadoop/yarn/lib/
   ```

2. **Configure YARN NodeManager**:
   ```xml
   <!-- Add to yarn-site.xml on all NodeManager nodes -->
   <property>
     <name>yarn.nodemanager.aux-services</name>
     <value>mapreduce_shuffle,spark_shuffle</value>
   </property>
   ```

3. **Restart NodeManagers**:
   ```bash
   $YARN_HOME/sbin/yarn-daemon.sh stop nodemanager
   $YARN_HOME/sbin/yarn-daemon.sh start nodemanager
   # On Hadoop 3.x, yarn-daemon.sh is deprecated; use:
   # yarn --daemon stop nodemanager && yarn --daemon start nodemanager
   ```

### Verification

```bash
# Check NodeManager logs for shuffle service initialization
grep "YarnShuffleService" $YARN_LOG_DIR/yarn-*-nodemanager-*.log

# Verify service is registered
yarn node -list -all | grep -A5 "Auxiliary Services"
```

## Monitoring and Troubleshooting

### Common Issues

**Service Not Starting:**
```
ERROR: Failed to initialize YarnShuffleService
```
- Check the classpath configuration in yarn-site.xml
- Verify JAR file permissions and location
- Review NodeManager logs for initialization errors

**Authentication Failures:**
```
ERROR: SASL authentication failed for shuffle service
```
- Verify Kerberos configuration consistency
- Check principal and keytab configurations
- Ensure clocks are synchronized across the cluster

**Port Conflicts:**
```
ERROR: Failed to bind shuffle service to port 7337
```
- Check for port conflicts with other services
- Verify firewall rules allow the shuffle service port
- Consider changing the default port if needed
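A quick way to distinguish a genuine port conflict from a firewall problem is to try binding the port locally on the NodeManager host; a minimal sketch (7337 is the default shuffle port):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {
    // Returns true if the port can be bound right now (i.e. it is free).
    static boolean isFree(int port) {
        try (ServerSocket s = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false; // already in use, or binding not permitted
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 7337;
        System.out.println("port " + port + " free: " + isFree(port));
    }
}
```

If the port is reported busy, another process holds it; if it binds locally but executors still cannot connect, look at firewall rules instead.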
### Metrics and Monitoring

The shuffle service exposes metrics through JMX:

```text
// Key metrics available via JMX
"spark.shuffle.service:type=ExternalShuffleBlockHandler"
- OpenBlockRequestCount
- RegisterExecutorRequestCount
- RemoveBlocksRequestCount
- TotalBlockTransferTime
```
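Metrics like these can be read programmatically through the standard JMX API. The sketch below queries a built-in platform MBean to show the pattern; to read the shuffle service's metrics you would substitute the `ObjectName` listed above (an assumption of this section) and attach to the NodeManager JVM rather than the local one.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxProbe {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Standard platform MBean, used here only to demonstrate the query pattern.
        ObjectName runtime = new ObjectName("java.lang:type=Runtime");
        long uptime = (Long) server.getAttribute(runtime, "Uptime");
        System.out.println("uptime non-negative: " + (uptime >= 0));
    }
}
```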
### Log Configuration

```properties
# log4j.properties for shuffle service logging
log4j.logger.org.apache.spark.network.yarn=INFO
log4j.logger.org.apache.spark.network.shuffle=DEBUG
log4j.logger.org.apache.spark.network.server=WARN
```

## Error Handling

### Exception Types

**`ServiceStateException`**
- Thrown on invalid service lifecycle transitions (e.g. starting an already-stopped service)
- Check the YARN NodeManager service state

**`IOException`**
- Network or disk I/O errors during shuffle operations
- Check disk space and network connectivity

**`SecurityException`**
- Authentication or authorization failures
- Verify security configuration and credentials

### Recovery Procedures

**Shuffle Data Corruption:**
1. Stop the affected NodeManager
2. Clear the shuffle service recovery database under the recovery path
3. Restart the NodeManager with a clean state
4. Resubmit applications if necessary

**Performance Degradation:**
1. Monitor shuffle service metrics
2. Check disk and network I/O patterns
3. Adjust thread pool and cache configurations
4. Consider increasing the NodeManager memory allocation
## Best Practices

### Resource Planning

- Allocate sufficient memory for shuffle service caches
- Consider disk I/O patterns for shuffle data storage
- Plan network bandwidth for shuffle traffic

### Security

- Enable SASL authentication in secure clusters
- Use dedicated service principals for the shuffle service
- Implement proper access controls for shuffle data

### Monitoring

- Set up alerts for shuffle service health metrics
- Monitor disk usage in shuffle data directories
- Track application shuffle performance metrics