or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, block-fetching.md, file-management.md, index.md, mesos.md, protocol.md, shuffle-client.md, shuffle-server.md

docs/index.md

0

# Apache Spark Network Shuffle

1

2

Apache Spark Network Shuffle is a Java library that provides network-based shuffle functionality for Apache Spark. It lets executors fetch shuffle blocks through an external shuffle service instead of directly from the executor that wrote them. It also supports SASL-based authentication, automatic retries for failed block fetches, and client-side shuffle metrics (`ShuffleClient.shuffleMetrics()`).

3

4

## Package Information

5

6

- **Package Name**: spark-network-shuffle_2.11

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**: Maven dependency `org.apache.spark:spark-network-shuffle_2.11:2.4.8`

10

11

## Core Imports

12

13

```java

14

import org.apache.spark.network.shuffle.ShuffleClient;

15

import org.apache.spark.network.shuffle.ExternalShuffleClient;

16

import org.apache.spark.network.shuffle.BlockFetchingListener;

17

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

18

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

19

import org.apache.spark.network.sasl.ShuffleSecretManager;

20

```

21

22

## Basic Usage

23

24

```java

25

import org.apache.spark.network.shuffle.ExternalShuffleClient;

26

import org.apache.spark.network.shuffle.BlockFetchingListener;

27

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

28

import org.apache.spark.network.sasl.ShuffleSecretManager;

29

import org.apache.spark.network.util.TransportConf;

30

import org.apache.spark.network.buffer.ManagedBuffer;

31

32

// Create shuffle secret manager for authentication

33

ShuffleSecretManager secretManager = new ShuffleSecretManager();

34

secretManager.registerApp("myApp", "mySecretKey");

35

36

// Create external shuffle client

37

TransportConf conf = new TransportConf("shuffle");

38

ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 5000);

39

40

// Initialize client

41

client.init("myApp");

42

43

// Register executor with shuffle service

44

String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};

45

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");

46

client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

47

48

// Fetch blocks with listener

49

BlockFetchingListener listener = new BlockFetchingListener() {

50

@Override

51

public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {

52

System.out.println("Successfully fetched block: " + blockId);

53

}

54

55

@Override

56

public void onBlockFetchFailure(String blockId, Throwable exception) {

57

System.err.println("Failed to fetch block: " + blockId);

58

}

59

};

60

61

String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

62

client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

63

64

// Close client when done

65

client.close();

66

```

67

68

## Architecture

69

70

Apache Spark Network Shuffle is built around several key components:

71

72

- **Shuffle Clients**: Client-side components for fetching shuffle blocks from external services

73

- **Shuffle Handlers**: Server-side RPC handlers that serve shuffle blocks to clients

74

- **Block Resolution**: Components that convert shuffle block IDs to physical file segments

75

- **Security Management**: SASL-based authentication system for secure shuffle operations

76

- **Protocol Messages**: Message classes (implementing Spark's `Encodable` interface) that define the binary wire format for client-server communication

77

- **File Management**: Temporary file handling during block transfer operations

78

- **Retry Mechanisms**: Automatic retry functionality for handling transient failures

79

80

## Capabilities

81

82

### Shuffle Client Operations

83

84

Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.

85

86

```java { .api }

87

public abstract class ShuffleClient implements Closeable {

88

public void init(String appId);

89

public abstract void fetchBlocks(

90

String host, int port, String execId, String[] blockIds,

91

BlockFetchingListener listener, DownloadFileManager downloadFileManager

92

);

93

public MetricSet shuffleMetrics();

94

public void close();

95

}

96

97

public class ExternalShuffleClient extends ShuffleClient {

98

public ExternalShuffleClient(

99

TransportConf conf, SecretKeyHolder secretKeyHolder,

100

boolean authEnabled, long registrationTimeoutMs

101

);

102

public void registerWithShuffleServer(

103

String host, int port, String execId, ExecutorShuffleInfo executorInfo

104

) throws IOException, InterruptedException;

105

}

106

```

107

108

[Shuffle Client Operations](./shuffle-client.md)

109

110

### Shuffle Server Components

111

112

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

113

114

```java { .api }

115

public class ExternalShuffleBlockHandler extends RpcHandler {

116

public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);

117

public void receive(

118

TransportClient client, ByteBuffer message, RpcResponseCallback callback

119

);

120

public void applicationRemoved(String appId, boolean cleanupLocalDirs);

121

public void executorRemoved(String executorId, String appId);

122

}

123

124

public class ExternalShuffleBlockResolver {

125

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);

126

public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

127

public ManagedBuffer getBlockData(

128

String appId, String execId, int shuffleId, int mapId, int reduceId

129

);

130

}

131

```

132

133

[Shuffle Server Components](./shuffle-server.md)

134

135

### Authentication and Security

136

137

SASL-based authentication system for securing shuffle operations between clients and external shuffle services.

138

139

```java { .api }

140

public class ShuffleSecretManager implements SecretKeyHolder {

141

private static final String SPARK_SASL_USER = "sparkSaslUser";

142

143

public ShuffleSecretManager();

144

public void registerApp(String appId, String shuffleSecret);

145

public void registerApp(String appId, ByteBuffer shuffleSecret);

146

public void unregisterApp(String appId);

147

public String getSaslUser(String appId);

148

public String getSecretKey(String appId);

149

}

150

```

151

152

[Authentication and Security](./authentication.md)

153

154

### Protocol Messages

155

156

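Message classes exchanged between shuffle clients and servers, covering executor registration, block requests, and block uploads. They implement Spark's `Encodable` interface rather than `java.io.Serializable`. A message is encoded with `toByteBuffer()` and decoded with `BlockTransferMessage.Decoder.fromByteBuffer()`.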

157

158

```java { .api }

159

public abstract class BlockTransferMessage implements Encodable {

160

public ByteBuffer toByteBuffer();

161

162

public enum Type {

163

OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,

164

REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM

165

}

166

167

public static class Decoder {

168

public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);

169

}

170

}

171

172

public class ExecutorShuffleInfo implements Encodable {

173

public final String[] localDirs;

174

public final int subDirsPerLocalDir;

175

public final String shuffleManager;

176

177

public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);

178

}

179

```

180

181

[Protocol Messages](./protocol.md)

182

183

### Block Fetching and Retry Logic

184

185

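Block fetching with optional automatic retries (`RetryingBlockFetcher` wraps a fetch started through a `BlockFetchStarter`). Results are delivered asynchronously to a `BlockFetchingListener`, with one success or failure callback per block ID.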

186

187

```java { .api }

188

public interface BlockFetchingListener extends EventListener {

189

void onBlockFetchSuccess(String blockId, ManagedBuffer data);

190

void onBlockFetchFailure(String blockId, Throwable exception);

191

}

192

193

public class OneForOneBlockFetcher {

194

public OneForOneBlockFetcher(

195

TransportClient client, String appId, String execId, String[] blockIds,

196

BlockFetchingListener listener, TransportConf transportConf

197

);

198

public void start();

199

}

200

201

public class RetryingBlockFetcher {

202

public RetryingBlockFetcher(

203

TransportConf conf, BlockFetchStarter fetchStarter,

204

String[] blockIds, BlockFetchingListener listener

205

);

206

public void start();

207

208

public interface BlockFetchStarter {

209

void createAndStart(String[] blockIds, BlockFetchingListener listener);

210

}

211

}

212

```

213

214

[Block Fetching and Retry Logic](./block-fetching.md)

215

216

### File Management

217

218

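Temporary-file management for fetched blocks that are written to disk during transfer. A `DownloadFileManager` creates each `DownloadFile` and registers it for later cleanup. `DownloadFileWritableChannel.closeAndRead()` closes the channel and returns the written data as a `ManagedBuffer`.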

219

220

```java { .api }

221

public interface DownloadFile {

222

boolean delete();

223

DownloadFileWritableChannel openForWriting() throws IOException;

224

String path();

225

}

226

227

public interface DownloadFileManager {

228

DownloadFile createTempFile(TransportConf transportConf);

229

boolean registerTempFileToClean(DownloadFile file);

230

}

231

232

public interface DownloadFileWritableChannel extends WritableByteChannel {

233

ManagedBuffer closeAndRead();

234

}

235

```

236

237

[File Management](./file-management.md)

238

239

### Mesos Integration

240

241

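A Mesos-specific shuffle client that registers the driver with the external shuffle service and keeps that registration alive with periodic heartbeats, controlled by `heartbeatTimeoutMs` and `heartbeatIntervalMs`.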

242

243

```java { .api }

244

public class MesosExternalShuffleClient extends ExternalShuffleClient {

245

public MesosExternalShuffleClient(

246

TransportConf conf, SecretKeyHolder secretKeyHolder,

247

boolean authEnabled, long registrationTimeoutMs

248

);

249

public void registerDriverWithShuffleService(

250

String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs

251

) throws IOException, InterruptedException;

252

}

253

```

254

255

[Mesos Integration](./mesos.md)

256

257

## Types

258

259

```java { .api }

260

public class ShuffleIndexRecord {

261

public ShuffleIndexRecord(long offset, long length);

262

public long getOffset();

263

public long getLength();

264

}

265

266

public class ShuffleIndexInformation {

267

public ShuffleIndexInformation(File indexFile);

268

public int getSize();

269

public ShuffleIndexRecord getIndex(int reduceId);

270

}

271

272

public class SimpleDownloadFile implements DownloadFile {

273

public SimpleDownloadFile(File file, TransportConf transportConf);

274

public boolean delete();

275

public DownloadFileWritableChannel openForWriting();

276

public String path();

277

}

278

```