0
# Shuffle Client Operations
1
2
Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.
3
4
## Capabilities
5
6
### ShuffleClient Abstract Base Class
7
8
Abstract base class for clients that read shuffle files from executors or external shuffle services.
9
10
```java { .api }
11
/**
12
* Abstract base class for clients that read shuffle files from executors or external services
13
*/
14
public abstract class ShuffleClient implements Closeable {
15
/**
16
* Initialize the client for the given application
17
* @param appId - Application ID to initialize client for
18
*/
19
public void init(String appId);
20
21
/**
22
* Fetch blocks from a remote shuffle service
23
* @param host - Host name of the shuffle service
24
* @param port - Port number of the shuffle service
25
* @param execId - Executor ID
26
* @param blockIds - Array of block IDs to fetch
27
* @param listener - Listener for block fetch events
28
* @param downloadFileManager - Manager for temporary download files, can be null
29
*/
30
public abstract void fetchBlocks(
31
String host, int port, String execId, String[] blockIds,
32
BlockFetchingListener listener, DownloadFileManager downloadFileManager
33
);
34
35
/**
36
* Get shuffle metrics for monitoring
37
* @return MetricSet containing shuffle performance metrics
38
*/
39
public MetricSet shuffleMetrics();
40
41
/**
42
* Close the client and clean up resources
43
*/
44
public void close();
45
}
46
```
47
48
### ExternalShuffleClient Implementation
49
50
Concrete client for reading shuffle blocks from an external shuffle service.
51
52
```java { .api }
53
/**
54
* Client implementation for reading shuffle blocks from external shuffle service
55
*/
56
public class ExternalShuffleClient extends ShuffleClient {
57
/**
58
* Create an external shuffle client
59
* @param conf - Transport configuration
60
* @param secretKeyHolder - Secret key holder for authentication
61
* @param authEnabled - Whether authentication is enabled
62
* @param registrationTimeoutMs - Timeout for registration operations in milliseconds
63
*/
64
public ExternalShuffleClient(
65
TransportConf conf, SecretKeyHolder secretKeyHolder,
66
boolean authEnabled, long registrationTimeoutMs
67
);
68
69
/**
70
* Initialize the client for the given application
71
* @param appId - Application ID to initialize client for
72
*/
73
@Override
74
public void init(String appId);
75
76
/**
77
* Fetch blocks from the external shuffle service
78
* @param host - Host name of the shuffle service
79
* @param port - Port number of the shuffle service
80
* @param execId - Executor ID
81
* @param blockIds - Array of block IDs to fetch
82
* @param listener - Listener for block fetch events
83
* @param downloadFileManager - Manager for temporary download files, can be null
84
*/
85
@Override
86
public void fetchBlocks(
87
String host, int port, String execId, String[] blockIds,
88
BlockFetchingListener listener, DownloadFileManager downloadFileManager
89
);
90
91
/**
92
* Get shuffle metrics for monitoring
93
* @return MetricSet containing shuffle performance metrics
94
*/
95
@Override
96
public MetricSet shuffleMetrics();
97
98
/**
99
* Register an executor with the external shuffle service
100
* @param host - Host name of the shuffle service
101
* @param port - Port number of the shuffle service
102
* @param execId - Executor ID to register
103
* @param executorInfo - Information about the executor's shuffle configuration
104
* @throws IOException if network communication fails
105
* @throws InterruptedException if the operation is interrupted
106
*/
107
public void registerWithShuffleServer(
108
String host, int port, String execId, ExecutorShuffleInfo executorInfo
109
) throws IOException, InterruptedException;
110
111
/**
112
* Close the client and clean up resources
113
*/
114
@Override
115
public void close();
116
}
117
```
118
119
**Usage Example:**
120
121
```java
122
import org.apache.spark.network.shuffle.ExternalShuffleClient;
123
import org.apache.spark.network.shuffle.BlockFetchingListener;
124
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
125
import org.apache.spark.network.sasl.ShuffleSecretManager;
126
import org.apache.spark.network.util.TransportConf;
127
128
// Create transport configuration
129
TransportConf conf = new TransportConf("shuffle");
130
131
// Create secret manager for authentication
132
ShuffleSecretManager secretManager = new ShuffleSecretManager();
133
secretManager.registerApp("app1", "secretKey123");
134
135
// Create external shuffle client
136
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 10000);
137
138
// Initialize for application
139
client.init("app1");
140
141
// Register executor with shuffle service
142
String[] localDirs = {"/tmp/spark-1", "/tmp/spark-2"};
143
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
144
client.registerWithShuffleServer("shuffle-node-1", 7337, "executor-1", executorInfo);
145
146
// Create block fetching listener
147
BlockFetchingListener listener = new BlockFetchingListener() {
148
@Override
149
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
150
System.out.println("Successfully fetched block: " + blockId +
151
", size: " + data.size() + " bytes");
152
// Process the block data
153
try {
154
// Convert to bytes and process
155
byte[] blockData = ByteStreams.toByteArray(data.createInputStream());
156
processBlockData(blockId, blockData);
157
} catch (IOException e) {
158
System.err.println("Error processing block data: " + e.getMessage());
159
} finally {
160
data.release(); // Important: release the buffer
161
}
162
}
163
164
@Override
165
public void onBlockFetchFailure(String blockId, Throwable exception) {
166
System.err.println("Failed to fetch block: " + blockId + ", error: " + exception.getMessage());
167
// Handle retry logic or error reporting
168
handleBlockFetchFailure(blockId, exception);
169
}
170
};
171
172
// Fetch specific blocks
173
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
174
client.fetchBlocks("shuffle-node-1", 7337, "executor-1", blockIds, listener, null);
175
176
// Monitor shuffle metrics
177
MetricSet metrics = client.shuffleMetrics();
178
System.out.println("Shuffle metrics: " + metrics);
179
180
// Clean up when done
181
client.close();
182
```
183
184
### Block Fetching Best Practices
185
186
1. **Listener Implementation**: Always implement both success and failure callbacks in BlockFetchingListener
187
2. **Buffer Management**: Release ManagedBuffer instances after processing to avoid memory leaks
188
3. **Error Handling**: Implement proper retry logic for transient failures
189
4. **Resource Cleanup**: Always call close() on the client when finished
190
5. **Authentication**: Use ShuffleSecretManager for secure deployments
191
6. **Configuration**: Tune TransportConf parameters for optimal performance
192
193
### Common Configuration Parameters
194
195
The behavior of `ExternalShuffleClient` can be configured through the following `TransportConf` properties:
196
197
- `spark.shuffle.io.connectionTimeout` - Connection timeout for shuffle operations
198
- `spark.shuffle.io.numConnectionsPerPeer` - Number of connections per shuffle peer
199
- `spark.shuffle.io.retryWait` - Time to wait between retry attempts
200
- `spark.shuffle.io.maxRetries` - Maximum number of retry attempts
201
- `spark.shuffle.io.preferDirectBufs` - Whether to prefer direct buffers for network I/O