or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.mdhandler.mdindex.mdmesos.mdprotocol.mdresolver.mdsecurity.md

client.mddocs/

0

# Shuffle Client

1

2

Client-side functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.

3

4

## Capabilities

5

6

### ShuffleClient Base Class

7

8

Abstract base class providing the core interface for reading shuffle files from executors or external services.

9

10

```java { .api }

11

/**

12

* Base class for reading shuffle files, either from an Executor or external service

13

*/

14

public abstract class ShuffleClient implements Closeable {

15

/**

16

* Initializes the ShuffleClient, specifying this Executor's appId.

17

* Must be called before any other method on the ShuffleClient.

18

*/

19

public void init(String appId);

20

21

/**

22

* Fetch a sequence of blocks from a remote node asynchronously.

23

* Note that this API takes a sequence so the implementation can batch requests.

24

*

25

* @param host the host of the remote node

26

* @param port the port of the remote node

27

* @param execId the executor id

28

* @param blockIds block ids to fetch

29

* @param listener the listener to receive block fetching status

30

* @param tempShuffleFileManager manager for creating temp shuffle files to reduce memory usage

31

*/

32

public abstract void fetchBlocks(

33

String host, int port, String execId, String[] blockIds,

34

BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);

35

}

36

```

37

38

### ExternalShuffleClient

39

40

Client for reading shuffle blocks from an external shuffle service instead of directly from other executors.

41

42

```java { .api }

43

/**

44

* Client for reading shuffle blocks which points to an external (outside of executor) server.

45

* This prevents losing shuffle data if executors are lost.

46

*/

47

public class ExternalShuffleClient extends ShuffleClient {

48

/**

49

* Creates an external shuffle client, with SASL optionally enabled.

50

* If SASL is not enabled, then secretKeyHolder may be null.

51

*/

52

public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);

53

54

/**

55

* Initializes the client with the given application ID.

56

* Sets up transport context and client factory with optional authentication.

57

*/

58

@Override

59

public void init(String appId);

60

61

/**

62

* Fetch blocks from the external shuffle service with retry logic.

63

* Uses OneForOneBlockFetcher with optional RetryingBlockFetcher wrapper.

64

*/

65

@Override

66

public void fetchBlocks(String host, int port, String execId, String[] blockIds,

67

BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);

68

69

/**

70

* Registers this executor with an external shuffle server.

71

* Required to inform the shuffle server about where and how we store shuffle files.

72

*

73

* @param host Host of shuffle server

74

* @param port Port of shuffle server

75

* @param execId This Executor's id

76

* @param executorInfo Contains all info necessary for the service to find shuffle files

77

* @throws IOException if registration fails

78

* @throws InterruptedException if registration is interrupted

79

*/

80

public void registerWithShuffleServer(String host, int port, String execId,

81

ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException;

82

83

/**

84

* Closes the client and releases resources.

85

*/

86

@Override

87

public void close();

88

}

89

```

90

91

**Usage Examples:**

92

93

```java

94

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create client without authentication.
// TransportConf needs a ConfigProvider; MapConfigProvider.EMPTY uses the defaults
// for every "spark.shuffle.*" transport setting.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);

// Initialize with application ID (must happen before any other call on the client)
client.init("spark-app-12345");

String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};

// fetchBlocks() is asynchronous. Each block is expected to be reported once (success or
// failure), so count down per callback to know when it is safe to close the client.
CountDownLatch remaining = new CountDownLatch(blockIds.length);

// Create listener for handling fetch results
BlockFetchingListener listener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    try {
      System.out.println("Successfully fetched block: " + blockId +
          " (" + data.size() + " bytes)");
      // Process the block data here.
      // Important: data is released automatically after this method returns. To hand it
      // to another thread, call data.retain() here and data.release() when done.
    } finally {
      remaining.countDown();
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    try {
      System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
    } finally {
      remaining.countDown();
    }
  }
};

try {
  // Register executor with shuffle service; if this fails there is nothing to fetch.
  String[] localDirs = {"/tmp/spark-local/executor-1"};
  ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
      localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
  client.registerWithShuffleServer("shuffle-host", 7337, "executor-1", executorInfo);
  System.out.println("Executor registered successfully");

  // Fetch multiple blocks asynchronously; results arrive on the listener.
  // Passing null for the temp file manager keeps fetched blocks in memory.
  client.fetchBlocks("shuffle-host", 7337, "executor-1", blockIds, listener, null);

  // Wait for all callbacks before closing, otherwise in-flight fetches may be aborted.
  if (!remaining.await(60, TimeUnit.SECONDS)) {
    System.err.println("Timed out waiting for block fetches to complete");
  }
} catch (IOException e) {
  System.err.println("Registration failed: " + e.getMessage());
} catch (InterruptedException e) {
  // Restore the interrupt flag so callers further up can observe it.
  Thread.currentThread().interrupt();
  System.err.println("Interrupted: " + e.getMessage());
} finally {
  // Clean up
  client.close();
}

141

```

142

143

### OneForOneBlockFetcher

144

145

Internal fetcher implementation that sends a single `OpenBlocks` request for all requested blocks and then fetches each block as its own chunk (one chunk per block).

146

147

```java { .api }

148

/**

149

* Fetches shuffle blocks using the one-for-one strategy where each chunk corresponds to one block.

150

*/

151

public class OneForOneBlockFetcher {

152

/**

153

* Creates a block fetcher for the specified client and blocks.

154

*/

155

public OneForOneBlockFetcher(TransportClient client, String appId, String execId,

156

String[] blockIds, BlockFetchingListener listener, TransportConf conf,

157

TempShuffleFileManager tempFileManager);

158

159

/**

160

* Starts the block fetching process by sending OpenBlocks message to the server.

161

*/

162

public void start();

163

}

164

```

165

166

### RetryingBlockFetcher

167

168

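Wrapper that adds retry logic to block fetching operations. `ExternalShuffleClient` uses it when the configured maximum number of I/O retries is greater than zero. Only failures caused by an `IOException` are retried.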

169

170

```java { .api }

171

/**

172

* Adds retry logic to block fetching operations.

173

*/

174

public class RetryingBlockFetcher {

175

/**

176

* Creates a retrying block fetcher.

177

*

178

* @param conf transport configuration containing retry settings

179

* @param blockFetchStarter strategy for creating and starting block fetchers

180

* @param blockIds blocks to fetch

181

* @param listener callback for fetch results

182

*/

183

public RetryingBlockFetcher(TransportConf conf, BlockFetchStarter blockFetchStarter,

184

String[] blockIds, BlockFetchingListener listener);

185

186

/**

187

* Starts the fetching process with retry logic.

188

*/

189

public void start();

190

191

/**

192

* Strategy interface for starting block fetches.

193

*/

194

public interface BlockFetchStarter {

195

/**

196

* Create and start a block fetcher.

197

*/

198

void createAndStart(String[] blockIds, BlockFetchingListener listener) throws Exception;

199

}

200

}

201

```

202

203

## Error Handling

204

205

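The client reports errors in two ways:

- Synchronous calls such as `registerWithShuffleServer` throw checked exceptions.
- Asynchronous `fetchBlocks` calls report per-block failures through `BlockFetchingListener.onBlockFetchFailure` rather than throwing. This happens only after any configured retries are exhausted.

Common error conditions:

- **IOException**: Network connectivity issues or an unavailable server. Fetch failures caused by an `IOException` are retried automatically (up to `spark.shuffle.io.maxRetries` times) before they reach the listener.
- **InterruptedException**: Thrown by `registerWithShuffleServer` when the calling thread is interrupted while waiting.
- **SecurityException**: Authentication failures when SASL is enabled. Depending on the transport layer, these may instead appear as a `javax.security.sasl.SaslException`, which is itself an `IOException`, possibly wrapped in another exception. Inspect the cause chain rather than relying on the top-level type.
- **IllegalArgumentException**: Invalid parameters or block IDs.
- **UnsupportedOperationException**: The server does not support the requested operation.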

212

213

**Error Handling Example:**

214

215

```java

216

BlockFetchingListener errorHandlingListener = new BlockFetchingListener() {

217

@Override

218

public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {

219

// Handle successful fetch

220

processBlock(blockId, data);

221

}

222

223

@Override

224

public void onBlockFetchFailure(String blockId, Throwable exception) {

225

if (exception instanceof IOException) {

226

System.err.println("Network error fetching " + blockId + ", will retry");

227

// Retry logic here

228

} else if (exception instanceof SecurityException) {

229

System.err.println("Authentication failed: " + exception.getMessage());

230

// Handle auth failure

231

} else {

232

System.err.println("Unexpected error: " + exception.getMessage());

233

// Handle other errors

234

}

235

}

236

};

237

```