# Client APIs

Client-side APIs for fetching shuffle blocks from external shuffle services, with per-block success and failure callbacks, automatic retry of failed fetches, and fault-tolerant error handling.

## Capabilities

### Shuffle Client Base

Abstract base class that defines the core interface for reading shuffle files, either from an executor or from an external shuffle service.

```java { .api }
/**
12
* Base interface for reading shuffle files, either from an Executor or external service
13
*/
14
public abstract class ShuffleClient implements Closeable {
15
/**
16
* Initialize the client with application ID
17
* @param appId - Spark application identifier
18
*/
19
public void init(String appId);
20
21
/**
22
* Fetch blocks from the specified host and executor
23
* @param host - Target host address
24
* @param port - Target port number
25
* @param execId - Executor identifier
26
* @param blockIds - Array of block identifiers to fetch
27
* @param listener - Callback for handling fetch results
28
*/
29
public abstract void fetchBlocks(String host, int port, String execId,
30
String[] blockIds, BlockFetchingListener listener);
31
32
/**
33
* Close the client and cleanup resources
34
*/
35
public void close() throws IOException;
36
}
```

### External Shuffle Client

Concrete `ShuffleClient` that reads shuffle blocks from an external shuffle service, with optional SASL authentication and encryption.

```java { .api }
/**
45
* Client for reading shuffle blocks from external shuffle service
46
*/
47
public class ExternalShuffleClient extends ShuffleClient {
48
/**
49
* Create external shuffle client with security configuration
50
* @param conf - Transport configuration
51
* @param secretKeyHolder - Secret key holder for authentication
52
* @param saslEnabled - Whether SASL authentication is enabled
53
* @param saslEncryptionEnabled - Whether SASL encryption is enabled
54
*/
55
public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
56
boolean saslEnabled, boolean saslEncryptionEnabled);
57
58
/**
59
* Initialize client with application ID
60
* @param appId - Spark application identifier
61
*/
62
public void init(String appId);
63
64
/**
65
* Fetch blocks from external shuffle service
66
* @param host - Shuffle service host address
67
* @param port - Shuffle service port number
68
* @param execId - Executor identifier
69
* @param blockIds - Array of block identifiers to fetch
70
* @param listener - Callback for handling fetch results
71
*/
72
public void fetchBlocks(String host, int port, String execId,
73
String[] blockIds, BlockFetchingListener listener);
74
75
/**
76
* Register executor with shuffle server
77
* @param host - Shuffle server host address
78
* @param port - Shuffle server port number
79
* @param execId - Executor identifier
80
* @param executorInfo - Executor configuration information
81
* @throws IOException if registration fails
82
*/
83
public void registerWithShuffleServer(String host, int port, String execId,
84
ExecutorShuffleInfo executorInfo) throws IOException;
85
86
/**
87
* Close client and cleanup resources
88
*/
89
public void close();
90
}
```

**Usage Example:**

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
97
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
98
import org.apache.spark.network.util.TransportConf;
99
100
// Create client with SASL disabled
101
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
102
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
103
104
// Initialize and register executor
105
client.init("spark-app-123");
106
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
107
new String[]{"/tmp/spark-local-1"}, 64, "org.apache.spark.shuffle.sort.SortShuffleManager"
108
);
109
client.registerWithShuffleServer("shuffle-host", 7337, "executor-1", info);
110
111
// Fetch blocks with callback
112
client.fetchBlocks("shuffle-host", 7337, "executor-1",
113
new String[]{"shuffle_0_0_0", "shuffle_0_0_1"}, listener);
```

### Block Fetching Listener

Event listener interface whose callbacks report, for each requested block, whether the fetch succeeded or failed.

```java { .api }
/**
122
* Event listener for block fetch operations
123
*/
124
public interface BlockFetchingListener extends EventListener {
125
/**
126
* Called when a block is successfully fetched
127
* @param blockId - Identifier of the fetched block
128
* @param data - Block data as managed buffer
129
*/
130
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
131
132
/**
133
* Called when block fetch fails
134
* @param blockId - Identifier of the failed block
135
* @param exception - Exception that caused the failure
136
*/
137
void onBlockFetchFailure(String blockId, Throwable exception);
138
}
```

**Usage Example:**

```java
BlockFetchingListener listener = new BlockFetchingListener() {
145
@Override
146
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
147
try {
148
// Process the block data
149
ByteBuffer buffer = data.nioByteBuffer();
150
System.out.println("Fetched block " + blockId + " with " + buffer.remaining() + " bytes");
151
// Handle the shuffle data...
152
} finally {
153
data.release(); // Important: release the buffer
154
}
155
}
156
157
@Override
158
public void onBlockFetchFailure(String blockId, Throwable exception) {
159
System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
160
// Implement retry logic or error handling
161
}
162
};
```

### One-for-One Block Fetcher

Simple wrapper around `TransportClient` that interprets each fetched chunk as one whole block.

```java { .api }
/**
171
* Simple wrapper interpreting each chunk as a whole block
172
*/
173
public class OneForOneBlockFetcher {
174
/**
175
* Create block fetcher for specified blocks
176
* @param client - Transport client for network communication
177
* @param appId - Spark application identifier
178
* @param execId - Executor identifier
179
* @param blockIds - Array of block identifiers to fetch
180
* @param listener - Callback for handling fetch results
181
*/
182
public OneForOneBlockFetcher(TransportClient client, String appId, String execId,
183
String[] blockIds, BlockFetchingListener listener);
184
185
/**
186
* Start fetching the specified blocks
187
*/
188
public void start();
189
}
```

### Retrying Block Fetcher

Wrapper that adds automatic retries to block fetches. Retry behavior is controlled by the retry settings in the `TransportConf` passed to the constructor.

```java { .api }
/**
198
* Wraps another BlockFetcher with automatic retry capability
199
*/
200
public class RetryingBlockFetcher {
201
/**
202
* Create retrying block fetcher with specified configuration
203
* @param conf - Transport configuration including retry settings
204
* @param fetchStarter - Factory for creating block fetchers
205
* @param blockIds - Array of block identifiers to fetch
206
* @param listener - Callback for handling fetch results
207
*/
208
public RetryingBlockFetcher(TransportConf conf, BlockFetchStarter fetchStarter,
209
String[] blockIds, BlockFetchingListener listener);
210
211
/**
212
* Start fetching blocks with retry logic
213
*/
214
public void start();
215
}
216
217
/**
218
* Interface for creating and starting block fetchers
219
*/
220
public static interface BlockFetchStarter {
221
/**
222
* Create and start block fetcher for specified blocks
223
* @param blockIds - Array of block identifiers to fetch
224
* @param listener - Callback for handling fetch results
225
* @throws IOException if fetcher creation fails
226
*/
227
void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
228
}
```

**Usage Example:**

```java
// Create a retry-enabled block fetcher
235
BlockFetchStarter fetchStarter = (blockIds, listener) -> {
236
TransportClient client = transportContext.createClient(host, port);
237
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client, appId, execId, blockIds, listener);
238
fetcher.start();
239
};
240
241
RetryingBlockFetcher retryingFetcher = new RetryingBlockFetcher(
242
conf, fetchStarter, blockIds, originalListener
243
);
244
retryingFetcher.start();
```

## Mesos Integration

### Mesos External Shuffle Client

Extended client for the external shuffle service in Mesos coarse-grained mode, adding support for registering the driver with the service.

```java { .api }
/**
255
* Client for external shuffle service in Mesos coarse-grained mode
256
*/
257
public class MesosExternalShuffleClient extends ExternalShuffleClient {
258
/**
259
* Create Mesos external shuffle client
260
* @param conf - Transport configuration
261
* @param secretKeyHolder - Secret key holder for authentication
262
* @param saslEnabled - Whether SASL authentication is enabled
263
* @param saslEncryptionEnabled - Whether SASL encryption is enabled
264
*/
265
public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
266
boolean saslEnabled, boolean saslEncryptionEnabled);
267
268
/**
269
* Register driver with shuffle service for Mesos integration
270
* @param host - Shuffle service host address
271
* @param port - Shuffle service port number
272
* @throws IOException if registration fails
273
*/
274
public void registerDriverWithShuffleService(String host, int port) throws IOException;
275
}
276
```