tessl/maven-org-apache-spark--spark-network-shuffle

External shuffle service for Apache Spark that enables shuffle operations outside of executor processes

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.11@2.2.x

To install, run `npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle@2.2.0`

# Spark Network Shuffle

Spark Network Shuffle provides an external shuffle service for Apache Spark: a process that runs outside the executors and serves the shuffle files they write. Because shuffle data stays available after the executor that produced it exits, executors can be released without losing shuffle output, which improves resource utilization and fault tolerance.

## Package Information

- **Package Name**: spark-network-shuffle_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add Maven dependency: `org.apache.spark:spark-network-shuffle_2.11:2.2.3`

## Core Imports

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
```

## Basic Usage

```java
import java.util.Collections;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create and initialize the client. TransportConf takes a module name and a
// ConfigProvider; with an empty provider every lookup returns its default.
TransportConf conf = new TransportConf(
    "shuffle", new MapConfigProvider(Collections.<String, String>emptyMap()));
// No SecretKeyHolder is passed because authentication is disabled here.
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init("myApp");

// Register the executor with the shuffle service.
// registerWithShuffleServer throws IOException and InterruptedException.
String[] localDirs = {"/tmp/spark-shuffle"};
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
client.registerWithShuffleServer("localhost", 7337, "executor-1", info);

// Fetch blocks asynchronously; results arrive through the listener.
String[] blockIds = {"shuffle_0_1_0", "shuffle_0_2_0"};
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Fetched block: " + blockId);
        // Process block data
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
    }
};

client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

// fetchBlocks returns before the callbacks fire; in real code, close the
// client only after every block has reported success or failure.
client.close();
```
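
Because `fetchBlocks` is asynchronous, the call to `close()` in the snippet above can run before any callback fires. One way to hold off is to count callbacks, sketched here with JDK classes only. `FetchCompletionTracker` and its methods are hypothetical names, not part of spark-network-shuffle, and the sketch assumes that every requested block ends in exactly one success or failure callback.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of spark-network-shuffle.
final class FetchCompletionTracker {
    private final CountDownLatch remaining;
    private final List<String> succeeded = new CopyOnWriteArrayList<>();
    private final List<String> failed = new CopyOnWriteArrayList<>();

    FetchCompletionTracker(int expectedBlocks) {
        this.remaining = new CountDownLatch(expectedBlocks);
    }

    // Intended to be called from onBlockFetchSuccess.
    void recordSuccess(String blockId) {
        succeeded.add(blockId);
        remaining.countDown();
    }

    // Intended to be called from onBlockFetchFailure.
    void recordFailure(String blockId) {
        failed.add(blockId);
        remaining.countDown();
    }

    // Returns true if every expected block reported back within the timeout.
    boolean awaitAll(long timeoutMs) {
        try {
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    List<String> succeeded() { return succeeded; }

    List<String> failed() { return failed; }
}
```

Calling `recordSuccess` and `recordFailure` from the two listener callbacks, then `awaitAll(...)` before `client.close()`, keeps the client open until all blocks have reported.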

## Architecture

Spark Network Shuffle is built around several key components:

- **Shuffle Client**: Client-side interface for fetching shuffle blocks from external services
- **Block Handler**: Server-side RPC handler that processes shuffle requests and manages block access
- **Block Resolver**: Manages executor metadata and resolves shuffle block locations on disk
- **Protocol Messages**: Structured messages for client-server communication using Netty encoding
- **SASL Security**: Authentication system for secure shuffle operations
- **Mesos Integration**: Specialized components for Mesos cluster manager integration

## Capabilities

### Shuffle Client Operations

Core client functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.

```java { .api }
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);
    public void registerWithShuffleServer(String host, int port, String execId, ExecutorShuffleInfo executorInfo)
        throws IOException, InterruptedException;
}
```

[Shuffle Client](./client.md)
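
The retry behaviour mentioned above can be pictured as a bounded loop that retries only I/O failures and waits between attempts. The following is a minimal sketch under that assumption; `RetrySketch`, `IoCall`, `maxRetries`, and `waitMs` are illustrative names, and the library's own retry handling is not shown in this document and may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of spark-network-shuffle.
final class RetrySketch {
    // An operation that may fail with an IOException.
    interface IoCall<T> {
        T call() throws IOException;
    }

    // Runs the call once, then retries up to maxRetries times on IOException,
    // sleeping waitMs between attempts. Gives up with UncheckedIOException.
    static <T> T run(IoCall<T> call, int maxRetries, long waitMs) {
        if (maxRetries < 0 || waitMs < 0) {
            throw new IllegalArgumentException("maxRetries and waitMs must be >= 0");
        }
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                last = e;
                if (attempt == maxRetries) {
                    break; // no attempts left
                }
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // stop retrying when interrupted
                    break;
                }
            }
        }
        throw new UncheckedIOException("Giving up after repeated I/O failures", last);
    }
}
```

Retrying only `IOException` keeps programming errors such as an invalid block ID from being retried pointlessly.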

### Shuffle Service Handler

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    public MetricSet getAllMetrics();
}
```

[Service Handler](./handler.md)

### Block Resolution

Block resolver functionality for managing executor metadata, locating shuffle files on disk, and providing shuffle block data access.

```java { .api }
public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(String appId, String execId, String blockId);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}
```

[Block Resolution](./resolver.md)
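
Before a block can be located on disk, a block ID such as `shuffle_0_1_0` has to be split into its parts. The sketch below assumes the layout `shuffle_<shuffleId>_<mapId>_<reduceId>`; `ShuffleBlockIdSketch` and its fields are hypothetical names used for illustration, not library classes.

```java
// Hypothetical helper, not part of spark-network-shuffle.
final class ShuffleBlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" (assumed layout).
    // Throws IllegalArgumentException for any other shape; a non-numeric part
    // raises NumberFormatException, which is a subclass of it.
    static ShuffleBlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        return new ShuffleBlockIdSketch(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }
}
```

Rejecting malformed IDs up front matches the `IllegalArgumentException` entry listed under Exception Types for invalid block IDs.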

### Protocol Messages

Structured protocol messages for client-server communication, including executor registration, block requests, and streaming handles.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
    protected abstract Type type();
    public ByteBuffer toByteBuffer();

    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;
}
```

[Protocol Messages](./protocol.md)
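
To make the idea of a typed, binary-encoded message concrete, here is a minimal sketch of a codec that writes a one-byte type tag followed by length-prefixed UTF-8 strings. It uses `java.nio.ByteBuffer` rather than Netty, and the wire layout, the tag values, and the name `LengthPrefixedCodecSketch` are all assumptions made for illustration, not the library's actual format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec, not part of spark-network-shuffle.
final class LengthPrefixedCodecSketch {
    // Layout (assumed): [tag:1][count:4] then, per value, [length:4][utf8 bytes].
    static ByteBuffer encode(byte typeTag, String[] values) {
        byte[][] encoded = new byte[values.length][];
        int size = 1 + 4; // type tag + element count
        for (int i = 0; i < values.length; i++) {
            encoded[i] = values[i].getBytes(StandardCharsets.UTF_8);
            size += 4 + encoded[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(typeTag).putInt(values.length);
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length).put(bytes);
        }
        buf.flip(); // make the buffer ready for reading
        return buf;
    }

    // Reads the tag first and rejects messages of any other type.
    static String[] decode(ByteBuffer buf, byte expectedTag) {
        byte tag = buf.get();
        if (tag != expectedTag) {
            throw new IllegalArgumentException("Unknown message type: " + tag);
        }
        int count = buf.getInt();
        String[] values = new String[count];
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            values[i] = new String(bytes, StandardCharsets.UTF_8);
        }
        return values;
    }
}
```

Putting the type tag first lets a decoder choose the right message class before reading any fields, which is the role `Decoder.fromByteBuffer` plays in the API listing above.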

### Security and Authentication

SASL-based authentication system for secure shuffle operations, managing application secrets and authenticating client connections.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
    public ShuffleSecretManager();
    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSecretKey(String appId);
}
```

[Security](./security.md)
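
The register, unregister, and lookup operations listed above can be pictured as a concurrent map keyed by application ID. This is a minimal sketch of that idea only; `InMemorySecretRegistry` is a hypothetical class, and nothing here should be read as the internals of `ShuffleSecretManager`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in that mirrors the method names in the listing above.
final class InMemorySecretRegistry {
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    // Stores the secret for an application; a later registration overwrites
    // an earlier one (a design choice of this sketch).
    void registerApp(String appId, String shuffleSecret) {
        secrets.put(appId, shuffleSecret);
    }

    // Forgets the application's secret, for example once the application ends.
    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    // Returns the secret, or null if the application is not registered.
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```

A concurrent map is used because registrations and lookups would typically come from different threads in a network service.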

### Mesos Integration

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms.

```java { .api }
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled);
    public void registerDriverWithShuffleService(String host, int port,
        long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;
}
```

[Mesos Integration](./mesos.md)
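
The `heartbeatTimeoutMs` and `heartbeatIntervalMs` parameters suggest a liveness scheme in which the driver pings at a fixed interval and is treated as gone once no ping has arrived within the timeout. The sketch below shows only the bookkeeping such a scheme needs, under that assumption. `HeartbeatTracker` is a hypothetical class, and timestamps are passed in explicitly so the logic stays deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of spark-network-shuffle.
final class HeartbeatTracker {
    private final long timeoutMs;
    private final Map<String, Long> lastSeenMs = new ConcurrentHashMap<>();

    HeartbeatTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Records that the given application's driver was alive at nowMs.
    void heartbeat(String appId, long nowMs) {
        lastSeenMs.put(appId, nowMs);
    }

    // Removes and returns every application whose last heartbeat is older
    // than the timeout, so the caller can clean up its shuffle state.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

Choosing a timeout several times larger than the interval tolerates a few delayed or lost heartbeats before an application is declared dead.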

## Types

### Core Types

```java { .api }
public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public interface TempShuffleFileManager {
    File createTempShuffleFile();
    boolean registerTempShuffleFileToClean(File file);
}

public class ShuffleIndexRecord {
    public ShuffleIndexRecord(long offset, long length);
    public long getOffset();
    public long getLength();
}
```
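
`ShuffleIndexRecord` pairs an offset with a length. The sketch below shows one way such a pair could be derived, assuming an index stored as consecutive big-endian `long` cumulative offsets with one more entry than there are reduce partitions. That layout is an assumption made for this illustration, and `IndexLookupSketch` is a hypothetical name, not a library class.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of spark-network-shuffle.
final class IndexLookupSketch {
    private static final int LONG_BYTES = 8;

    // Returns {offset, length} for the given reduce partition. The length is
    // the difference between this partition's offset and the next one.
    static long[] lookup(ByteBuffer index, int reduceId) {
        if (reduceId < 0) {
            throw new IllegalArgumentException("Negative reduceId: " + reduceId);
        }
        long pos = (long) reduceId * LONG_BYTES;
        // Two consecutive longs are needed: this offset and the next.
        if (pos + 2L * LONG_BYTES > index.limit()) {
            throw new IllegalArgumentException("reduceId out of range: " + reduceId);
        }
        long offset = index.getLong((int) pos);
        long nextOffset = index.getLong((int) pos + LONG_BYTES);
        return new long[] {offset, nextOffset - offset};
    }
}
```

Storing cumulative offsets means a single pair of reads yields both the start and the size of a partition, and an empty partition simply shows up as a length of zero.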

### Exception Types

```java { .api }
// Common exceptions thrown by shuffle operations
java.io.IOException                      // File I/O operations, network connectivity
java.lang.InterruptedException           // Blocking operations that can be interrupted
java.lang.SecurityException              // Authentication failures
java.lang.IllegalArgumentException       // Invalid block IDs or parameters
java.lang.UnsupportedOperationException  // Unsupported message types
java.lang.RuntimeException               // Executor not registered or runtime errors
```