or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.md · index.md · protocol-messages.md · security.md · server-apis.md

docs/client-apis.md

# Client APIs

Client-side functionality for fetching shuffle blocks from external shuffle services with comprehensive error handling, retry logic, and fault tolerance mechanisms.

## Capabilities

### Shuffle Client Base

Abstract base class providing the core interface for reading shuffle files from executors or external services.

```java { .api }
/**
 * Base interface for reading shuffle files, either from an Executor or external service
 */
public abstract class ShuffleClient implements Closeable {
  /**
   * Initialize the client with application ID
   * @param appId - Spark application identifier
   */
  public void init(String appId);

  /**
   * Fetch blocks from the specified host and executor
   * @param host - Target host address
   * @param port - Target port number
   * @param execId - Executor identifier
   * @param blockIds - Array of block identifiers to fetch
   * @param listener - Callback for handling fetch results
   */
  public abstract void fetchBlocks(String host, int port, String execId,
      String[] blockIds, BlockFetchingListener listener);

  /**
   * Close the client and cleanup resources
   */
  public void close() throws IOException;
}
```

### External Shuffle Client

Concrete implementation for reading shuffle blocks from external shuffle services with SASL authentication support.

```java { .api }
/**
 * Client for reading shuffle blocks from external shuffle service
 */
public class ExternalShuffleClient extends ShuffleClient {
  /**
   * Create external shuffle client with security configuration
   * @param conf - Transport configuration
   * @param secretKeyHolder - Secret key holder for authentication
   * @param saslEnabled - Whether SASL authentication is enabled
   * @param saslEncryptionEnabled - Whether SASL encryption is enabled
   */
  public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
      boolean saslEnabled, boolean saslEncryptionEnabled);

  /**
   * Initialize client with application ID
   * @param appId - Spark application identifier
   */
  public void init(String appId);

  /**
   * Fetch blocks from external shuffle service
   * @param host - Shuffle service host address
   * @param port - Shuffle service port number
   * @param execId - Executor identifier
   * @param blockIds - Array of block identifiers to fetch
   * @param listener - Callback for handling fetch results
   */
  public void fetchBlocks(String host, int port, String execId,
      String[] blockIds, BlockFetchingListener listener);

  /**
   * Register executor with shuffle server
   * @param host - Shuffle server host address
   * @param port - Shuffle server port number
   * @param execId - Executor identifier
   * @param executorInfo - Executor configuration information
   * @throws IOException if registration fails
   */
  public void registerWithShuffleServer(String host, int port, String execId,
      ExecutorShuffleInfo executorInfo) throws IOException;

  /**
   * Close client and cleanup resources
   */
  public void close();
}
```

**Usage Example:**

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.TransportConf;

// Create client with SASL disabled
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);

// Initialize and register executor
client.init("spark-app-123");
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-1"}, 64, "org.apache.spark.shuffle.sort.SortShuffleManager"
);
client.registerWithShuffleServer("shuffle-host", 7337, "executor-1", info);

// Fetch blocks with callback
client.fetchBlocks("shuffle-host", 7337, "executor-1",
    new String[]{"shuffle_0_0_0", "shuffle_0_0_1"}, listener);
```

### Block Fetching Listener

Event listener interface for handling block fetch operations with success and failure callbacks.

```java { .api }
/**
 * Event listener for block fetch operations
 */
public interface BlockFetchingListener extends EventListener {
  /**
   * Called when a block is successfully fetched
   * @param blockId - Identifier of the fetched block
   * @param data - Block data as managed buffer
   */
  void onBlockFetchSuccess(String blockId, ManagedBuffer data);

  /**
   * Called when block fetch fails
   * @param blockId - Identifier of the failed block
   * @param exception - Exception that caused the failure
   */
  void onBlockFetchFailure(String blockId, Throwable exception);
}
```

**Usage Example:**

```java
BlockFetchingListener listener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    try {
      // Process the block data
      ByteBuffer buffer = data.nioByteBuffer();
      System.out.println("Fetched block " + blockId + " with " + buffer.remaining() + " bytes");
      // Handle the shuffle data...
    } finally {
      data.release(); // Important: release the buffer
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
    // Implement retry logic or error handling
  }
};
```

### One-for-One Block Fetcher

Simple wrapper on TransportClient that interprets each chunk as a whole block for straightforward block fetching.

```java { .api }
/**
 * Simple wrapper interpreting each chunk as a whole block
 */
public class OneForOneBlockFetcher {
  /**
   * Create block fetcher for specified blocks
   * @param client - Transport client for network communication
   * @param appId - Spark application identifier
   * @param execId - Executor identifier
   * @param blockIds - Array of block identifiers to fetch
   * @param listener - Callback for handling fetch results
   */
  public OneForOneBlockFetcher(TransportClient client, String appId, String execId,
      String[] blockIds, BlockFetchingListener listener);

  /**
   * Start fetching the specified blocks
   */
  public void start();
}
```

### Retrying Block Fetcher

Wrapper that adds automatic retry capability to block fetching operations with configurable retry policies.

```java { .api }
/**
 * Wraps another BlockFetcher with automatic retry capability
 */
public class RetryingBlockFetcher {
  /**
   * Create retrying block fetcher with specified configuration
   * @param conf - Transport configuration including retry settings
   * @param fetchStarter - Factory for creating block fetchers
   * @param blockIds - Array of block identifiers to fetch
   * @param listener - Callback for handling fetch results
   */
  public RetryingBlockFetcher(TransportConf conf, BlockFetchStarter fetchStarter,
      String[] blockIds, BlockFetchingListener listener);

  /**
   * Start fetching blocks with retry logic
   */
  public void start();
}

/**
 * Interface for creating and starting block fetchers
 */
public static interface BlockFetchStarter {
  /**
   * Create and start block fetcher for specified blocks
   * @param blockIds - Array of block identifiers to fetch
   * @param listener - Callback for handling fetch results
   * @throws IOException if fetcher creation fails
   */
  void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
}
```

**Usage Example:**

```java
// Create a retry-enabled block fetcher
BlockFetchStarter fetchStarter = (blockIds, listener) -> {
  TransportClient client = transportContext.createClient(host, port);
  OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client, appId, execId, blockIds, listener);
  fetcher.start();
};

RetryingBlockFetcher retryingFetcher = new RetryingBlockFetcher(
    conf, fetchStarter, blockIds, originalListener
);
retryingFetcher.start();
```

## Mesos Integration

### Mesos External Shuffle Client

Extended client for external shuffle service in Mesos coarse-grained mode with driver registration support.

```java { .api }
/**
 * Client for external shuffle service in Mesos coarse-grained mode
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {
  /**
   * Create Mesos external shuffle client
   * @param conf - Transport configuration
   * @param secretKeyHolder - Secret key holder for authentication
   * @param saslEnabled - Whether SASL authentication is enabled
   * @param saslEncryptionEnabled - Whether SASL encryption is enabled
   */
  public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
      boolean saslEnabled, boolean saslEncryptionEnabled);

  /**
   * Register driver with shuffle service for Mesos integration
   * @param host - Shuffle service host address
   * @param port - Shuffle service port number
   * @throws IOException if registration fails
   */
  public void registerDriverWithShuffleService(String host, int port) throws IOException;
}
```