YARN Shuffle Service for Apache Spark that provides external shuffle service functionality running as a long-running auxiliary service in the NodeManager process
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn-2-12@3.0.1
# Spark Network YARN
Spark Network YARN provides the YARN Shuffle Service for Apache Spark, enabling external shuffle management in YARN-managed clusters. The service runs as a long-running auxiliary service within the NodeManager process and lets Spark applications offload shuffle data storage and retrieval, improving performance and reliability by preserving shuffle data even when executors fail or are deallocated.
## Package Information

- **Package Name**: spark-network-yarn_2.12
- **Package Type**: Maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.spark:spark-network-yarn_2.12:3.0.1`
- **Installation**: Include as a dependency in a Maven or Gradle project

## Core Imports

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;
```

## Basic Usage

```java
// Configure and start the YARN Shuffle Service (normally done by the YARN NodeManager)
YarnShuffleService shuffleService = new YarnShuffleService();

// Initialize with a Hadoop configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.authenticate", false);
conf.setInt("spark.shuffle.service.port", 7337);

// init() is the public entry point inherited from Hadoop's AbstractService;
// it invokes the protected serviceInit(conf) hook.
shuffleService.init(conf);

// The service then handles the application lifecycle automatically
// through YARN callbacks: initializeApplication, stopApplication, etc.
```

## Architecture

The Spark Network YARN module consists of three main components:

- **YarnShuffleService**: Main service implementation that extends Hadoop's AuxiliaryService, managing the shuffle server lifecycle and application registration
- **YarnShuffleServiceMetrics**: Metrics forwarding system that integrates shuffle service metrics with Hadoop's metrics framework
- **HadoopConfigProvider**: Configuration adapter that bridges a Hadoop Configuration to Spark's network configuration system

The service operates as an auxiliary service within YARN's NodeManager process, starting and stopping automatically with the NodeManager and handling Spark application lifecycle events.

## Capabilities
### YARN Shuffle Service

Core external shuffle service implementation for YARN clusters.

```java { .api }
public class YarnShuffleService extends AuxiliaryService {
    // Constructors
    public YarnShuffleService();

    // Lifecycle methods
    public void initializeApplication(ApplicationInitializationContext context);
    public void stopApplication(ApplicationTerminationContext context);
    public void initializeContainer(ContainerInitializationContext context);
    public void stopContainer(ContainerTerminationContext context);
    public ByteBuffer getMetaData();
    public void setRecoveryPath(Path recoveryPath);

    // Protected service lifecycle methods
    protected void serviceInit(Configuration conf) throws Exception;
    protected void serviceStop();
    protected Path getRecoveryPath(String fileName);
    protected File initRecoveryDb(String dbName);
}
```

**Key Configuration Properties:**

- `spark.shuffle.service.port` (default: 7337) - Port on which the shuffle server listens
- `spark.authenticate` (default: false) - Enable SASL authentication
- `spark.yarn.shuffle.stopOnFailure` (default: false) - Stop the NodeManager if the shuffle service fails to initialize

### Application ID Encoding

Utility class for encoding application IDs in the shuffle service context.

```java { .api }
// Nested inside YarnShuffleService
public static class AppId {
    public final String appId;

    // Constructor
    public AppId(String appId);

    // Standard object methods
    public boolean equals(Object o);
    public int hashCode();
    public String toString();
}
```

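The wrapper follows the standard value-class contract: equality, hashing, and printing are all derived from the wrapped `appId` string. A minimal, self-contained sketch of that pattern (the class below is a stand-in for illustration, not the actual Spark type):

```java
import java.util.Objects;

// Stand-in for YarnShuffleService.AppId, showing the value-class contract:
// equals/hashCode/toString are based solely on the wrapped appId string.
public class AppIdSketch {
    public final String appId;

    public AppIdSketch(String appId) {
        this.appId = appId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AppIdSketch other = (AppIdSketch) o;
        return Objects.equals(appId, other.appId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(appId);
    }

    @Override
    public String toString() {
        return "AppId[" + appId + "]";
    }

    public static void main(String[] args) {
        AppIdSketch a = new AppIdSketch("application_1690000000000_0001");
        AppIdSketch b = new AppIdSketch("application_1690000000000_0001");
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

Because equality is purely value-based, two `AppId` instances created independently for the same application compare equal, which is what lets the service use them as map keys across registration and termination callbacks.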
### Metrics Integration

Forwards shuffle service metrics to Hadoop's metrics system for monitoring and observability.

```java { .api }
class YarnShuffleServiceMetrics implements MetricsSource {
    // Constructor
    YarnShuffleServiceMetrics(MetricSet metricSet);

    // MetricsSource implementation
    public void getMetrics(MetricsCollector collector, boolean all);

    // Static utility methods
    public static void collectMetric(
        MetricsRecordBuilder metricsRecordBuilder,
        String name,
        Metric metric);
}
```

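To make the forwarding idea concrete, here is a minimal, self-contained sketch of how `collectMetric`-style dispatch can work: each metric is inspected by runtime type and emitted as the matching record entry. All interfaces and the `RecordBuilder` class below are simplified stand-ins for the real Codahale and Hadoop types, not the actual APIs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for the Codahale metric types used by the shuffle service.
interface Metric {}
interface Counter extends Metric { long getCount(); }
interface Gauge<T> extends Metric { T getValue(); }

// Stand-in for a Hadoop-style record builder that accumulates metric values.
class RecordBuilder {
    final Map<String, Number> values = new LinkedHashMap<>();
    RecordBuilder addCounter(String name, long value) { values.put(name, value); return this; }
    RecordBuilder addGauge(String name, Number value) { values.put(name, value); return this; }
}

public class MetricsForwardingSketch {
    // Mirrors the shape of YarnShuffleServiceMetrics.collectMetric: inspect the
    // runtime type of each metric and emit the matching record entry.
    static void collectMetric(RecordBuilder builder, String name, Metric metric) {
        if (metric instanceof Counter) {
            builder.addCounter(name, ((Counter) metric).getCount());
        } else if (metric instanceof Gauge) {
            Object value = ((Gauge<?>) metric).getValue();
            if (value instanceof Number) {
                builder.addGauge(name, (Number) value);
            }
        }
        // Timers and Meters would additionally emit their rate gauges here.
    }

    public static void main(String[] args) {
        RecordBuilder builder = new RecordBuilder();
        collectMetric(builder, "openBlockRequests", (Counter) () -> 42L);
        collectMetric(builder, "registeredConnections", (Gauge<Integer>) () -> 7);
        System.out.println(builder.values); // {openBlockRequests=42, registeredConnections=7}
    }
}
```

The type-dispatch design means new metric kinds can be added to the shuffle server without changing the Hadoop-facing interface; only the dispatch logic grows.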
### Configuration Provider

Hadoop Configuration adapter for Spark's network configuration system.

```java { .api }
public class HadoopConfigProvider extends ConfigProvider {
    // Constructor
    public HadoopConfigProvider(Configuration conf);

    // ConfigProvider implementation
    public String get(String name);
    public String get(String name, String defaultValue);
    public Iterable<Map.Entry<String, String>> getAll();
}
```

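The adapter pattern here is simple: wrap a key-value configuration source behind Spark's `ConfigProvider` contract, throwing `NoSuchElementException` for missing required keys (as noted under Error Handling below) and falling back for the defaulted variant. A self-contained sketch of the same pattern, backed by a plain `Map` instead of a Hadoop `Configuration` (both classes below are illustrative stand-ins, not Spark's actual types):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Stand-in for Spark's ConfigProvider contract.
abstract class ConfigProviderSketch {
    abstract String get(String name);

    // Shared default-value logic: fall back when the key is missing.
    String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }
}

// Mirrors the HadoopConfigProvider shape, but backed by a plain Map so the
// sketch stays self-contained.
public class MapConfigProvider extends ConfigProviderSketch {
    private final Map<String, String> conf;

    public MapConfigProvider(Map<String, String> conf) {
        this.conf = conf;
    }

    @Override
    String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            // HadoopConfigProvider likewise signals a missing required key.
            throw new NoSuchElementException(name);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("spark.shuffle.service.port", "7337");
        MapConfigProvider provider = new MapConfigProvider(settings);
        System.out.println(provider.get("spark.shuffle.service.port"));  // 7337
        System.out.println(provider.get("spark.authenticate", "false")); // false
    }
}
```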
## Types

```java { .api }
// From Hadoop YARN APIs
interface ApplicationInitializationContext {
    ApplicationId getApplicationId();
    ByteBuffer getApplicationDataForService();
}

interface ApplicationTerminationContext {
    ApplicationId getApplicationId();
}

interface ContainerInitializationContext {
    ContainerId getContainerId();
}

interface ContainerTerminationContext {
    ContainerId getContainerId();
}

// From Hadoop Configuration
class Configuration implements Iterable<Map.Entry<String, String>> {
    boolean getBoolean(String name, boolean defaultValue);
    int getInt(String name, int defaultValue);
    String get(String name);
    String[] getTrimmedStrings(String name);
}

// From Hadoop Metrics
interface MetricsSource {
    void getMetrics(MetricsCollector collector, boolean all);
}

interface MetricsCollector {
    MetricsRecordBuilder addRecord(String name);
}

interface MetricsRecordBuilder {
    MetricsRecordBuilder addCounter(MetricsInfo info, long value);
    MetricsRecordBuilder addGauge(MetricsInfo info, Number value);
}

// From Codahale Metrics
interface MetricSet {
    Map<String, Metric> getMetrics();
}

interface Metric {
    // Base interface for all metrics
}

interface Timer extends Metric {
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getOneMinuteRate();
    double getMeanRate();
}

interface Meter extends Metric {
    long getCount();
    double getFifteenMinuteRate();
    double getFiveMinuteRate();
    double getOneMinuteRate();
    double getMeanRate();
}

interface Counter extends Metric {
    long getCount();
}

interface Gauge<T> extends Metric {
    T getValue();
}
```

## Error Handling

The service includes robust error handling for common scenarios:

- **Configuration errors**: Missing or invalid configuration values are handled with appropriate defaults
- **Authentication failures**: When authentication is enabled, unauthorized requests are rejected
- **Recovery failures**: Database corruption or missing recovery files are handled gracefully
- **Network errors**: Port binding failures and network issues are logged and can optionally stop the NodeManager
- **Application lifecycle errors**: Errors during application initialization or termination are logged but don't stop the service

Common exceptions:

- `NoSuchElementException` - Thrown by HadoopConfigProvider when a required configuration key is missing
- `IOException` - Raised by I/O operations during service initialization, recovery, and database operations
- `Exception` - General service lifecycle exceptions that are caught and logged

## Integration Notes

- **YARN Integration**: Automatically registered and managed by the YARN NodeManager as an auxiliary service
- **Spark Integration**: Spark applications connect by setting `spark.shuffle.service.enabled=true`
- **Authentication**: Optional SASL authentication using shared secrets prevents cross-application data access
- **Metrics Integration**: Automatically registers with Hadoop's DefaultMetricsSystem for JMX export
- **Recovery Support**: Supports NodeManager recovery by persisting application state and secrets to LevelDB
- **Shaded Dependencies**: Uses shaded Netty and Jackson dependencies to avoid classpath conflicts with YARN
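
Registering the auxiliary service follows Hadoop's standard `yarn-site.xml` convention, as described in Spark's "Running on YARN" documentation. A typical configuration (using `spark_shuffle` as the service name, with the shuffle jar already on the NodeManager classpath) looks roughly like:

```xml
<!-- yarn-site.xml: register the Spark shuffle service as a NodeManager auxiliary service -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

After restarting the NodeManagers, Spark applications opt in with `spark.shuffle.service.enabled=true` and connect on the configured shuffle port.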