
# Shuffle Client Operations

The shuffle client components provide the primary interface for fetching shuffle blocks from external shuffle services. Because the shuffle service runs outside executor processes, the shuffle data it serves remains accessible even after the executor that wrote it has exited.

## Core Client Classes

### ShuffleClient

```java { .api }
public abstract class ShuffleClient implements Closeable {
  public void init(String appId);

  public abstract void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener
  );

  public void close() throws IOException;
}
```

Abstract base class for shuffle clients. Call `init(appId)` with the application ID before fetching any blocks.

**Parameters:**

- `appId` (String): Spark application identifier used for authentication and tracking
- `host` (String): Hostname of the shuffle service
- `port` (int): Port number of the shuffle service
- `execId` (String): Executor ID that originally wrote the shuffle blocks
- `blockIds` (String[]): Array of block identifiers to fetch
- `listener` (BlockFetchingListener): Callback interface for handling fetch results
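
Spark names shuffle blocks `shuffle_<shuffleId>_<mapId>_<reduceId>`, which is the shape of the `blockIds` used in the examples on this page. The sketch below parses and formats IDs of that shape. `ShuffleBlockIds` is a hypothetical helper, not part of the shuffle client API, and it assumes integer IDs (some Spark versions use a `long` map ID).

```java
/**
 * Hypothetical helper for shuffle block IDs of the form
 * shuffle_<shuffleId>_<mapId>_<reduceId>, e.g. "shuffle_1_0_2".
 */
final class ShuffleBlockIds {
  private ShuffleBlockIds() {}

  /** Parsed components of a shuffle block ID. */
  static final class Parsed {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    Parsed(int shuffleId, int mapId, int reduceId) {
      this.shuffleId = shuffleId;
      this.mapId = mapId;
      this.reduceId = reduceId;
    }
  }

  /** Splits an ID on '_' and rejects anything that is not a four-part shuffle ID. */
  static Parsed parse(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected shuffle block ID: " + blockId);
    }
    // Integer.parseInt throws NumberFormatException, itself an IllegalArgumentException.
    return new Parsed(
        Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
  }

  /** Builds the ID string for the given components. */
  static String format(int shuffleId, int mapId, int reduceId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }
}
```

For example, `parse("shuffle_1_0_2")` yields shuffle 1, map 0, reduce 2, and `format(1, 0, 2)` rebuilds the same string.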

### ExternalShuffleClient

```java { .api }
public class ExternalShuffleClient extends ShuffleClient {
  public ExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean saslEnabled,
      boolean saslEncryptionEnabled
  );

  public void registerWithShuffleServer(
      String host,
      int port,
      String execId,
      ExecutorShuffleInfo executorInfo
  ) throws IOException;

  public void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener
  );
}
```

The standard `ShuffleClient` implementation for external shuffle services. It supports SASL authentication and automatically retries fetches that fail with I/O errors.

**Constructor Parameters:**

- `conf` (TransportConf): Network transport configuration
- `secretKeyHolder` (SecretKeyHolder): Supplies SASL secrets; may be `null` when SASL is disabled
- `saslEnabled` (boolean): Whether to enable SASL authentication
- `saslEncryptionEnabled` (boolean): Whether to enable SASL encryption (requires `saslEnabled` to be `true`)
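
These constructor arguments are only meaningful in certain combinations: encryption needs SASL, and SASL needs a source of secrets. The sketch below checks those rules up front. `SecretKeyLookup` is a hypothetical stand-in mirroring the two lookups of Spark's `SecretKeyHolder` (`getSaslUser` and `getSecretKey`), and `SaslSettings` is likewise illustrative rather than part of the library.

```java
/** Hypothetical stand-in mirroring the lookups provided by Spark's SecretKeyHolder. */
interface SecretKeyLookup {
  String getSaslUser(String appId);

  String getSecretKey(String appId);
}

/** Hypothetical holder that validates the SASL-related constructor arguments. */
final class SaslSettings {
  final SecretKeyLookup secretKeyHolder; // may be null only when SASL is disabled
  final boolean saslEnabled;
  final boolean saslEncryptionEnabled;

  SaslSettings(SecretKeyLookup secretKeyHolder, boolean saslEnabled, boolean saslEncryptionEnabled) {
    if (saslEncryptionEnabled && !saslEnabled) {
      throw new IllegalArgumentException("SASL encryption requires SASL to be enabled");
    }
    if (saslEnabled && secretKeyHolder == null) {
      throw new IllegalArgumentException("A secret key holder is required when SASL is enabled");
    }
    this.secretKeyHolder = secretKeyHolder;
    this.saslEnabled = saslEnabled;
    this.saslEncryptionEnabled = saslEncryptionEnabled;
  }
}
```

Failing in the constructor surfaces a misconfiguration at startup instead of at the first authentication attempt.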

**Key Methods:**

#### registerWithShuffleServer

Registers an executor with the external shuffle server, providing information about where shuffle files are stored.

**Parameters:**

- `host` (String): Shuffle server hostname
- `port` (int): Shuffle server port
- `execId` (String): Executor identifier
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations

**Throws:**

- `IOException`: If registration fails due to network or server errors
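
`ExecutorShuffleInfo` carries the executor's local directories and the number of subdirectories per directory. To our understanding, Spark's disk layout (which the external shuffle service's resolver mirrors) hashes a file name to pick one local directory and a two-hex-digit subdirectory; with sort-based shuffle, a map task's output is stored as `shuffle_<shuffleId>_<mapId>_0.data` plus a companion `.index` file. The sketch below reproduces that mapping so the role of `subDirsPerLocalDir` is concrete. `ShuffleFileLocator` is a hypothetical name, and the exact hashing should be checked against the Spark version in use.

```java
import java.io.File;

/** Hypothetical sketch: maps a shuffle file name to a path under the executor's local dirs. */
final class ShuffleFileLocator {
  private ShuffleFileLocator() {}

  /** Non-negative hash; Integer.MIN_VALUE maps to 0 because Math.abs would leave it negative. */
  static int nonNegativeHash(Object obj) {
    if (obj == null) {
      return 0;
    }
    int hash = obj.hashCode();
    return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
  }

  /** Picks a local dir and a "%02x" subdirectory from the file name's hash. */
  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }
}
```

Because the mapping depends only on the file name and the registered settings, the service can locate a file the executor wrote without asking the executor, which is why it needs the same `localDirs` and `subDirsPerLocalDir` the executor used.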

#### fetchBlocks

Asynchronously fetches shuffle blocks from the external service, retrying transient failures automatically. Nothing is returned; the success or failure of each block is reported to `listener`.

**Parameters:**

- `host` (String): Shuffle server hostname
- `port` (int): Shuffle server port
- `execId` (String): Executor that wrote the blocks
- `blockIds` (String[]): Block identifiers to fetch
- `listener` (BlockFetchingListener): Callback for success/failure notifications
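
Since results arrive through callbacks, a caller that needs every block before continuing has to wait for one notification per requested block. The sketch below shows one way to do that with a `CountDownLatch`. `BlockListener` is a hypothetical stand-in for `BlockFetchingListener` that receives a `byte[]` instead of a `ManagedBuffer` so the example runs without Spark, and it assumes each block produces exactly one success or failure callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for BlockFetchingListener, using byte[] instead of ManagedBuffer. */
interface BlockListener {
  void onBlockFetchSuccess(String blockId, byte[] data);

  void onBlockFetchFailure(String blockId, Throwable exception);
}

/** Collects per-block results delivered on other threads and lets the caller wait for all of them. */
final class CollectingListener implements BlockListener {
  final Map<String, byte[]> fetched = new ConcurrentHashMap<>();
  final Map<String, Throwable> failed = new ConcurrentHashMap<>();
  private final CountDownLatch remaining;

  CollectingListener(int numBlocks) {
    this.remaining = new CountDownLatch(numBlocks);
  }

  @Override
  public void onBlockFetchSuccess(String blockId, byte[] data) {
    fetched.put(blockId, data.clone()); // copy: the sender may reuse its buffer after the callback
    remaining.countDown();
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    failed.put(blockId, exception);
    remaining.countDown();
  }

  /** Returns true if every block reported in time, false on timeout or interruption. */
  boolean awaitAll(long timeout, TimeUnit unit) {
    try {
      return remaining.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Against the real API, the equivalent copy step would read the `ManagedBuffer` contents inside `onBlockFetchSuccess`, since the callback is the point at which the block's data is handed over.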

## Mesos Integration

### MesosExternalShuffleClient

```java { .api }
public class MesosExternalShuffleClient extends ExternalShuffleClient {
  public MesosExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean saslEnabled,
      boolean saslEncryptionEnabled
  );

  public void registerDriverWithShuffleService(String host, int port) throws IOException;
}
```

Client for Mesos deployments. It adds driver registration so that the shuffle service can clean up the application's shuffle files.

#### registerDriverWithShuffleService

Registers the Spark driver with the external shuffle service for proper cleanup of shuffle files when the application completes.

**Parameters:**

- `host` (String): Shuffle service hostname
- `port` (int): Shuffle service port

**Throws:**

- `IOException`: If registration fails

## Usage Examples

### Basic Client Setup

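```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create configuration. TransportConf takes a ConfigProvider, not a SparkConf;
// SystemPropertyConfigProvider reads spark.* settings from JVM system properties.
// (Inside Spark, SparkTransportConf.fromSparkConf builds one from a SparkConf.)
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

// Create client without SASL (no secret key holder needed)
ExternalShuffleClient client = new ExternalShuffleClient(
    conf, null, false, false
);

// Initialize with app ID before any other call
client.init("spark-app-123");
```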

### Client with SASL Authentication

```java
// Create secret manager
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("spark-app-123", "my-secret-key");

// Create secure client
ExternalShuffleClient client = new ExternalShuffleClient(
    conf,
    secretManager, // Secret key holder
    true,          // Enable SASL
    true           // Enable SASL encryption
);

client.init("spark-app-123");
```

### Executor Registration

```java
// Define executor shuffle configuration
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle", "/tmp/spark-shuffle2"}, // Local directories
    64,                                                   // Number of subdirectories per local directory
    "org.apache.spark.shuffle.sort.SortShuffleManager"    // Shuffle manager class
);

// Register with shuffle server
client.registerWithShuffleServer("shuffle-server", 7337, "executor-1", executorInfo);
```

### Block Fetching with Callback

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;

// Implement fetch callback
BlockFetchingListener listener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    System.out.println("Fetched block " + blockId + ", size: " + data.size());
    // Spark's transport layer releases `data` after this callback returns, so do not
    // call release() here unless you first call retain() (e.g. to hand the buffer to
    // another thread); each retain() needs exactly one matching release().
    try {
      ByteBuffer nio = data.nioByteBuffer(); // may throw IOException (e.g. file-backed buffers)
      byte[] blockData = new byte[nio.remaining()];
      nio.get(blockData);
      // Handle the shuffle block data...
    } catch (IOException e) {
      System.err.println("Failed to read " + blockId + ": " + e.getMessage());
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    System.err.println("Failed to fetch " + blockId + ": " + exception.getMessage());
    // Handle failure, perhaps retry or skip
  }
};

// Fetch multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
```

### Mesos-Specific Usage

```java
// Create Mesos client
MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
    conf, secretManager, true, false
);

mesosClient.init("spark-app-123");

// Register driver for cleanup
mesosClient.registerDriverWithShuffleService("shuffle-server", 7337);

// Use normally for block fetching
mesosClient.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
```

## Error Handling

`ExternalShuffleClient` automatically retries fetches that fail with I/O errors, such as transient network failures. `TransportConf` reads the retry behavior from the Spark configuration:

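```java
import org.apache.spark.SparkConf;

// TransportConf has no setters: it reads these keys from the ConfigProvider it was
// built with (e.g. a SparkConf via SparkTransportConf, or JVM system properties
// when using SystemPropertyConfigProvider). Set them before creating the client.
SparkConf sparkConf = new SparkConf()
    .set("spark.shuffle.io.maxRetries", "3")   // Maximum retry attempts (0 disables retries)
    .set("spark.shuffle.io.retryWait", "5s");  // Wait between retry attempts
```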
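
To our understanding, the two settings interact as follows: a fetch that fails with an I/O error is retried after `retryWait`, up to `maxRetries` times, and only the blocks that are still outstanding are requested again; other errors are reported without retrying. The sketch below models that policy synchronously so it can run on its own. `SingleBlockFetcher` and `RetryingFetch` are hypothetical, and Spark's own retrying fetcher is asynchronous and differs in detail (for example, it re-requests the outstanding blocks as one batch).

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical blocking fetch of a single block; stands in for a network round trip. */
interface SingleBlockFetcher {
  byte[] fetch(String blockId) throws IOException;
}

/** Hypothetical synchronous model of "retry outstanding blocks on I/O errors". */
final class RetryingFetch {
  private RetryingFetch() {}

  /** Per-block results: each requested block ends up in exactly one of the two maps. */
  static final class Outcome {
    final Map<String, byte[]> fetched = new LinkedHashMap<>();
    final Map<String, Exception> failed = new LinkedHashMap<>();
  }

  static Outcome fetchAll(SingleBlockFetcher fetcher, String[] blockIds, int maxRetries, long retryWaitMs) {
    Outcome out = new Outcome();
    Set<String> outstanding = new LinkedHashSet<>(Arrays.asList(blockIds));
    Map<String, IOException> lastIoError = new HashMap<>();
    int retryCount = 0;
    while (true) {
      for (Iterator<String> it = outstanding.iterator(); it.hasNext(); ) {
        String blockId = it.next();
        try {
          out.fetched.put(blockId, fetcher.fetch(blockId));
          it.remove();
        } catch (IOException e) {
          lastIoError.put(blockId, e); // transient: keep the block outstanding for a retry
        } catch (RuntimeException e) {
          out.failed.put(blockId, e); // not an I/O error: report it without retrying
          it.remove();
        }
      }
      if (outstanding.isEmpty()) {
        return out;
      }
      if (retryCount >= maxRetries) { // retries exhausted: report the last I/O error per block
        for (String blockId : outstanding) {
          out.failed.put(blockId, lastIoError.get(blockId));
        }
        return out;
      }
      retryCount++;
      try {
        Thread.sleep(retryWaitMs); // plays the role of spark.shuffle.io.retryWait
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        for (String blockId : outstanding) {
          out.failed.put(blockId, e);
        }
        return out;
      }
    }
  }
}
```

In this model, `fetchAll(fetcher, blockIds, 3, 5000)` corresponds to the `maxRetries = 3`, `retryWait = 5s` configuration above.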

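Because `fetchBlocks` is asynchronous, per-block fetch errors are delivered to `BlockFetchingListener.onBlockFetchFailure` rather than thrown. Common exception types:

- `IOException`: Network connectivity issues, server unavailable
- `IllegalArgumentException`: Invalid parameters or configuration
- `SecurityException`: SASL authentication failures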

## Resource Management

Always close the shuffle client when you are finished with it to release its network resources:

```java
try {
  // Use client...
} finally {
  client.close();
}