External shuffle service for Apache Spark that enables shuffle operations outside of executor processes
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle@2.2.00
# Spark Network Shuffle

Spark Network Shuffle provides an external shuffle service for Apache Spark that enables shuffle operations to be performed outside of executor processes. This improves resource utilization and fault tolerance by allowing executors to store and retrieve shuffle data through a dedicated service.

## Package Information

- **Package Name**: spark-network-shuffle_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add Maven dependency: `org.apache.spark:spark-network-shuffle_2.11:2.2.3`

## Core Imports

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
```

## Basic Usage

```java
import java.util.Collections;

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.buffer.ManagedBuffer;

// Create and initialize the client.
// TransportConf takes a module name and a ConfigProvider; an empty
// MapConfigProvider (assumed to be available in this version) yields defaults.
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(Collections.emptyMap()));
// No secret key holder is needed because authentication is disabled.
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init("myApp");

// Register the executor with the shuffle service
// (throws IOException or InterruptedException on failure).
String[] localDirs = {"/tmp/spark-shuffle"};
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
client.registerWithShuffleServer("localhost", 7337, "executor-1", info);

// Fetch blocks asynchronously; results arrive through the listener callbacks.
String[] blockIds = {"shuffle_0_1_0", "shuffle_0_2_0"};
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Fetched block: " + blockId);
        // Process block data
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
    }
};

// Passing null for the TempShuffleFileManager, as in the original example.
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

// fetchBlocks returns immediately, so close the client only after the
// listener has reported every block.
client.close();
```
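
Because `fetchBlocks` reports results through callbacks, the caller needs some way to know when it is safe to call `close()`. The following is a minimal sketch of one way to do that with a `CountDownLatch`, assuming each requested block ID produces exactly one success or failure callback. `FetchTrackerSketch` and its methods are hypothetical names used for illustration and are not part of the library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: counts down once per block callback.
final class FetchTrackerSketch {
    private final CountDownLatch remaining;
    private final Map<String, Throwable> failures = new ConcurrentHashMap<>();

    FetchTrackerSketch(int blockCount) {
        this.remaining = new CountDownLatch(blockCount);
    }

    // Call from BlockFetchingListener.onBlockFetchSuccess.
    void succeeded(String blockId) {
        remaining.countDown();
    }

    // Call from BlockFetchingListener.onBlockFetchFailure.
    void failed(String blockId, Throwable cause) {
        failures.put(blockId, cause);
        remaining.countDown();
    }

    // Returns true if every block was reported before the timeout elapsed.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    // Block IDs that failed, mapped to the reported cause.
    Map<String, Throwable> failures() {
        return failures;
    }
}
```

In the Basic Usage example, the listener methods would delegate to `succeeded` and `failed`, and `client.close()` would run only after `await` returns.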

## Architecture

Spark Network Shuffle is built around several key components:

- **Shuffle Client**: Client-side interface for fetching shuffle blocks from external services
- **Block Handler**: Server-side RPC handler that processes shuffle requests and manages block access
- **Block Resolver**: Manages executor metadata and resolves shuffle block locations on disk
- **Protocol Messages**: Structured messages for client-server communication using Netty encoding
- **SASL Security**: Authentication system for secure shuffle operations
- **Mesos Integration**: Specialized components for Mesos cluster manager integration

## Capabilities

### Shuffle Client Operations

Core client functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.

```java { .api }
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);
    public void registerWithShuffleServer(String host, int port, String execId, ExecutorShuffleInfo executorInfo)
        throws IOException, InterruptedException;
}
```

[Shuffle Client](./client.md)

### Shuffle Service Handler

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    public MetricSet getAllMetrics();
}
```

[Service Handler](./handler.md)

### Block Resolution

Block resolver functionality for managing executor metadata, locating shuffle files on disk, and providing shuffle block data access.

```java { .api }
public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(String appId, String execId, String blockId);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}
```

[Block Resolution](./resolver.md)
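
The block IDs used in Basic Usage (`shuffle_0_1_0`, `shuffle_0_2_0`) follow the pattern `shuffle_<shuffleId>_<mapId>_<reduceId>`. The sketch below shows one way to parse and validate that pattern before asking a resolver for block data. It is a standalone illustration, not the resolver's own code; `ShuffleBlockIdSketch` and `parse` are hypothetical names.

```java
// Hypothetical helper for splitting a shuffle block ID into its three numeric parts.
final class ShuffleBlockIdSketch {

    // Returns {shuffleId, mapId, reduceId}; throws IllegalArgumentException if malformed.
    static int[] parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException)
        // when a component is not a number.
        return new int[] {
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3])
        };
    }

    public static void main(String[] args) {
        int[] ids = parse("shuffle_0_1_0");
        System.out.println("shuffleId=" + ids[0] + " mapId=" + ids[1] + " reduceId=" + ids[2]);
    }
}
```

Rejecting malformed IDs up front matches the `IllegalArgumentException` entry listed under Exception Types for invalid block IDs.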

### Protocol Messages

Structured protocol messages for client-server communication, including executor registration, block requests, and streaming handles.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
    protected abstract Type type();
    public ByteBuffer toByteBuffer();

    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;
}
```

[Protocol Messages](./protocol.md)
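
The `localDirs` and `subDirsPerLocalDir` fields of `ExecutorShuffleInfo` tell the service where an executor wrote its shuffle files. The sketch below illustrates one plausible mapping from a file name to a path: hash the name, pick a local directory by modulo, then pick a two-digit hexadecimal subdirectory. This scheme is an assumption made for illustration and should be checked against the resolver source for your Spark version; `ShuffleFileLocatorSketch`, `locate`, and the example file name are hypothetical.

```java
import java.io.File;

// Hypothetical sketch of hashing a shuffle file name onto localDirs and subdirectories.
final class ShuffleFileLocatorSketch {

    // Non-negative hash; Integer.MIN_VALUE is special-cased because Math.abs cannot negate it.
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    // Assumed layout: <localDir>/<two-digit hex subdirectory>/<filename>.
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    public static void main(String[] args) {
        String[] localDirs = {"/tmp/spark-shuffle"};
        // The file name here is illustrative only.
        System.out.println(locate(localDirs, 64, "shuffle_0_1_0.data"));
    }
}
```

Under this assumed scheme, the same file name always resolves to the same path, which is why the values registered by the executor must match the ones it used when writing.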

### Security and Authentication

SASL-based authentication system for secure shuffle operations, managing application secrets and authenticating client connections.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
    public ShuffleSecretManager();
    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSecretKey(String appId);
}
```

[Security](./security.md)

### Mesos Integration

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms.

```java { .api }
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled);
    public void registerDriverWithShuffleService(String host, int port,
        long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;
}
```

[Mesos Integration](./mesos.md)

## Types

### Core Types

```java { .api }
public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public interface TempShuffleFileManager {
    File createTempShuffleFile();
    boolean registerTempShuffleFileToClean(File file);
}

public class ShuffleIndexRecord {
    public ShuffleIndexRecord(long offset, long length);
    public long getOffset();
    public long getLength();
}
```
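
A `ShuffleIndexRecord` pairs an offset with a length, which is enough to locate one reducer's slice inside a shuffle data file. The sketch below shows how such a pair could be derived, assuming the index is stored as consecutive 8-byte big-endian cumulative offsets with one more entry than there are partitions. That layout is an assumption for illustration, and `ShuffleIndexSketch` with its methods is hypothetical rather than part of the library.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of deriving (offset, length) pairs from cumulative offsets.
final class ShuffleIndexSketch {

    // Encodes cumulative offsets as consecutive 8-byte big-endian longs.
    static byte[] encode(long[] offsets) {
        ByteBuffer buf = ByteBuffer.allocate(offsets.length * Long.BYTES);
        for (long offset : offsets) {
            buf.putLong(offset);
        }
        return buf.array();
    }

    // Returns {offset, length} for the given reduce partition.
    static long[] lookup(byte[] index, int reduceId) {
        ByteBuffer buf = ByteBuffer.wrap(index); // ByteBuffer is big-endian by default
        long start = buf.getLong(reduceId * Long.BYTES);
        long end = buf.getLong((reduceId + 1) * Long.BYTES);
        return new long[] {start, end - start};
    }

    public static void main(String[] args) {
        // Three partitions of 100, 150, and 0 bytes.
        byte[] index = encode(new long[] {0L, 100L, 250L, 250L});
        long[] record = lookup(index, 1);
        System.out.println("offset=" + record[0] + " length=" + record[1]);
    }
}
```

Storing cumulative offsets means an empty partition simply repeats the previous offset and yields a length of zero.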

### Exception Types

```java { .api }
// Common exceptions thrown by shuffle operations
java.io.IOException                      // File I/O operations, network connectivity
java.lang.InterruptedException           // Blocking operations that can be interrupted
java.lang.SecurityException              // Authentication failures
java.lang.IllegalArgumentException       // Invalid block IDs or parameters
java.lang.UnsupportedOperationException  // Unsupported message types
java.lang.RuntimeException               // Executor not registered or runtime errors
```