or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md block-fetching.md file-management.md index.md mesos.md protocol.md shuffle-client.md shuffle-server.md

docs/shuffle-client.md

0

# Shuffle Client Operations

1

2

Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.

3

4

## Capabilities

5

6

### ShuffleClient Abstract Base Class

7

8

Abstract base class for reading shuffle files from executors or external services.

9

10

```java { .api }

11

/**

12

* Abstract base class for clients that read shuffle files from executors or external services

13

*/

14

public abstract class ShuffleClient implements Closeable {

15

/**

16

* Initialize the client for the given application

17

* @param appId - Application ID to initialize client for

18

*/

19

public void init(String appId);

20

21

/**

22

* Fetch blocks from a remote shuffle service

23

* @param host - Host name of the shuffle service

24

* @param port - Port number of the shuffle service

25

* @param execId - Executor ID

26

* @param blockIds - Array of block IDs to fetch

27

* @param listener - Listener for block fetch events

28

* @param downloadFileManager - Manager for temporary download files, can be null

29

*/

30

public abstract void fetchBlocks(

31

String host, int port, String execId, String[] blockIds,

32

BlockFetchingListener listener, DownloadFileManager downloadFileManager

33

);

34

35

/**

36

* Get shuffle metrics for monitoring

37

* @return MetricSet containing shuffle performance metrics

38

*/

39

public MetricSet shuffleMetrics();

40

41

/**

42

* Close the client and clean up resources

43

*/

44

public void close();

45

}

46

```

47

48

### ExternalShuffleClient Implementation

49

50

Client for reading shuffle blocks from an external shuffle service.

51

52

```java { .api }

53

/**

54

* Client implementation for reading shuffle blocks from external shuffle service

55

*/

56

public class ExternalShuffleClient extends ShuffleClient {

57

/**

58

* Create an external shuffle client

59

* @param conf - Transport configuration

60

* @param secretKeyHolder - Secret key holder for authentication

61

* @param authEnabled - Whether authentication is enabled

62

* @param registrationTimeoutMs - Timeout for registration operations in milliseconds

63

*/

64

public ExternalShuffleClient(

65

TransportConf conf, SecretKeyHolder secretKeyHolder,

66

boolean authEnabled, long registrationTimeoutMs

67

);

68

69

/**

70

* Initialize the client for the given application

71

* @param appId - Application ID to initialize client for

72

*/

73

@Override

74

public void init(String appId);

75

76

/**

77

* Fetch blocks from the external shuffle service

78

* @param host - Host name of the shuffle service

79

* @param port - Port number of the shuffle service

80

* @param execId - Executor ID

81

* @param blockIds - Array of block IDs to fetch

82

* @param listener - Listener for block fetch events

83

* @param downloadFileManager - Manager for temporary download files, can be null

84

*/

85

@Override

86

public void fetchBlocks(

87

String host, int port, String execId, String[] blockIds,

88

BlockFetchingListener listener, DownloadFileManager downloadFileManager

89

);

90

91

/**

92

* Get shuffle metrics for monitoring

93

* @return MetricSet containing shuffle performance metrics

94

*/

95

@Override

96

public MetricSet shuffleMetrics();

97

98

/**

99

* Register an executor with the external shuffle service

100

* @param host - Host name of the shuffle service

101

* @param port - Port number of the shuffle service

102

* @param execId - Executor ID to register

103

* @param executorInfo - Information about the executor's shuffle configuration

104

* @throws IOException if network communication fails

105

* @throws InterruptedException if the operation is interrupted

106

*/

107

public void registerWithShuffleServer(

108

String host, int port, String execId, ExecutorShuffleInfo executorInfo

109

) throws IOException, InterruptedException;

110

111

/**

112

* Close the client and clean up resources

113

*/

114

@Override

115

public void close();

116

}

117

```

118

119

**Usage Examples:**

120

121

```java

122

import org.apache.spark.network.shuffle.ExternalShuffleClient;

123

import org.apache.spark.network.shuffle.BlockFetchingListener;

124

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

125

import org.apache.spark.network.sasl.ShuffleSecretManager;

126

import org.apache.spark.network.util.TransportConf;

127

128

// Create transport configuration

129

TransportConf conf = new TransportConf("shuffle");

130

131

// Create secret manager for authentication

132

ShuffleSecretManager secretManager = new ShuffleSecretManager();

133

secretManager.registerApp("app1", "secretKey123");

134

135

// Create external shuffle client

136

ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 10000);

137

138

// Initialize for application

139

client.init("app1");

140

141

// Register executor with shuffle service

142

String[] localDirs = {"/tmp/spark-1", "/tmp/spark-2"};

143

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");

144

client.registerWithShuffleServer("shuffle-node-1", 7337, "executor-1", executorInfo);

145

146

// Create block fetching listener

147

BlockFetchingListener listener = new BlockFetchingListener() {

148

@Override

149

public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {

150

System.out.println("Successfully fetched block: " + blockId +

151

", size: " + data.size() + " bytes");

152

// Process the block data

153

try {

154

// Convert to bytes and process

155

byte[] blockData = ByteStreams.toByteArray(data.createInputStream());

156

processBlockData(blockId, blockData);

157

} catch (IOException e) {

158

System.err.println("Error processing block data: " + e.getMessage());

159

} finally {

160

data.release(); // Important: release the buffer

161

}

162

}

163

164

@Override

165

public void onBlockFetchFailure(String blockId, Throwable exception) {

166

System.err.println("Failed to fetch block: " + blockId + ", error: " + exception.getMessage());

167

// Handle retry logic or error reporting

168

handleBlockFetchFailure(blockId, exception);

169

}

170

};

171

172

// Fetch specific blocks

173

String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};

174

client.fetchBlocks("shuffle-node-1", 7337, "executor-1", blockIds, listener, null);

175

176

// Monitor shuffle metrics

177

MetricSet metrics = client.shuffleMetrics();

178

System.out.println("Shuffle metrics: " + metrics);

179

180

// Clean up when done

181

client.close();

182

```

183

184

### Block Fetching Best Practices

185

186

1. **Listener Implementation**: Always implement both success and failure callbacks in BlockFetchingListener

187

2. **Buffer Management**: Release ManagedBuffer instances after processing to avoid memory leaks

188

3. **Error Handling**: Implement proper retry logic for transient failures

189

4. **Resource Cleanup**: Always call close() on the client when finished

190

5. **Authentication**: Use ShuffleSecretManager for secure deployments

191

6. **Configuration**: Tune TransportConf parameters for optimal performance

192

193

### Common Configuration Parameters

194

195

The ExternalShuffleClient behavior can be configured through TransportConf:

196

197

- `spark.shuffle.io.connectionTimeout` - Connection timeout for shuffle operations

198

- `spark.shuffle.io.numConnectionsPerPeer` - Number of connections per shuffle peer

199

- `spark.shuffle.io.retryWait` - Time to wait between retry attempts

200

- `spark.shuffle.io.maxRetries` - Maximum number of retry attempts

201

- `spark.shuffle.io.preferDirectBufs` - Whether to prefer direct buffers for network I/O