# Apache Spark Network Shuffle

Apache Spark Network Shuffle is a Java library that provides network-based shuffle functionality for Apache Spark's distributed computing framework. It enables efficient data exchange between Spark executors during shuffle operations through external shuffle services, supporting secure authentication, retry mechanisms, and comprehensive metrics collection.

## Package Information

- **Package Name**: spark-network-shuffle_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Maven dependency `org.apache.spark:spark-network-shuffle_2.11:2.4.8`

## Core Imports

```java
import org.apache.spark.network.shuffle.ShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
```

## Basic Usage

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.buffer.ManagedBuffer;

// Create shuffle secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("myApp", "mySecretKey");

// Create external shuffle client.
// TransportConf needs a module name and a ConfigProvider; an empty provider uses defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 5000);

// Initialize client
client.init("myApp");

// Register executor with shuffle service (throws IOException, InterruptedException).
// The shuffle manager is identified by its fully qualified class name.
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

// Fetch blocks with listener
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId);
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
    }
};

// fetchBlocks is asynchronous: results arrive through the listener callbacks.
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

// Close client when done (after the listener has been called for every block)
client.close();
```

## Architecture

Apache Spark Network Shuffle is built around several key components:

- **Shuffle Clients**: Client-side components for fetching shuffle blocks from external services
- **Shuffle Handlers**: Server-side RPC handlers that serve shuffle blocks to clients
- **Block Resolution**: Components that convert shuffle block IDs to physical file segments
- **Security Management**: SASL-based authentication system for secure shuffle operations
- **Protocol Messages**: Serializable message classes for client-server communication
- **File Management**: Temporary file handling during block transfer operations
- **Retry Mechanisms**: Automatic retry functionality for handling transient failures

## Capabilities

### Shuffle Client Operations

Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.

```java { .api }
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, DownloadFileManager downloadFileManager
    );
    public MetricSet shuffleMetrics();
    public void close();
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    public void registerWithShuffleServer(
        String host, int port, String execId, ExecutorShuffleInfo executorInfo
    ) throws IOException, InterruptedException;
}
```

[Shuffle Client Operations](./shuffle-client.md)

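
Because block fetching reports its results through listener callbacks, callers often need a way to wait until every requested block has either succeeded or failed. The following is a minimal, self-contained sketch of that pattern using a `CountDownLatch`. `FetchListener`, `CollectingListener`, and `CollectingListenerDemo` are hypothetical stand-ins written for this illustration (with `byte[]` in place of `ManagedBuffer`); they are not classes from the library, and the fetch itself is simulated on a plain thread.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in mirroring the shape of BlockFetchingListener.
interface FetchListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Collects callback results and lets the caller block until all blocks are accounted for.
class CollectingListener implements FetchListener {
    final Map<String, byte[]> fetched = new ConcurrentHashMap<>();
    final Map<String, Throwable> failed = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    CollectingListener(int expectedBlocks) {
        this.remaining = new CountDownLatch(expectedBlocks);
    }

    @Override
    public void onBlockFetchSuccess(String blockId, byte[] data) {
        fetched.put(blockId, data);
        remaining.countDown();
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failed.put(blockId, exception);
        remaining.countDown();
    }

    // Returns true if every expected block succeeded or failed before the timeout.
    boolean await(long timeoutMs) {
        try {
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class CollectingListenerDemo {
    public static void main(String[] args) {
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
        CollectingListener listener = new CollectingListener(blockIds.length);

        // Simulated asynchronous fetch (assumption: no real network involved).
        new Thread(() -> {
            listener.onBlockFetchSuccess(blockIds[0], new byte[] {1, 2, 3});
            listener.onBlockFetchFailure(blockIds[1], new IOException("simulated"));
        }).start();

        boolean done = listener.await(5000);
        System.out.println("done=" + done
            + " fetched=" + listener.fetched.keySet()
            + " failed=" + listener.failed.keySet());
    }
}
```

With the real client, the same idea would apply to a `BlockFetchingListener` implementation: wait on the latch before calling `close()`, so the client is not shut down while fetches are still in flight.
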
### Shuffle Server Components

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);
    public void receive(
        TransportClient client, ByteBuffer message, RpcResponseCallback callback
    );
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    public void executorRemoved(String executorId, String appId);
}

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(
        String appId, String execId, int shuffleId, int mapId, int reduceId
    );
}
```

[Shuffle Server Components](./shuffle-server.md)

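
Clients request blocks by string ID (for example `shuffle_1_0_1`), while `getBlockData` takes separate `shuffleId`, `mapId`, and `reduceId` integers. The sketch below shows one way such an ID could be split into those three parts. `ShuffleBlockRef` is a hypothetical helper written for this illustration, and the data/index file names it derives are an assumption about the sort-based shuffle layout rather than something this document specifies.

```java
// Hypothetical value class; not part of the library.
final class ShuffleBlockRef {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    ShuffleBlockRef(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Parses IDs shaped like "shuffle_<shuffleId>_<mapId>_<reduceId>".
    static ShuffleBlockRef parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        return new ShuffleBlockRef(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }

    // Assumed layout: one data file and one index file per map output.
    String dataFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.data";
    }

    String indexFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.index";
    }
}

class ShuffleBlockRefDemo {
    public static void main(String[] args) {
        ShuffleBlockRef ref = ShuffleBlockRef.parse("shuffle_1_0_1");
        System.out.println(ref.shuffleId + " " + ref.mapId + " " + ref.reduceId);
        System.out.println(ref.dataFileName() + " / " + ref.indexFileName());
    }
}
```

Rejecting malformed IDs early keeps later file lookups from operating on partially parsed values.
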
### Authentication and Security

SASL-based authentication system for securing shuffle operations between clients and external shuffle services.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
    private static final String SPARK_SASL_USER = "sparkSaslUser";

    public ShuffleSecretManager();
    public void registerApp(String appId, String shuffleSecret);
    public void registerApp(String appId, ByteBuffer shuffleSecret);
    public void unregisterApp(String appId);
    public String getSaslUser(String appId);
    public String getSecretKey(String appId);
}
```

[Authentication and Security](./authentication.md)

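
To make the shape of the secret-holder API concrete, here is a simplified, self-contained sketch of a per-application secret store with the same method names. `InMemorySecretStore` is a hypothetical stand-in, not the library's implementation; in particular, treating byte secrets as UTF-8 text is an assumption made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified per-application secret store.
class InMemorySecretStore {
    private static final String SASL_USER = "sparkSaslUser";
    private final ConcurrentHashMap<String, String> secrets = new ConcurrentHashMap<>();

    void registerApp(String appId, String secret) {
        secrets.put(appId, secret);
    }

    // Assumption: byte secrets are UTF-8 text. duplicate() leaves the caller's position untouched.
    void registerApp(String appId, ByteBuffer secret) {
        registerApp(appId, StandardCharsets.UTF_8.decode(secret.duplicate()).toString());
    }

    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    // Every application shares the same SASL user name; only the secret differs.
    String getSaslUser(String appId) {
        return SASL_USER;
    }

    // Returns null when the application is unknown or has been unregistered.
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}

class InMemorySecretStoreDemo {
    public static void main(String[] args) {
        InMemorySecretStore store = new InMemorySecretStore();
        store.registerApp("myApp", "mySecretKey");
        System.out.println(store.getSaslUser("myApp") + " -> " + store.getSecretKey("myApp"));
        store.unregisterApp("myApp");
        System.out.println("after unregister: " + store.getSecretKey("myApp"));
    }
}
```

Unregistering an application when it finishes keeps stale secrets from lingering in a long-running service process.
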
### Protocol Messages

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
    public ByteBuffer toByteBuffer();

    public enum Type {
        OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
        REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
    }

    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;

    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
}
```

[Protocol Messages](./protocol.md)

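
The `toByteBuffer` / `Decoder.fromByteBuffer` pair suggests a tagged encoding, where the message type is written first so the decoder knows which message to rebuild. The sketch below illustrates that general idea with a hypothetical mini-protocol. `TaggedMessage`, its byte layout, and the numeric type IDs are all assumptions for illustration; this is not the library's actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical mini-protocol: a type tag followed by a length-prefixed string.
final class TaggedMessage {
    enum Type {
        OPEN_BLOCKS(0), REGISTER_EXECUTOR(2), HEARTBEAT(5); // IDs are illustrative

        final byte id;

        Type(int id) {
            this.id = (byte) id;
        }

        static Type fromId(byte id) {
            for (Type t : values()) {
                if (t.id == id) {
                    return t;
                }
            }
            throw new IllegalArgumentException("Unknown message type: " + id);
        }
    }

    final Type type;
    final String appId;

    TaggedMessage(Type type, String appId) {
        this.type = type;
        this.appId = appId;
    }

    // Layout: [1-byte type tag][4-byte length][UTF-8 bytes of appId]
    ByteBuffer toByteBuffer() {
        byte[] body = appId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + body.length);
        buf.put(type.id).putInt(body.length).put(body);
        buf.flip();
        return buf;
    }

    // Reads the tag first, then decodes the body that belongs to that type.
    static TaggedMessage fromByteBuffer(ByteBuffer msg) {
        Type type = Type.fromId(msg.get());
        byte[] body = new byte[msg.getInt()];
        msg.get(body);
        return new TaggedMessage(type, new String(body, StandardCharsets.UTF_8));
    }
}

class TaggedMessageDemo {
    public static void main(String[] args) {
        ByteBuffer wire = new TaggedMessage(TaggedMessage.Type.REGISTER_EXECUTOR, "myApp").toByteBuffer();
        TaggedMessage decoded = TaggedMessage.fromByteBuffer(wire);
        System.out.println(decoded.type + " " + decoded.appId);
    }
}
```

Putting the tag in a fixed position lets a single decoder entry point dispatch to the right message class without any out-of-band information.
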
### Block Fetching and Retry Logic

Advanced block fetching mechanisms with automatic retry capabilities and listener-based asynchronous handling.

```java { .api }
public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public class OneForOneBlockFetcher {
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf
    );
    public void start();
}

public class RetryingBlockFetcher {
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );
    public void start();

    public interface BlockFetchStarter {
        void createAndStart(String[] blockIds, BlockFetchingListener listener);
    }
}
```

[Block Fetching and Retry Logic](./block-fetching.md)

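
The retry idea can be sketched independently of the library: keep a set of outstanding block IDs, attempt them, and re-attempt only those that failed with an I/O error, up to a retry limit. The following is a simplified synchronous illustration under those assumptions. `BlockFetch` and `SimpleRetryingFetcher` are hypothetical names, and the real fetcher is asynchronous and listener-driven, so this is not how `RetryingBlockFetcher` is implemented.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical synchronous fetch: returns normally on success, throws on failure.
interface BlockFetch {
    void fetch(String blockId) throws IOException;
}

final class SimpleRetryingFetcher {
    private final BlockFetch fetch;
    private final int maxRetries;
    private final long retryWaitMs;

    SimpleRetryingFetcher(BlockFetch fetch, int maxRetries, long retryWaitMs) {
        this.fetch = fetch;
        this.maxRetries = maxRetries;
        this.retryWaitMs = retryWaitMs;
    }

    // Returns the block IDs that still failed after the initial attempt plus maxRetries retries.
    List<String> fetchAll(String[] blockIds) {
        Set<String> outstanding = new LinkedHashSet<>();
        for (String id : blockIds) {
            outstanding.add(id);
        }
        for (int attempt = 0; attempt <= maxRetries && !outstanding.isEmpty(); attempt++) {
            if (attempt > 0) {
                sleepQuietly(retryWaitMs);
            }
            // Iterate over a copy so successful blocks can be removed during the pass.
            for (String id : new ArrayList<>(outstanding)) {
                try {
                    fetch.fetch(id);
                    outstanding.remove(id);
                } catch (IOException e) {
                    // Treated as transient: the block stays outstanding for the next attempt.
                }
            }
        }
        return new ArrayList<>(outstanding);
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SimpleRetryingFetcherDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated fetch that fails twice for one block before succeeding.
        BlockFetch flaky = id -> {
            if (id.equals("shuffle_1_0_1") && calls[0]++ < 2) {
                throw new IOException("simulated transient failure");
            }
        };
        SimpleRetryingFetcher fetcher = new SimpleRetryingFetcher(flaky, 3, 10);
        System.out.println("still failed: "
            + fetcher.fetchAll(new String[] {"shuffle_1_0_0", "shuffle_1_0_1"}));
    }
}
```

Retrying only the outstanding blocks avoids re-downloading data that already arrived, which matters when a single request covers many blocks.
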
### File Management

Temporary file management system for handling downloaded blocks during transfer operations.

```java { .api }
public interface DownloadFile {
    boolean delete();
    DownloadFileWritableChannel openForWriting() throws IOException;
    String path();
}

public interface DownloadFileManager {
    DownloadFile createTempFile(TransportConf transportConf);
    boolean registerTempFileToClean(DownloadFile file);
}

public interface DownloadFileWritableChannel extends WritableByteChannel {
    ManagedBuffer closeAndRead();
}
```

[File Management](./file-management.md)

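
The interfaces above imply a lifecycle: create a temporary file, write incoming chunks to it through a channel, close the channel and read the data back, then delete the file. A minimal sketch of that lifecycle using only `java.nio` follows. `TempDownload` is a hypothetical stand-in (with `byte[]` in place of `ManagedBuffer`), and the temp-file prefix is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for a download file backed by a temp file.
final class TempDownload {
    private final Path file;

    private TempDownload(Path file) {
        this.file = file;
    }

    static TempDownload create() {
        try {
            return new TempDownload(Files.createTempFile("shuffle-fetch-", ".tmp"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String path() {
        return file.toString();
    }

    // Writes the chunks in order, closes the channel, then reads the whole file back.
    byte[] writeAndRead(byte[]... chunks) {
        try {
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                for (byte[] chunk : chunks) {
                    ByteBuffer buf = ByteBuffer.wrap(chunk);
                    while (buf.hasRemaining()) {
                        channel.write(buf); // a single write may be partial
                    }
                }
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true only if the file existed and was removed.
    boolean delete() {
        try {
            return Files.deleteIfExists(file);
        } catch (IOException e) {
            return false;
        }
    }
}

class TempDownloadDemo {
    public static void main(String[] args) {
        TempDownload download = TempDownload.create();
        byte[] data = download.writeAndRead(new byte[] {1, 2}, new byte[] {3});
        System.out.println("read " + data.length + " bytes from " + download.path());
        System.out.println("deleted=" + download.delete());
    }
}
```

Spilling fetched data to a file instead of holding it in memory is useful for large blocks; the trade-off is that the caller must make sure the file is eventually deleted.
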
### Mesos Integration

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.

```java { .api }
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    public void registerDriverWithShuffleService(
        String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs
    ) throws IOException, InterruptedException;
}
```

[Mesos Integration](./mesos.md)

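
The `heartbeatIntervalMs` parameter points to a periodic heartbeat sent after registration. As a rough illustration of that mechanism, the sketch below schedules a heartbeat action at a fixed rate with a `ScheduledExecutorService`. `HeartbeatSender` is a hypothetical class for this example, and the heartbeat action is just a `Runnable`; how the library schedules and sends its own heartbeats is not described in this document.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical periodic heartbeat scheduler.
final class HeartbeatSender implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-sender");
            t.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return t;
        });

    // Runs sendHeartbeat immediately, then once every intervalMs milliseconds.
    void start(Runnable sendHeartbeat, long intervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sendHeartbeat.run();
            } catch (RuntimeException e) {
                // Swallow failures: an uncaught exception would cancel all future runs.
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

class HeartbeatSenderDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeBeats = new CountDownLatch(3);
        try (HeartbeatSender sender = new HeartbeatSender()) {
            sender.start(threeBeats::countDown, 50);
            boolean seen = threeBeats.await(5, TimeUnit.SECONDS);
            System.out.println("three heartbeats observed: " + seen);
        }
    }
}
```

Catching exceptions inside the scheduled task is a deliberate choice here, since `scheduleAtFixedRate` stops rescheduling a task once it throws.
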
## Types

```java { .api }
public class ShuffleIndexRecord {
    public ShuffleIndexRecord(long offset, long length);
    public long getOffset();
    public long getLength();
}

public class ShuffleIndexInformation {
    public ShuffleIndexInformation(File indexFile);
    public int getSize();
    public ShuffleIndexRecord getIndex(int reduceId);
}

public class SimpleDownloadFile implements DownloadFile {
    public SimpleDownloadFile(File file, TransportConf transportConf);
    public boolean delete();
    public DownloadFileWritableChannel openForWriting();
    public String path();
}
```
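
An index record pairs an offset with a length, which fits an index stored as a run of cumulative 8-byte offsets: the segment for partition `reduceId` starts at entry `reduceId` and ends at entry `reduceId + 1`. The sketch below works through that arithmetic in memory. `OffsetIndex` is a hypothetical class, and the cumulative-offset layout is stated here as an assumption rather than taken from this document.

```java
import java.nio.ByteBuffer;
import java.nio.LongBuffer;

// Hypothetical in-memory view of a shuffle index: a run of cumulative 8-byte offsets.
final class OffsetIndex {
    private final LongBuffer offsets;

    OffsetIndex(ByteBuffer indexBytes) {
        this.offsets = indexBytes.asLongBuffer();
    }

    // Builds index bytes from partition lengths:
    // offsets[0] = 0 and offsets[i + 1] = offsets[i] + lengths[i].
    static ByteBuffer fromLengths(long... lengths) {
        ByteBuffer buf = ByteBuffer.allocate((lengths.length + 1) * Long.BYTES);
        long offset = 0;
        buf.putLong(offset);
        for (long length : lengths) {
            offset += length;
            buf.putLong(offset);
        }
        buf.flip();
        return buf;
    }

    // Returns {offset, length} of the data-file segment for one reduce partition.
    long[] segment(int reduceId) {
        long start = offsets.get(reduceId);
        long end = offsets.get(reduceId + 1);
        return new long[] {start, end - start};
    }
}

class OffsetIndexDemo {
    public static void main(String[] args) {
        // Three partitions with lengths 10, 0, and 25 bytes.
        OffsetIndex index = new OffsetIndex(OffsetIndex.fromLengths(10, 0, 25));
        for (int reduceId = 0; reduceId < 3; reduceId++) {
            long[] seg = index.segment(reduceId);
            System.out.println("reduceId=" + reduceId + " offset=" + seg[0] + " length=" + seg[1]);
        }
    }
}
```

Storing one extra trailing offset means every partition's length can be computed by subtraction, including empty partitions, which simply have two equal neighboring offsets.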