# Shuffle Client

Client-side functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.

## Capabilities

### ShuffleClient Base Class

Abstract base class providing the core interface for reading shuffle files from executors or external services.

```java { .api }
/**
 * Base class for reading shuffle files, either from an Executor or external service
 */
public abstract class ShuffleClient implements Closeable {
    /**
     * Initializes the ShuffleClient, specifying this Executor's appId.
     * Must be called before any other method on the ShuffleClient.
     */
    public void init(String appId);

    /**
     * Fetch a sequence of blocks from a remote node asynchronously.
     * Note that this API takes a sequence so the implementation can batch requests.
     *
     * @param host the host of the remote node
     * @param port the port of the remote node
     * @param execId the executor id
     * @param blockIds block ids to fetch
     * @param listener the listener to receive block fetching status
     * @param tempShuffleFileManager manager for creating temp shuffle files to reduce memory usage
     */
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);
}
```

### ExternalShuffleClient

Client for reading shuffle blocks from an external shuffle service instead of directly from other executors.

```java { .api }
/**
 * Client for reading shuffle blocks which points to an external (outside of executor) server.
 * This prevents losing shuffle data if executors are lost.
 */
public class ExternalShuffleClient extends ShuffleClient {
    /**
     * Creates an external shuffle client, with SASL optionally enabled.
     * If SASL is not enabled, then secretKeyHolder may be null.
     */
    public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);

    /**
     * Initializes the client with the given application ID.
     * Sets up transport context and client factory with optional authentication.
     */
    @Override
    public void init(String appId);

    /**
     * Fetch blocks from the external shuffle service with retry logic.
     * Uses OneForOneBlockFetcher with optional RetryingBlockFetcher wrapper.
     */
    @Override
    public void fetchBlocks(String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);

    /**
     * Registers this executor with an external shuffle server.
     * Required to inform the shuffle server about where and how we store shuffle files.
     *
     * @param host Host of shuffle server
     * @param port Port of shuffle server
     * @param execId This Executor's id
     * @param executorInfo Contains all info necessary for the service to find shuffle files
     * @throws IOException if registration fails
     * @throws InterruptedException if registration is interrupted
     */
    public void registerWithShuffleServer(String host, int port, String execId,
        ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException;

    /**
     * Closes the client and releases resources.
     */
    @Override
    public void close();
}
```

**Usage Examples:**

```java
import java.io.IOException;
import java.util.HashMap;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create client without authentication.
// TransportConf takes a module name and a ConfigProvider; an empty map falls back to defaults.
TransportConf conf = new TransportConf(
    "shuffle", new MapConfigProvider(new HashMap<String, String>()));
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);

// Initialize with application ID (must happen before any other call)
client.init("spark-app-12345");

// Register executor with shuffle service
String[] localDirs = {"/tmp/spark-local/executor-1"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

try {
    client.registerWithShuffleServer("shuffle-host", 7337, "executor-1", executorInfo);
    System.out.println("Executor registered successfully");
} catch (IOException | InterruptedException e) {
    System.err.println("Registration failed: " + e.getMessage());
}

// Create listener for handling fetch results
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId +
            " (" + data.size() + " bytes)");
        // Process the block data here.
        // Important: data is released automatically after this method returns;
        // retain() it or copy it if it must outlive the callback.
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
    }
};

// Fetch multiple blocks asynchronously; results arrive through the listener
String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};
client.fetchBlocks("shuffle-host", 7337, "executor-1", blockIds, listener, null);

// Clean up. fetchBlocks returns before the blocks arrive, so close the client
// only after the listener has reported a result for every requested block.
client.close();
```
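
Because `fetchBlocks` is asynchronous, a caller usually needs some way to know when every block has been reported before calling `close()`. The sketch below shows one possible approach using a `CountDownLatch`. It is not part of the shuffle client API: `Listener`, `fetchAsync`, and `fetchAndAwait` are hypothetical stand-ins written in plain Java so the example runs without Spark on the classpath.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitFetchSketch {
    /** Hypothetical stand-in for BlockFetchingListener, using plain byte arrays. */
    interface Listener {
        void onSuccess(String blockId, byte[] data);
        void onFailure(String blockId, Throwable error);
    }

    /** Hypothetical async fetch: returns immediately, reports each block on a pool thread. */
    static void fetchAsync(ExecutorService pool, String[] blockIds, Listener listener) {
        for (String id : blockIds) {
            pool.execute(() -> listener.onSuccess(id, new byte[16]));
        }
    }

    /** Starts the fetch, waits for one callback per block, and returns the success count. */
    static int fetchAndAwait(String[] blockIds) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(blockIds.length);
        AtomicInteger succeeded = new AtomicInteger();
        Listener listener = new Listener() {
            @Override
            public void onSuccess(String blockId, byte[] data) {
                succeeded.incrementAndGet();
                done.countDown();
            }

            @Override
            public void onFailure(String blockId, Throwable error) {
                done.countDown(); // a failed block is still a finished block
            }
        };
        try {
            fetchAsync(pool, blockIds, listener);
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for block callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown(); // plays the role of client.close(): runs once waiting is over
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};
        System.out.println("blocks fetched: " + fetchAndAwait(blockIds));
    }
}
```

With the real client, the same latch could be counted down inside `onBlockFetchSuccess` and `onBlockFetchFailure`, with `client.close()` placed after the `await` call.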

### OneForOneBlockFetcher

Internal fetcher implementation that handles individual block requests using a one-for-one strategy.

```java { .api }
/**
 * Fetches shuffle blocks using the one-for-one strategy where each chunk corresponds to one block.
 */
public class OneForOneBlockFetcher {
    /**
     * Creates a block fetcher for the specified client and blocks.
     */
    public OneForOneBlockFetcher(TransportClient client, String appId, String execId,
        String[] blockIds, BlockFetchingListener listener, TransportConf conf,
        TempShuffleFileManager tempFileManager);

    /**
     * Starts the block fetching process by sending OpenBlocks message to the server.
     */
    public void start();
}
```
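
To make the one-for-one idea concrete, the following sketch pairs each returned chunk with the block ID at the same position in the request. It is a simplified model, not the fetcher's actual code: the class `OneForOneSketch`, the method `assignChunks`, and the assumption that chunks arrive as an array in request order are all illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OneForOneSketch {
    /**
     * Pairs chunk i with blockIds[i]. Assumes (for illustration) that the server
     * returns exactly one chunk per requested block, in request order.
     */
    static Map<String, byte[]> assignChunks(String[] blockIds, byte[][] chunks) {
        if (chunks.length != blockIds.length) {
            throw new IllegalArgumentException(
                "expected " + blockIds.length + " chunks but got " + chunks.length);
        }
        Map<String, byte[]> blocks = new LinkedHashMap<>();
        for (int chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
            blocks.put(blockIds[chunkIndex], chunks[chunkIndex]);
        }
        return blocks;
    }

    public static void main(String[] args) {
        String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1"};
        byte[][] chunks = {new byte[10], new byte[20]};
        assignChunks(blockIds, chunks).forEach(
            (id, data) -> System.out.println(id + " -> " + data.length + " bytes"));
    }
}
```

The length check reflects the one-for-one contract: a mismatch between the number of chunks and the number of requested blocks means the mapping cannot be trusted.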

### RetryingBlockFetcher

Wrapper that adds retry logic to block fetching operations.

```java { .api }
/**
 * Adds retry logic to block fetching operations.
 */
public class RetryingBlockFetcher {
    /**
     * Creates a retrying block fetcher.
     *
     * @param conf transport configuration containing retry settings
     * @param blockFetchStarter strategy for creating and starting block fetchers
     * @param blockIds blocks to fetch
     * @param listener callback for fetch results
     */
    public RetryingBlockFetcher(TransportConf conf, BlockFetchStarter blockFetchStarter,
        String[] blockIds, BlockFetchingListener listener);

    /**
     * Starts the fetching process with retry logic.
     */
    public void start();

    /**
     * Strategy interface for starting block fetches.
     */
    public interface BlockFetchStarter {
        /**
         * Create and start a block fetcher.
         */
        void createAndStart(String[] blockIds, BlockFetchingListener listener) throws Exception;
    }
}
```
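
The general shape of such retry logic can be sketched in plain Java. The example below is a synchronous simplification under stated assumptions, not the wrapper's real implementation: `BlockSource` and `fetchWithRetries` are hypothetical names, and `maxRetries` and `retryWaitMs` stand in for the retry settings that the real class reads from its transport configuration. It retries only on `IOException` and re-requests only the blocks that are still outstanding.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RetrySketch {
    /** Hypothetical synchronous stand-in for one fetch attempt of a single block. */
    interface BlockSource {
        byte[] fetch(String blockId) throws IOException;
    }

    /** Returns block IDs in completion order; rethrows once maxRetries is exhausted. */
    static List<String> fetchWithRetries(
            String[] blockIds, BlockSource source, int maxRetries, long retryWaitMs)
            throws IOException {
        Set<String> outstanding = new LinkedHashSet<>(Arrays.asList(blockIds));
        List<String> fetched = new ArrayList<>();
        int retryCount = 0;
        while (!outstanding.isEmpty()) {
            try {
                // Iterate over a copy so completed blocks can be removed as we go.
                for (String id : new ArrayList<>(outstanding)) {
                    source.fetch(id);
                    outstanding.remove(id);
                    fetched.add(id);
                }
            } catch (IOException e) {
                if (retryCount >= maxRetries) {
                    throw e; // retries exhausted: surface the failure to the caller
                }
                retryCount++;
                pause(retryWaitMs); // wait, then retry only the outstanding blocks
            }
        }
        return fetched;
    }

    private static void pause(long millis) throws IOException {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> attempts = new HashMap<>();
        // Simulated source: the first attempt for one block fails, later attempts succeed.
        BlockSource flaky = id -> {
            int n = attempts.merge(id, 1, Integer::sum);
            if (id.equals("shuffle_0_1_1") && n == 1) {
                throw new IOException("simulated connection reset");
            }
            return new byte[8];
        };
        String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};
        System.out.println(fetchWithRetries(blockIds, flaky, 3, 10L));
        System.out.println("attempts: " + attempts);
    }
}
```

Tracking the outstanding set is the key design choice here: blocks that already succeeded are never requested again, so a transient failure late in a batch costs only the remaining blocks.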

## Error Handling

The client handles various error conditions:

- **IOException**: Network connectivity issues, server unavailable
- **InterruptedException**: Operations interrupted during execution
- **SecurityException**: Authentication failures when SASL is enabled
- **IllegalArgumentException**: Invalid parameters or block IDs
- **UnsupportedOperationException**: Server doesn't support requested operations

**Error Handling Example:**

```java
import java.io.IOException;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;

BlockFetchingListener errorHandlingListener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        // Handle successful fetch.
        // processBlock is a placeholder for application code, not part of the client API.
        processBlock(blockId, data);
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        if (exception instanceof IOException) {
            System.err.println("Network error fetching " + blockId + ": " + exception.getMessage());
            // Application-level retry logic here (for example, re-issue fetchBlocks for this block)
        } else if (exception instanceof SecurityException) {
            System.err.println("Authentication failed: " + exception.getMessage());
            // Handle auth failure
        } else {
            System.err.println("Unexpected error: " + exception.getMessage());
            // Handle other errors
        }
    }
};
```