External shuffle service client for Apache Spark that enables reading shuffle blocks from external servers instead of executors
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-10@1.6.00
# Apache Spark Network Shuffle
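
Apache Spark Network Shuffle provides the client and server components of Spark's external shuffle service. The service lets executors read shuffle blocks from a long-running shuffle server on each node instead of directly from the executors that wrote them. Because the server keeps serving shuffle files after the executor that produced them exits or is lost, downstream stages can still fetch that map output without recomputing it, as long as the node and its shuffle server stay up. This makes Spark applications more resilient and lets dynamic allocation remove idle executors safely.
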
## Package Information

- **Package Name**: spark-network-shuffle_2.10
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-shuffle_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
```

For server-side components:

```java
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
```

## Basic Usage

This example assumes an external shuffle service is already listening on `localhost:7337` (7337 is the default `spark.shuffle.service.port`) and that the local directory and block IDs refer to real shuffle output.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleFetchExample {
  public static void main(String[] args) throws Exception {
    // Minimal configuration: answer the two retry settings and throw
    // NoSuchElementException for every other key, so TransportConf uses its defaults
    ConfigProvider configProvider = new ConfigProvider() {
      @Override
      public String get(String name) {
        if ("spark.shuffle.io.maxRetries".equals(name)) return "3";
        if ("spark.shuffle.io.retryWait".equals(name)) return "5s";
        throw new NoSuchElementException(name);
      }
    };

    // With module "shuffle", TransportConf reads keys of the form spark.shuffle.io.*
    TransportConf conf = new TransportConf("shuffle", configProvider);

    // SASL disabled for simplicity, so no SecretKeyHolder is needed
    ExternalShuffleClient client = new ExternalShuffleClient(
        conf,
        null,   // secretKeyHolder: not used when SASL is disabled
        false,  // saslEnabled
        false); // saslEncryptionEnabled

    try {
      // Must be called before registering or fetching
      client.init("my-spark-app");

      // Tell the shuffle server where this executor writes its shuffle files
      ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
          new String[] {"/tmp/spark-shuffle"},  // executor's local directories
          64,                                   // sub-directories per local directory
          "org.apache.spark.shuffle.sort.SortShuffleManager");
      client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

      String[] blockIds = {"shuffle_1_2_0", "shuffle_1_2_1"};
      // fetchBlocks returns immediately; count down once per block, on success or failure
      final CountDownLatch pending = new CountDownLatch(blockIds.length);

      BlockFetchingListener listener = new BlockFetchingListener() {
        @Override
        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
          // The buffer is released after this callback returns; call data.retain()
          // (and later data.release()) to use it outside the callback
          System.out.println("Fetched block " + blockId + " (" + data.size() + " bytes)");
          pending.countDown();
        }

        @Override
        public void onBlockFetchFailure(String blockId, Throwable exception) {
          System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
          pending.countDown();
        }
      };

      client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener);

      // Wait for every callback before closing the client's connections
      if (!pending.await(60, TimeUnit.SECONDS)) {
        System.err.println("Timed out waiting for shuffle blocks");
      }
    } finally {
      client.close();
    }
  }
}
```

## Architecture

The library is built around these components:

- **Client components**: `ExternalShuffleClient` (and its Mesos subclass `MesosExternalShuffleClient`) register executors with a shuffle service and fetch shuffle blocks from it.
- **Server components**: `ExternalShuffleBlockHandler` handles incoming shuffle RPCs, and `ExternalShuffleBlockResolver` tracks registered executors and maps block IDs to files on local disk.
- **Protocol layer**: `BlockTransferMessage` subclasses define the messages exchanged between clients and servers.
- **Security layer**: `ShuffleSecretManager` stores the per-application secrets used for SASL authentication of shuffle clients.
- **Retry mechanisms**: `OneForOneBlockFetcher` sends an `OpenBlocks` request and then fetches each block as one chunk of the returned stream; `RetryingBlockFetcher` wraps it and, on I/O failures, re-requests only the blocks still outstanding, up to `spark.shuffle.io.maxRetries` retries spaced `spark.shuffle.io.retryWait` apart.

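
The retry behavior attributed to `RetryingBlockFetcher` can be modeled in a few lines. The sketch below is a simplified, synchronous illustration and not Spark's implementation: `RetryModel`, `BlockFetcher` and `fetchWithRetries` are hypothetical names, `maxRetries` and `retryWaitMs` stand in for `spark.shuffle.io.maxRetries` and `spark.shuffle.io.retryWait`, and the real class works asynchronously through listener callbacks and schedules retries on a separate thread. In this model only an `IOException` triggers a retry.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

final class RetryModel {
  /** Hypothetical fetcher: reports each delivered block to onSuccess and may throw part-way. */
  interface BlockFetcher {
    void fetch(List<String> blockIds, Consumer<String> onSuccess) throws IOException;
  }

  /**
   * Fetches blockIds; after an IOException, waits retryWaitMs and re-requests only the
   * blocks that are still outstanding, giving up after maxRetries retries.
   */
  static Set<String> fetchWithRetries(BlockFetcher fetcher, List<String> blockIds,
                                      int maxRetries, long retryWaitMs) throws IOException {
    Set<String> outstanding = new LinkedHashSet<>(blockIds);
    Set<String> fetched = new LinkedHashSet<>();
    int retryCount = 0;
    while (true) {
      try {
        // Request a snapshot of what is still missing; successes move blocks across
        fetcher.fetch(new ArrayList<>(outstanding), blockId -> {
          outstanding.remove(blockId);
          fetched.add(blockId);
        });
        return fetched;
      } catch (IOException e) {
        if (retryCount >= maxRetries || outstanding.isEmpty()) {
          throw e;  // out of retries: surface the last failure
        }
        retryCount++;
        try {
          Thread.sleep(retryWaitMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting to retry", ie);
        }
      }
    }
  }
}
```

Re-requesting only the outstanding blocks avoids downloading data twice when a connection drops part-way through a batch.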
## Capabilities

### Shuffle Client Operations

`ExternalShuffleClient` is the primary client for fetching shuffle blocks from an external shuffle service; `MesosExternalShuffleClient` extends it for Mesos deployments. SASL authentication is optional and is switched on through the constructor flags. Call `init` before any other method. `fetchBlocks` is asynchronous: each block's result is delivered to the `BlockFetchingListener`, possibly on a network thread.

```java { .api }
public abstract class ShuffleClient implements Closeable {
  public void init(String appId);
  public abstract void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener);
}

public class ExternalShuffleClient extends ShuffleClient {
  public ExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean saslEnabled,
      boolean saslEncryptionEnabled);

  public void registerWithShuffleServer(
      String host,
      int port,
      String execId,
      ExecutorShuffleInfo executorInfo) throws IOException;

  public void close();
}
```

[Shuffle Client](./shuffle-client.md)
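
Shuffle block IDs passed to `fetchBlocks` follow Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>` naming, where `mapId` identifies the map task that wrote the data and `reduceId` the reduce partition that reads it, so a reducer asks each executor for one block per map task whose output lives there. The helper below is a hypothetical sketch (`BlockIdHelper` is not part of this library) for building and sanity-checking those IDs before calling `fetchBlocks`; it deliberately requires exactly four `_`-separated parts.

```java
final class BlockIdHelper {
  /** Parsed form of "shuffle_<shuffleId>_<mapId>_<reduceId>". */
  static final class ShuffleBlock {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    ShuffleBlock(int shuffleId, int mapId, int reduceId) {
      this.shuffleId = shuffleId;
      this.mapId = mapId;
      this.reduceId = reduceId;
    }
  }

  /** Formats the ID of the block that map task mapId wrote for reduce partition reduceId. */
  static String format(int shuffleId, int mapId, int reduceId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }

  /** Parses a shuffle block ID, rejecting anything that is not exactly four '_'-separated parts. */
  static ShuffleBlock parse(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !"shuffle".equals(parts[0])) {
      throw new IllegalArgumentException("Not a shuffle block id: " + blockId);
    }
    try {
      return new ShuffleBlock(
          Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Not a shuffle block id: " + blockId, e);
    }
  }

  /** The blocks one reduce partition needs from the given map tasks, ready for fetchBlocks. */
  static String[] blockIdsForReducer(int shuffleId, int[] mapIds, int reduceId) {
    String[] ids = new String[mapIds.length];
    for (int i = 0; i < mapIds.length; i++) {
      ids[i] = format(shuffleId, mapIds[i], reduceId);
    }
    return ids;
  }
}
```

For example, `client.fetchBlocks(host, port, execId, BlockIdHelper.blockIdsForReducer(1, new int[] {0, 2}, 3), listener)` would request reduce partition 3's output from map tasks 0 and 2 of shuffle 1.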

### Server-Side Block Management

Server-side components handle shuffle block requests, keep track of registered executors, and resolve block IDs to locations on the local filesystem. The `registeredExecutorFile` argument names the file used to persist executor registrations across shuffle-service restarts.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
  public ExternalShuffleBlockHandler(
      TransportConf conf,
      File registeredExecutorFile) throws IOException;

  public void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);
}

public class ExternalShuffleBlockResolver {
  public ExternalShuffleBlockResolver(
      TransportConf conf,
      File registeredExecutorFile) throws IOException;

  public void registerExecutor(
      String appId,
      String execId,
      ExecutorShuffleInfo executorInfo);

  public ManagedBuffer getBlockData(
      String appId,
      String execId,
      String blockId) throws IOException;
}
```

[Server Components](./server-components.md)
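
For sort-based shuffle, each map task writes one data file and one index file, and the resolver serves a block as a byte range of the data file. The self-contained sketch below models that lookup with plain JDK I/O instead of calling this library. It assumes the Spark 1.6 sort-shuffle layout as commonly described: files named `shuffle_<shuffleId>_<mapId>_0.data` and `shuffle_<shuffleId>_<mapId>_0.index`, an index holding one 8-byte offset per reduce partition plus a final end offset, and a file's directory chosen by hashing its name across the executor's `localDirs` and `subDirsPerLocalDir` (the two values registered through `ExecutorShuffleInfo`). `SortShuffleLayout` and its methods are hypothetical, and the hashing is assumed to match Spark's non-negative hash; verify against the resolver's source before relying on it.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class SortShuffleLayout {
  /** Assumed to match Spark's non-negative hash: Math.abs, with MIN_VALUE mapped to 0. */
  static int nonNegativeHash(String s) {
    int h = s.hashCode();
    return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
  }

  /** Picks a local dir and a two-hex-digit sub-directory from the file name's hash. */
  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  /** Returns {offset, length} of reduceId's segment inside the map task's data file. */
  static long[] segmentFor(File indexFile, int reduceId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      long toSkip = reduceId * 8L;  // each index entry is one 8-byte long
      while (toSkip > 0) {
        long skipped = in.skip(toSkip);
        if (skipped <= 0) {
          throw new IOException("Index file too short for reduce partition " + reduceId);
        }
        toSkip -= skipped;
      }
      long offset = in.readLong();
      long nextOffset = in.readLong();  // start of the next partition = end of this one
      return new long[] {offset, nextOffset - offset};
    }
  }
}
```

With two local directories and 64 sub-directories, for example, `getFile(dirs, 64, "shuffle_1_2_0.index")` yields a path of the form `<localDir>/<two hex digits>/shuffle_1_2_0.index`, and `segmentFor(indexFile, reduceId)` gives the offset and length to read from the matching `.data` file.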

### Network Protocol Messages

Protocol messages exchanged between shuffle clients and servers cover executor registration (`RegisterExecutor`), block requests (`OpenBlocks`) and the stream handles returned for them (`StreamHandle`), block uploads (`UploadBlock`), and, on Mesos, driver registration (`RegisterDriver`). Every message is a `BlockTransferMessage`; `Decoder.fromByteBuffer` reads the leading type byte to decide which subclass to decode.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
  public ByteBuffer toByteBuffer();

  public static class Decoder {
    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
  }
}

public class ExecutorShuffleInfo implements Encodable {
  public ExecutorShuffleInfo(
      String[] localDirs,
      int subDirsPerLocalDir,
      String shuffleManager);
}
```

[Protocol Messages](./protocol-messages.md)
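
The payload layout of a `RegisterExecutor` message, which embeds an `ExecutorShuffleInfo`, can be modeled with plain `java.nio.ByteBuffer`. This sketch rests on assumptions about the module's encoders rather than on calling them: the message starts with its `Type` id as one byte, a string is a 4-byte big-endian length followed by UTF-8 bytes, and a string array is a 4-byte count followed by each string. `WireModel` and `Registration` are hypothetical, the transport layer adds its own framing around this payload, and the sketch is meant to illustrate the layout, not to interoperate with a real shuffle service.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WireModel {
  static final byte REGISTER_EXECUTOR = 2;  // id listed for REGISTER_EXECUTOR in the Type enum

  /** Fields of a RegisterExecutor message, including the embedded ExecutorShuffleInfo. */
  static final class Registration {
    final String appId;
    final String execId;
    final String[] localDirs;
    final int subDirsPerLocalDir;
    final String shuffleManager;

    Registration(String appId, String execId, String[] localDirs,
                 int subDirsPerLocalDir, String shuffleManager) {
      this.appId = appId;
      this.execId = execId;
      this.localDirs = localDirs;
      this.subDirsPerLocalDir = subDirsPerLocalDir;
      this.shuffleManager = shuffleManager;
    }
  }

  // A string is encoded as a 4-byte length followed by its UTF-8 bytes
  static int encodedLength(String s) {
    return 4 + s.getBytes(StandardCharsets.UTF_8).length;
  }

  static void putString(ByteBuffer buf, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length);
    buf.put(bytes);
  }

  static String getString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  static ByteBuffer encode(Registration r) {
    // type byte + appId + execId + dir count + dirs + subDirsPerLocalDir + shuffleManager
    int size = 1 + encodedLength(r.appId) + encodedLength(r.execId) + 4;
    for (String dir : r.localDirs) {
      size += encodedLength(dir);
    }
    size += 4 + encodedLength(r.shuffleManager);

    ByteBuffer buf = ByteBuffer.allocate(size);  // ByteBuffer is big-endian by default
    buf.put(REGISTER_EXECUTOR);
    putString(buf, r.appId);
    putString(buf, r.execId);
    buf.putInt(r.localDirs.length);  // string array: count, then each string
    for (String dir : r.localDirs) {
      putString(buf, dir);
    }
    buf.putInt(r.subDirsPerLocalDir);
    putString(buf, r.shuffleManager);
    buf.flip();  // switch from writing to reading
    return buf;
  }

  static Registration decode(ByteBuffer buf) {
    byte type = buf.get();  // the first byte selects the message type
    if (type != REGISTER_EXECUTOR) {
      throw new IllegalArgumentException("Unexpected message type: " + type);
    }
    String appId = getString(buf);
    String execId = getString(buf);
    String[] localDirs = new String[buf.getInt()];
    for (int i = 0; i < localDirs.length; i++) {
      localDirs[i] = getString(buf);
    }
    int subDirsPerLocalDir = buf.getInt();
    String shuffleManager = getString(buf);
    return new Registration(appId, execId, localDirs, subDirsPerLocalDir, shuffleManager);
  }
}
```

Leading with a single type byte keeps dispatch cheap: a decoder can switch on one byte before reading any variable-length fields.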

### Security and Authentication

Shuffle clients can authenticate to an external shuffle service with SASL. On the server, `ShuffleSecretManager` implements `SecretKeyHolder`: an application's shared secret is registered with `registerApp` when the application starts and removed with `unregisterApp` when it finishes, and the SASL handshake looks it up by application ID through `getSecretKey`.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
  public ShuffleSecretManager();

  public void registerApp(String appId, String shuffleSecret);
  public void unregisterApp(String appId);
  public String getSaslUser(String appId);
  public String getSecretKey(String appId);
}
```

[Security](./security.md)
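
Spark's SASL support for the shuffle service (the `SparkSaslClient` and `SparkSaslServer` classes in the common network module) is built on the JDK's `DIGEST-MD5` mechanism: the client proves it knows the application's shared secret, and the server obtains the expected secret from its `SecretKeyHolder`, which is the role `ShuffleSecretManager` plays. The sketch below runs such a handshake in memory with `javax.security.sasl` to show the challenge/response flow. `SaslDigestDemo`, the protocol name and the user name are hypothetical, and Spark's own classes differ in details such as how identifiers are encoded and whether encryption is negotiated.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

final class SaslDigestDemo {
  static final String MECHANISM = "DIGEST-MD5";
  static final String PROTOCOL = "shuffle-demo";  // hypothetical protocol name
  static final String SERVER_NAME = "default";    // also offered to the client as the realm
  static final String USER = "demo-user";         // hypothetical shared user name

  /** Answers the JDK's SASL callbacks with a fixed user name and secret. */
  static CallbackHandler handler(String secret) {
    return callbacks -> {
      for (Callback cb : callbacks) {
        if (cb instanceof NameCallback) {
          ((NameCallback) cb).setName(USER);
        } else if (cb instanceof PasswordCallback) {
          ((PasswordCallback) cb).setPassword(secret.toCharArray());
        } else if (cb instanceof RealmCallback) {
          RealmCallback rc = (RealmCallback) cb;
          rc.setText(rc.getDefaultText());
        } else if (cb instanceof RealmChoiceCallback) {
          ((RealmChoiceCallback) cb).setSelectedIndex(0);
        } else if (cb instanceof AuthorizeCallback) {
          // Server side: allow the client to act only as itself
          AuthorizeCallback ac = (AuthorizeCallback) cb;
          String authzId = ac.getAuthorizationID();
          ac.setAuthorized(authzId == null || authzId.equals(ac.getAuthenticationID()));
        } else {
          throw new UnsupportedCallbackException(cb);
        }
      }
    };
  }

  /** Runs a full handshake in memory; throws SaslException if the secrets differ. */
  static boolean authenticate(String clientSecret, String serverSecret) throws SaslException {
    Map<String, String> props = new HashMap<>();
    props.put(Sasl.QOP, "auth");  // authentication only, no integrity or encryption layer

    SaslServer server = Sasl.createSaslServer(
        MECHANISM, PROTOCOL, SERVER_NAME, props, handler(serverSecret));
    SaslClient client = Sasl.createSaslClient(
        new String[] {MECHANISM}, null, PROTOCOL, SERVER_NAME, props, handler(clientSecret));
    if (server == null || client == null) {
      throw new SaslException(MECHANISM + " is not available in this JVM");
    }
    try {
      // DIGEST-MD5 is server-first: an empty initial response yields the server's challenge
      byte[] challenge = server.evaluateResponse(new byte[0]);
      while (!client.isComplete()) {
        byte[] response = client.evaluateChallenge(challenge);
        if (server.isComplete()) {
          break;  // the client has just checked the server's final rspauth
        }
        challenge = server.evaluateResponse(response);  // fails here on a wrong secret
      }
      return client.isComplete() && server.isComplete();
    } finally {
      client.dispose();
      server.dispose();
    }
  }
}
```

In a real shuffle service the server-side handler would look the secret up per application, for example through `ShuffleSecretManager.getSecretKey(appId)`, instead of receiving a fixed `serverSecret`.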

## Types

```java { .api }
public interface BlockFetchingListener extends EventListener {
  void onBlockFetchSuccess(String blockId, ManagedBuffer data);
  void onBlockFetchFailure(String blockId, Throwable exception);
}

// Nested in BlockTransferMessage; the id is written as the first byte of an encoded message
public enum Type {
  OPEN_BLOCKS(0),
  UPLOAD_BLOCK(1),
  REGISTER_EXECUTOR(2),
  STREAM_HANDLE(3),
  REGISTER_DRIVER(4);
}
```