External shuffle service client and server for fault-tolerant data shuffling in Apache Spark
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-11@1.6.00
# Spark Network Shuffle

Apache Spark Network Shuffle provides external shuffle service functionality that enables fault-tolerant data shuffling in distributed Spark applications. This library allows Spark executors to read shuffle data from external services rather than directly from other executors, improving fault tolerance by preserving shuffle data even when executors are lost.

## Package Information

- **Package Name**: org.apache.spark:spark-network-shuffle_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-shuffle_2.11</artifactId>
  <version>1.6.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
```

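## Basic Usage

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleFetchExample {
  public static void main(String[] args) throws Exception {
    // Create client configuration for the "shuffle" module (reads spark.shuffle.* system properties)
    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

    // Initialize external shuffle client with SASL disabled, so no SecretKeyHolder is needed
    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
    client.init("my-app-id");

    String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
    // fetchBlocks is asynchronous: count down once per block so we know when to close
    final CountDownLatch remaining = new CountDownLatch(blockIds.length);

    // Set up block fetch listener
    BlockFetchingListener listener = new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId + " (" + data.size() + " bytes)");
        // Process the block data here; call data.retain() if it must outlive this callback
        remaining.countDown();
      }

      @Override
      public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
        exception.printStackTrace();
        remaining.countDown();
      }
    };

    // Fetch shuffle blocks from the external service (7337 is the default shuffle service port).
    // Assumes "executor-1" has already registered with this service via registerWithShuffleServer.
    client.fetchBlocks("shuffle-service-host", 7337, "executor-1", blockIds, listener);

    // Clean up only after every block has succeeded or failed, or a timeout elapses
    remaining.await(60, TimeUnit.SECONDS);
    client.close();
  }
}
```
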
## Architecture

The Spark Network Shuffle module is built around several key components:

- **Client Layer**: `ExternalShuffleClient` and related classes provide the client-side API for fetching shuffle blocks, with retry logic and per-block success and failure callbacks
- **Server Layer**: `ExternalShuffleBlockHandler` and `ExternalShuffleBlockResolver` handle incoming requests and resolve block locations on the server side
- **Protocol Layer**: Structured message classes for client-server communication, serialized to a compact binary format
- **Security Layer**: SASL authentication support for secure communication between clients and shuffle services
- **Fault Tolerance**: Automatic retries of fetches that fail with I/O errors, re-requesting only the blocks still outstanding, so shuffle data stays reachable through the external service even after the executor that wrote it is lost

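The retry mechanism mentioned under Fault Tolerance can be illustrated with a small, self-contained Java sketch. It is loosely modeled on our understanding of this module's `RetryingBlockFetcher`, which is configured through `spark.shuffle.io.maxRetries` and `spark.shuffle.io.retryWait`. In that model, only `IOException` failures trigger a retry, and each retry requests only the blocks that have not yet arrived.

`OutstandingRetry`, `Attempt`, and `fetchWithRetries` are hypothetical names. The sketch also blocks the calling thread, whereas the real fetcher is asynchronous.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch; names are illustrative and not part of spark-network-shuffle.
public class OutstandingRetry {

  // One fetch attempt: reports each block that arrives, and throws IOException
  // if the connection fails partway through.
  public interface Attempt {
    void fetch(List<String> blockIds, Consumer<String> onSuccess) throws IOException;
  }

  // Returns the blocks that never arrived (an empty set means every block succeeded).
  public static Set<String> fetchWithRetries(List<String> blockIds, int maxRetries,
                                             long retryWaitMs, Attempt attempt)
      throws InterruptedException {
    Set<String> outstanding = new LinkedHashSet<>(blockIds);
    int retryCount = 0;
    while (!outstanding.isEmpty()) {
      try {
        // Request only what is still missing; pass a copy so removals during the attempt are safe
        attempt.fetch(new ArrayList<>(outstanding), outstanding::remove);
        break; // no IOException: any block still outstanding is treated as a non-retryable failure
      } catch (IOException e) {
        if (retryCount >= maxRetries) {
          break; // retry budget exhausted
        }
        retryCount++;
        Thread.sleep(retryWaitMs); // fixed wait between attempts
      }
    }
    return outstanding;
  }
}
```

Tracking the outstanding set means blocks that arrived before a connection failure are not fetched twice. With `maxRetries = 3`, a block gets at most four attempts in total.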
## Capabilities

### Client APIs

Client-side functionality for registering executors with external shuffle services and fetching shuffle blocks from them, with retry logic and per-block success and failure callbacks.

```java { .api }
public abstract class ShuffleClient implements Closeable {
  public void init(String appId);
  public abstract void fetchBlocks(String host, int port, String execId,
      String[] blockIds, BlockFetchingListener listener);
}

public class ExternalShuffleClient extends ShuffleClient {
  public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
      boolean saslEnabled, boolean saslEncryptionEnabled);
  public void registerWithShuffleServer(String host, int port, String execId,
      ExecutorShuffleInfo executorInfo) throws IOException;
}
```

[Client APIs](./client-apis.md)

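Each `fetchBlocks` call targets a single `(host, port, execId)`, so a caller typically groups the block IDs it needs by the executor that wrote them. The sketch below does that grouping in plain Java, building IDs in the `shuffle_<shuffleId>_<mapId>_<reduceId>` form used in Basic Usage.

`BlockRequestPlanner`, `planFetches`, and the `mapOutputOwners` input (which executor ran each map task) are hypothetical. In Spark itself this bookkeeping lives in the core module rather than in this library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of spark-network-shuffle.
public class BlockRequestPlanner {

  // Formats a shuffle block ID as shuffle_<shuffleId>_<mapId>_<reduceId>.
  public static String shuffleBlockId(int shuffleId, int mapId, int reduceId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }

  // mapOutputOwners.get(mapId) is the executor that ran that map task (assumed input).
  // Returns executor ID -> block IDs for one reduce partition, preserving map order.
  public static Map<String, String[]> planFetches(int shuffleId, int reduceId,
                                                  List<String> mapOutputOwners) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (int mapId = 0; mapId < mapOutputOwners.size(); mapId++) {
      grouped.computeIfAbsent(mapOutputOwners.get(mapId), k -> new ArrayList<>())
          .add(shuffleBlockId(shuffleId, mapId, reduceId));
    }
    Map<String, String[]> plan = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
      plan.put(e.getKey(), e.getValue().toArray(new String[0]));
    }
    return plan;
  }

  public static void main(String[] args) {
    Map<String, String[]> plan =
        planFetches(1, 0, Arrays.asList("executor-1", "executor-2", "executor-1"));
    for (Map.Entry<String, String[]> e : plan.entrySet()) {
      // Each entry would become one client.fetchBlocks(host, port, execId, blockIds, listener) call
      System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
  }
}
```

Running `main` prints `executor-1 -> [shuffle_1_0_0, shuffle_1_2_0]` followed by `executor-2 -> [shuffle_1_1_0]`.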
### Server APIs

Server-side components for handling shuffle block requests, resolving block locations, and managing executor registrations.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
  public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}

public class ExternalShuffleBlockResolver {
  public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
  public ManagedBuffer getBlockData(String appId, String execId, String blockId);
}
```

[Server APIs](./server-apis.md)

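The following sketch shows how the resolver might turn a sort-based shuffle block ID into a byte range on disk. It is a simplified, hypothetical reimplementation; `SortShuffleLookup`, `Segment`, and `resolve` are illustrative names.

It assumes the Spark 1.6 file layout as we understand it:

- Each map task writes one `.data` file and one `.index` file, both named `shuffle_<shuffleId>_<mapId>_0`.
- Each file sits in a local directory and a two-hex-digit subdirectory, both chosen by hashing the file name.
- The index file holds `numPartitions + 1` big-endian `long` offsets.

The real resolver also supports hash-based shuffle and returns a `ManagedBuffer` rather than a plain object.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch, not the library's ExternalShuffleBlockResolver.
public class SortShuffleLookup {

  // Which data file to read and which byte range inside it.
  public static final class Segment {
    public final File dataFile;
    public final long offset;
    public final long length;

    Segment(File dataFile, long offset, long length) {
      this.dataFile = dataFile;
      this.offset = offset;
      this.length = length;
    }
  }

  // Non-negative hash; Integer.MIN_VALUE maps to 0 because Math.abs would overflow.
  static int nonNegativeHash(Object obj) {
    int hash = obj.hashCode();
    return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
  }

  // Picks a local dir by hash, then a two-hex-digit subdirectory such as "0c".
  public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  public static Segment resolve(String[] localDirs, int subDirsPerLocalDir, String blockId)
      throws IOException {
    // Expected format: shuffle_<shuffleId>_<mapId>_<reduceId>
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
    }
    int shuffleId = Integer.parseInt(parts[1]);
    int mapId = Integer.parseInt(parts[2]);
    int reduceId = Integer.parseInt(parts[3]);

    // All reduce partitions of one map task share a single data file and index file
    String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
    File indexFile = getFile(localDirs, subDirsPerLocalDir, base + ".index");
    File dataFile = getFile(localDirs, subDirsPerLocalDir, base + ".data");

    try (RandomAccessFile index = new RandomAccessFile(indexFile, "r")) {
      index.seek(reduceId * 8L);          // each offset is an 8-byte long
      long offset = index.readLong();     // start of this partition
      long nextOffset = index.readLong(); // start of the next partition
      return new Segment(dataFile, offset, nextOffset - offset);
    }
  }
}
```

Reading two adjacent offsets keeps the lookup to one seek and two `readLong` calls, however many partitions the map task wrote.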
### Protocol Messages

Structured communication protocol between shuffle clients and servers. Each message is serialized as a one-byte type tag followed by its encoded fields.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
  public ByteBuffer toByteBuffer();

  public enum Type {
    OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE, REGISTER_DRIVER
  }
}

public class ExecutorShuffleInfo implements Encodable {
  public final String[] localDirs;       // base local directories holding the executor's shuffle files
  public final int subDirsPerLocalDir;   // number of subdirectories created in each local dir
  public final String shuffleManager;    // shuffle manager class the executor is using
}
```

[Protocol Messages](./protocol-messages.md)

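As a rough illustration of the binary serialization, the following sketch encodes and decodes an executor registration with `java.nio.ByteBuffer`. The registration carries an application ID, an executor ID, and the three `ExecutorShuffleInfo` fields.

The layout reflects our understanding of the module's encoders: a type byte first, each string as a 4-byte length followed by UTF-8 bytes, and each array as a 4-byte count followed by its elements. Treat it as an illustration rather than a wire-compatible implementation.

`RegistrationEncoding` is a hypothetical class. The tag value `2` assumes tags follow the order of the `Type` enum above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder; layout: [type][appId][execId][localDirs][subDirsPerLocalDir][shuffleManager]
public class RegistrationEncoding {
  static final byte REGISTER_EXECUTOR = 2; // assumed: position of REGISTER_EXECUTOR in the Type enum

  // A string costs a 4-byte length prefix plus its UTF-8 bytes.
  static int stringLength(String s) {
    return 4 + s.getBytes(StandardCharsets.UTF_8).length;
  }

  static void putString(ByteBuffer buf, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length);
    buf.put(bytes);
  }

  static String getString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static ByteBuffer encode(String appId, String execId, String[] localDirs,
                                  int subDirsPerLocalDir, String shuffleManager) {
    // type byte + two strings + array count + subDirs int + manager string + each dir
    int size = 1 + stringLength(appId) + stringLength(execId) + 4 + 4 + stringLength(shuffleManager);
    for (String dir : localDirs) size += stringLength(dir);
    ByteBuffer buf = ByteBuffer.allocate(size);
    buf.put(REGISTER_EXECUTOR);
    putString(buf, appId);
    putString(buf, execId);
    buf.putInt(localDirs.length);
    for (String dir : localDirs) putString(buf, dir);
    buf.putInt(subDirsPerLocalDir);
    putString(buf, shuffleManager);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  // Decodes the fields back; local dirs are joined with ',' to keep the return type simple.
  public static String[] decode(ByteBuffer buf) {
    byte type = buf.get();
    if (type != REGISTER_EXECUTOR) throw new IllegalArgumentException("Unexpected type " + type);
    String appId = getString(buf);
    String execId = getString(buf);
    int count = buf.getInt();
    StringBuilder dirs = new StringBuilder();
    for (int i = 0; i < count; i++) {
      if (i > 0) dirs.append(',');
      dirs.append(getString(buf));
    }
    int subDirs = buf.getInt();
    String manager = getString(buf);
    return new String[] {appId, execId, dirs.toString(), Integer.toString(subDirs), manager};
  }
}
```

Length prefixes let a decoder read variable-length fields without delimiters. The leading tag lets one handler dispatch on message type before decoding the rest.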
### Security

SASL authentication support for secure communication between shuffle clients and external shuffle services.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
  public void registerApp(String appId, String shuffleSecret);
  public void unregisterApp(String appId);
  public String getSecretKey(String appId);
}
```

[Security](./security.md)

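The secret manager keeps one shared secret per application, which the service looks up by `appId` during SASL authentication. That role can be sketched with a concurrent map.

`AppSecretRegistry` is a hypothetical class, not `ShuffleSecretManager` itself. Keeping the first registration when an application registers twice is a design choice of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-application secret registry in the spirit of ShuffleSecretManager.
public class AppSecretRegistry {
  private final ConcurrentHashMap<String, String> secrets = new ConcurrentHashMap<>();

  public void registerApp(String appId, String secret) {
    // putIfAbsent keeps the first registration if an application registers twice
    secrets.putIfAbsent(appId, secret);
  }

  public void unregisterApp(String appId) {
    secrets.remove(appId);
  }

  // Returns null for unknown applications, so authentication for them cannot succeed.
  public String getSecretKey(String appId) {
    return secrets.get(appId);
  }
}
```

A concurrent map suits this role because applications register and unregister from service threads while handshakes read secrets concurrently.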
## Types

### Core Interfaces

```java { .api }
public interface BlockFetchingListener extends EventListener {
  void onBlockFetchSuccess(String blockId, ManagedBuffer data);
  void onBlockFetchFailure(String blockId, Throwable exception);
}
```

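`fetchBlocks` returns before any data arrives, so callers usually aggregate listener callbacks and wait for all of them. The sketch below does this with a `CountDownLatch`.

To run without Spark on the classpath, it uses a hypothetical `SimpleBlockListener` interface that mirrors `BlockFetchingListener` but passes `byte[]` instead of `ManagedBuffer`. It also assumes each requested block reports exactly one success or failure.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BlockFetchingListener so the sketch needs only the JDK.
interface SimpleBlockListener {
  void onBlockFetchSuccess(String blockId, byte[] data);
  void onBlockFetchFailure(String blockId, Throwable exception);
}

// Records one outcome per block and lets the caller wait until every block has reported.
public class CollectingListener implements SimpleBlockListener {
  private final Map<String, byte[]> successes = new ConcurrentHashMap<>();
  private final Map<String, Throwable> failures = new ConcurrentHashMap<>();
  private final CountDownLatch pending;

  public CollectingListener(int expectedBlocks) {
    this.pending = new CountDownLatch(expectedBlocks);
  }

  @Override
  public void onBlockFetchSuccess(String blockId, byte[] data) {
    successes.put(blockId, data);
    pending.countDown();
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    failures.put(blockId, exception);
    pending.countDown();
  }

  // Returns true once all blocks have reported, or false if the timeout elapses first.
  public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return pending.await(timeout, unit);
  }

  public Map<String, byte[]> successes() { return successes; }

  public Map<String, Throwable> failures() { return failures; }
}
```

With a real `BlockFetchingListener`, call `data.retain()` before storing the buffer, because the buffer may be released once the callback returns.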
### Configuration Types

`AppExecId` is a static nested class of `ExternalShuffleBlockResolver` that identifies a registered executor by application ID and executor ID.

```java { .api }
public static class AppExecId {
  public final String appId;
  public final String execId;

  public AppExecId(String appId, String execId);
  public boolean equals(Object o);
  public int hashCode();
  public String toString();
}
```

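`AppExecId` overrides `equals` and `hashCode` so that a freshly built key finds an existing registration. The sketch below shows that pattern with a hypothetical `ExecutorKey` and an `ExecutorRegistry` whose values are simplified to arrays of local directories.

The `removeApplication` method only loosely resembles what `applicationRemoved` does on the server.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry showing why an (appId, execId) key needs value-based equality.
public class ExecutorRegistry {

  // Stand-in for AppExecId: immutable and compared by value.
  public static final class ExecutorKey {
    public final String appId;
    public final String execId;

    public ExecutorKey(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof ExecutorKey)) return false;
      ExecutorKey other = (ExecutorKey) o;
      return Objects.equals(appId, other.appId) && Objects.equals(execId, other.execId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, execId);
    }

    @Override
    public String toString() {
      return "ExecutorKey[appId=" + appId + ", execId=" + execId + "]";
    }
  }

  // Values simplified to the executor's local directories.
  private final Map<ExecutorKey, String[]> executors = new ConcurrentHashMap<>();

  public void register(String appId, String execId, String[] localDirs) {
    executors.put(new ExecutorKey(appId, execId), localDirs);
  }

  public String[] lookup(String appId, String execId) {
    // A new key instance finds the entry because equality is by value, not identity
    return executors.get(new ExecutorKey(appId, execId));
  }

  // Drops every executor of one application and returns how many were removed.
  public int removeApplication(String appId) {
    int removed = 0;
    Iterator<Map.Entry<ExecutorKey, String[]>> it = executors.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getKey().appId.equals(appId)) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }
}
```

Without the overrides, `lookup` would always miss, since `Object.equals` compares identity and every call builds a new key.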