# Shuffle Client Operations

The shuffle client components provide the primary interface for fetching shuffle blocks from external shuffle services. Because an external shuffle service serves shuffle files from outside the executor processes, the blocks stay available even after the executor that wrote them has exited.

## Core Client Classes

### ShuffleClient

```java { .api }
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);

    public abstract void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );

    public void close() throws IOException;
}
```

Abstract base class for shuffle clients. A client must be initialized with an application ID via `init` before any blocks are fetched.

**Parameters:**
- `appId` (String): Spark application identifier used for authentication and tracking
- `host` (String): Hostname of the shuffle service
- `port` (int): Port number of the shuffle service
- `execId` (String): Executor ID that originally wrote the shuffle blocks
- `blockIds` (String[]): Array of block identifiers to fetch
- `listener` (BlockFetchingListener): Callback interface for handling fetch results
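
The init-before-use and `Closeable` contract described above can be illustrated with a small, self-contained sketch. `DemoShuffleClient` is a hypothetical stand-in written for this example, not a Spark class; it only mimics the lifecycle (construct, `init`, use, `close`) and shows that try-with-resources works for any `Closeable` client.

```java
import java.io.Closeable;

// Hypothetical stand-in that mimics the init-before-use contract; not a Spark class.
class DemoShuffleClient implements Closeable {
    private String appId;
    private boolean closed;

    void init(String appId) {
        this.appId = appId;
    }

    void fetchBlocks(String host, int port, String execId, String[] blockIds) {
        if (appId == null) {
            throw new IllegalStateException("Called before init()");
        }
        if (closed) {
            throw new IllegalStateException("Client already closed");
        }
        System.out.println(appId + ": requesting " + blockIds.length
            + " block(s) written by " + execId + " from " + host + ":" + port);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }

    public static void main(String[] args) {
        // try-with-resources closes the client even if a call inside throws.
        try (DemoShuffleClient client = new DemoShuffleClient()) {
            client.init("spark-app-123");
            client.fetchBlocks("shuffle-server", 7337, "executor-1",
                new String[] {"shuffle_1_0_0"});
        }
    }
}
```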

### ExternalShuffleClient

```java { .api }
public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );

    public void registerWithShuffleServer(
        String host,
        int port,
        String execId,
        ExecutorShuffleInfo executorInfo
    ) throws IOException;

    public void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
}
```

Main implementation of the shuffle client for external shuffle services. Supports SASL authentication and automatic retry logic.

**Constructor Parameters:**
- `conf` (TransportConf): Network transport configuration
- `secretKeyHolder` (SecretKeyHolder): Interface for SASL secret management; may be null when SASL is disabled
- `saslEnabled` (boolean): Whether to enable SASL authentication
- `saslEncryptionEnabled` (boolean): Whether to enable SASL encryption (requires SASL to be enabled)

**Key Methods:**

#### registerWithShuffleServer

Registers an executor with the external shuffle server, providing information about where shuffle files are stored.

**Parameters:**
- `host` (String): Shuffle server hostname
- `port` (int): Shuffle server port
- `execId` (String): Executor identifier
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations

**Throws:**
- `IOException`: If registration fails due to network or server errors

#### fetchBlocks

Asynchronously fetches shuffle blocks from the external service with automatic retry support. The call returns before the blocks arrive; results are delivered through the listener.

**Parameters:**
- `host` (String): Shuffle server hostname
- `port` (int): Shuffle server port
- `execId` (String): Executor that wrote the blocks
- `blockIds` (String[]): Block identifiers to fetch
- `listener` (BlockFetchingListener): Callback for success/failure notifications
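
Since results arrive through callbacks, a caller that needs every block usually has to wait for them. The sketch below shows one way to do that with a `CountDownLatch`, assuming each requested block produces exactly one success or failure callback. `FetchListener` is a simplified, hypothetical stand-in for `BlockFetchingListener` (it passes `byte[]` instead of `ManagedBuffer`), and `fakeFetchBlocks` merely simulates an asynchronous service on a background thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BlockFetchingListener, using byte[] instead of ManagedBuffer.
interface FetchListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Collects results and lets the caller wait until every requested block
// has either succeeded or failed.
final class CollectingListener implements FetchListener {
    final Map<String, byte[]> fetched = new ConcurrentHashMap<>();
    final Map<String, Throwable> failed = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    CollectingListener(int expectedBlocks) {
        this.remaining = new CountDownLatch(expectedBlocks);
    }

    @Override
    public void onBlockFetchSuccess(String blockId, byte[] data) {
        fetched.put(blockId, data);
        remaining.countDown();
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failed.put(blockId, exception);
        remaining.countDown();
    }

    /** Returns true if all callbacks arrived within the timeout. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class AsyncFetchDemo {
    // Simulates an asynchronous fetch: callbacks fire on another thread.
    static void fakeFetchBlocks(String[] blockIds, FetchListener listener) {
        Thread worker = new Thread(() -> {
            for (String id : blockIds) {
                if (id.startsWith("shuffle_")) {
                    listener.onBlockFetchSuccess(id, new byte[] {1, 2, 3});
                } else {
                    listener.onBlockFetchFailure(id,
                        new IllegalArgumentException("bad id: " + id));
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) {
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "bogus"};
        CollectingListener listener = new CollectingListener(blockIds.length);
        fakeFetchBlocks(blockIds, listener);
        boolean done = listener.await(5, TimeUnit.SECONDS);
        System.out.println("done=" + done + " fetched=" + listener.fetched.keySet()
            + " failed=" + listener.failed.keySet());
    }
}
```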

## Mesos Integration

### MesosExternalShuffleClient

```java { .api }
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );

    public void registerDriverWithShuffleService(String host, int port) throws IOException;
}
```

Specialized client for Mesos deployments that adds driver registration functionality for cleanup purposes.

#### registerDriverWithShuffleService

Registers the Spark driver with the external shuffle service so that the service can clean up the application's shuffle files when the application completes.

**Parameters:**
- `host` (String): Shuffle service hostname
- `port` (int): Shuffle service port

**Throws:**
- `IOException`: If registration fails
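
## Usage Examples

### Basic Client Setup

```java
import org.apache.spark.SparkConf;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.util.TransportConf;

// Create the transport configuration for the "shuffle" module.
// TransportConf is backed by a ConfigProvider, not a SparkConf, so this
// example assumes the SparkTransportConf helper from spark-core is available
// (0 = do not restrict the number of cores used for I/O threads).
TransportConf conf = SparkTransportConf.fromSparkConf(new SparkConf(), "shuffle", 0);

// Create client without SASL
ExternalShuffleClient client = new ExternalShuffleClient(
    conf, null, false, false
);

// Initialize with app ID
client.init("spark-app-123");
```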

### Client with SASL Authentication

```java
// Create secret manager
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("spark-app-123", "my-secret-key");

// Create secure client
ExternalShuffleClient client = new ExternalShuffleClient(
    conf,
    secretManager,  // Secret key holder
    true,           // Enable SASL
    true            // Enable SASL encryption
);

client.init("spark-app-123");
```

### Executor Registration

```java
// Define executor shuffle configuration
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle", "/tmp/spark-shuffle2"},  // Local directories
    64,                                                         // Number of subdirectories per local directory
    "org.apache.spark.shuffle.sort.SortShuffleManager"          // Shuffle manager class
);

// Register with shuffle server
client.registerWithShuffleServer("shuffle-server", 7337, "executor-1", executorInfo);
```
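
The local directories and the subdirectory count tell the shuffle service where to look for an executor's files. The following sketch illustrates a hash-based layout of the kind Spark uses to spread files across those directories. It is an approximation written for illustration, and `ShuffleFileLocator` is a hypothetical name; consult the Spark source for the authoritative lookup logic.

```java
import java.io.File;

// Illustrative only: approximates a hash-based directory layout.
// The class and method names are hypothetical, not part of the Spark API.
final class ShuffleFileLocator {
    private ShuffleFileLocator() {}

    /** Hash that is never negative, even for Integer.MIN_VALUE. */
    static int nonNegativeHash(String s) {
        int hash = s.hashCode();
        return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
    }

    /**
     * Picks a local directory and a two-digit hex subdirectory from the file
     * name's hash. Assumes localDirs is non-empty and subDirsPerLocalDir > 0.
     */
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    public static void main(String[] args) {
        String[] localDirs = {"/tmp/spark-shuffle", "/tmp/spark-shuffle2"};
        System.out.println(locate(localDirs, 64, "shuffle_1_0_0.data"));
    }
}
```

The mapping is deterministic, which is why the service can find a file from the registered directories and the file name alone.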

### Block Fetching with Callback

```java
// Implement fetch callback
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Fetched block " + blockId + ", size: " + data.size());
        // Process block data. Copy the bytes out now: per the listener
        // contract, the buffer is released automatically once this callback
        // returns. Call data.retain() (and later data.release()) only if the
        // buffer must outlive the callback.
        try {
            byte[] blockData = new byte[(int) data.size()];
            data.nioByteBuffer().get(blockData);
            // Handle the shuffle block data...
        } catch (IOException e) {
            // nioByteBuffer() declares IOException, so it must be handled here
            System.err.println("Failed to read " + blockId + ": " + e.getMessage());
        }
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch " + blockId + ": " + exception.getMessage());
        // Handle failure, perhaps retry or skip
    }
};

// Fetch multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
```
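
The block IDs in the example above follow the `shuffle_<shuffleId>_<mapId>_<reduceId>` naming that Spark uses for shuffle blocks. As a minimal sketch, such IDs can be built and validated before they are passed to `fetchBlocks`; the `ShuffleBlockIds` helper is hypothetical and not part of the Spark API.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Spark): builds and parses block IDs of the
// form shuffle_<shuffleId>_<mapId>_<reduceId>.
final class ShuffleBlockIds {
    private ShuffleBlockIds() {}

    /** Formats a single shuffle block ID. */
    static String format(int shuffleId, int mapId, int reduceId) {
        return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
    }

    /** Builds the IDs of reduce partitions [0, numReducers) for one map output. */
    static String[] forMapOutput(int shuffleId, int mapId, int numReducers) {
        String[] ids = new String[numReducers];
        for (int r = 0; r < numReducers; r++) {
            ids[r] = format(shuffleId, mapId, r);
        }
        return ids;
    }

    /** Parses a block ID into {shuffleId, mapId, reduceId}; rejects malformed input. */
    static int[] parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id: " + blockId);
        }
        return new int[] {
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3])
        };
    }

    public static void main(String[] args) {
        String[] ids = forMapOutput(1, 0, 3);
        System.out.println(Arrays.toString(ids));
        System.out.println(Arrays.toString(parse(ids[2])));
    }
}
```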

### Mesos-Specific Usage

```java
// Create Mesos client
MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
    conf, secretManager, true, false
);

mesosClient.init("spark-app-123");

// Register driver for cleanup
mesosClient.registerDriverWithShuffleService("shuffle-server", 7337);

// Use normally for block fetching
mesosClient.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
```

## Error Handling

The shuffle client automatically retries block fetches that fail with transient network errors. `TransportConf` only reads its settings, so configure retry behavior on the Spark configuration that backs it, before the `TransportConf` is created:

```java
SparkConf sparkConf = new SparkConf();

// Set maximum retry attempts
sparkConf.set("spark.shuffle.io.maxRetries", "3");

// Set retry wait time
sparkConf.set("spark.shuffle.io.retryWait", "5s");
```
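
To make the meaning of the two settings concrete, here is a simplified, self-contained sketch of a bounded retry loop with a fixed wait between attempts. It is not Spark's implementation (Spark retries asynchronously inside the client); `RetrySketch`, `IoAction`, and the parameter names are hypothetical and only mirror what a maximum retry count and a retry wait express.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified illustration only; class and parameter names are hypothetical.
final class RetrySketch {
    private RetrySketch() {}

    /** An action that may fail with a transient I/O error. */
    interface IoAction<T> {
        T run() throws IOException;
    }

    /**
     * Runs the action, retrying up to maxRetries times after an IOException
     * and sleeping retryWaitMs between attempts.
     */
    static <T> T callWithRetries(IoAction<T> action, int maxRetries, long retryWaitMs) {
        int retryCount = 0;
        while (true) {
            try {
                return action.run();
            } catch (IOException e) {
                if (retryCount >= maxRetries) {
                    // Retries exhausted: surface the last failure.
                    throw new UncheckedIOException(
                        "Giving up after " + retryCount + " retries", e);
                }
                retryCount++;
                sleepQuietly(retryWaitMs);
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails twice with a transient error, then succeeds on the third attempt.
        String result = callWithRetries(() -> {
            if (++attempts[0] < 3) {
                throw new IOException("transient failure " + attempts[0]);
            }
            return "fetched";
        }, 3, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```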

Common exceptions:
- `IOException`: Network connectivity issues, server unavailable
- `IllegalArgumentException`: Invalid parameters or configuration
- `SecurityException`: SASL authentication failures

## Resource Management

Always close the shuffle client to free network resources:

```java
try {
    // Use client...
} finally {
    client.close();
}