tessl/maven-org-apache-spark--spark-network-shuffle_2-10

External shuffle service client for Apache Spark that enables reading shuffle blocks from external servers instead of executors

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-10@1.6.0

# Apache Spark Network Shuffle

Apache Spark Network Shuffle provides external shuffle service functionality that enables reading shuffle blocks from external servers instead of directly from executors. This improves fault tolerance by allowing shuffle data to persist even when executors are lost, making Spark applications more reliable in distributed computing environments.

## Package Information

- **Package Name**: spark-network-shuffle_2.10
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Installation**: Add dependency to your Maven pom.xml:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-shuffle_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
```

For server-side components:

```java
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
```

## Basic Usage

```java
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;

// Create simple configuration provider with defaults
ConfigProvider configProvider = new ConfigProvider() {
    @Override
    public String get(String name) {
        // Return default values for shuffle configuration
        if (name.equals("spark.shuffle.io.maxRetries")) return "3";
        if (name.equals("spark.shuffle.io.retryWait")) return "5s";
        // Unknown keys fall back to the caller-supplied default
        throw new java.util.NoSuchElementException(name);
    }
};

// Create transport configuration
TransportConf conf = new TransportConf("shuffle", configProvider);

// Create client with SASL disabled for simplicity
ExternalShuffleClient client = new ExternalShuffleClient(
    conf,
    null,  // secretKeyHolder - null for no SASL
    false, // saslEnabled
    false  // saslEncryptionEnabled
);

// Initialize client
client.init("my-spark-app");

// Register executor with shuffle server
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle"}, // local directories
    64,                                 // subdirs per local dir
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

// Declared to throw IOException; handle or propagate it in the enclosing method
client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

// Implement callback for block fetching
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId);
        // Process the data...
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
    }
};

// Fetch shuffle blocks (asynchronous: results arrive through the listener)
String[] blockIds = {"shuffle_1_2_0", "shuffle_1_2_1"};
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener);

// Close the client once the listener callbacks have fired
client.close();
```
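
The block IDs passed to `fetchBlocks` follow Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>` naming convention. As a rough illustration of how such an ID breaks down, here is a minimal sketch of a parser. `ShuffleBlockIdSketch` is a hypothetical helper written for this page and is not part of spark-network-shuffle.

```java
/** Hypothetical helper (not a library class) that splits a shuffle block ID into its parts. */
final class ShuffleBlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    /** Parses IDs of the form shuffle_<shuffleId>_<mapId>_<reduceId>. */
    static ShuffleBlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        return new ShuffleBlockIdSketch(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }
}
```

Under this reading, `shuffle_1_2_0` and `shuffle_1_2_1` in the example above are the first two reduce partitions written by map task 2 of shuffle 1.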

## Architecture

The Apache Spark Network Shuffle library is built around several key components:

- **Client Components**: `ExternalShuffleClient` and `MesosExternalShuffleClient` provide the primary interface for fetching shuffle data from external services
- **Server Components**: `ExternalShuffleBlockHandler` and `ExternalShuffleBlockResolver` implement the server-side logic for serving shuffle blocks
- **Protocol Layer**: Network protocol messages (`BlockTransferMessage` subclasses) handle communication between clients and servers
- **Security Layer**: `ShuffleSecretManager` provides SASL-based authentication for secure shuffle data access
- **Retry Mechanisms**: `RetryingBlockFetcher` and `OneForOneBlockFetcher` implement fault-tolerant block fetching with configurable retry logic
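
To make the retry idea in the last bullet concrete, the following is a simplified, synchronous sketch of bounded retries with a fixed wait. It does not reproduce `RetryingBlockFetcher`; the class `RetrySketch`, the `IoTask` interface, and the parameter names are hypothetical, chosen to mirror the `spark.shuffle.io.maxRetries` and `spark.shuffle.io.retryWait` settings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of bounded retries; not the library's RetryingBlockFetcher. */
final class RetrySketch {
    /** A unit of work that may fail with an I/O error. */
    interface IoTask<T> {
        T run() throws IOException;
    }

    /** Runs the task, retrying up to maxRetries times after an IOException. */
    static <T> T withRetries(IoTask<T> task, int maxRetries, long retryWaitMs) {
        int failures = 0;
        while (true) {
            try {
                return task.run();
            } catch (IOException e) {
                if (failures >= maxRetries) {
                    throw new UncheckedIOException("Giving up after " + failures + " retries", e);
                }
                failures++;
                try {
                    Thread.sleep(retryWaitMs); // fixed wait between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

With `maxRetries` set to 3, a task is attempted at most four times in this sketch: the initial attempt plus three retries.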

## Capabilities

### Shuffle Client Operations

Primary client interface for fetching shuffle blocks from external shuffle services. Supports both basic and Mesos-specific deployments with configurable SASL authentication.

```java { .api }
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );

    public void registerWithShuffleServer(
        String host,
        int port,
        String execId,
        ExecutorShuffleInfo executorInfo
    ) throws IOException;
}
```

[Shuffle Client](./shuffle-client.md)

### Server-Side Block Management

Server-side components that handle shuffle block requests, manage executor registrations, and resolve block locations on the filesystem.

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(
        TransportConf conf,
        File registeredExecutorFile
    ) throws IOException;

    public void receive(
        TransportClient client,
        ByteBuffer message,
        RpcResponseCallback callback
    );
}

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(
        TransportConf conf,
        File registeredExecutorFile
    ) throws IOException;

    public void registerExecutor(
        String appId,
        String execId,
        ExecutorShuffleInfo executorInfo
    );

    public ManagedBuffer getBlockData(
        String appId,
        String execId,
        String blockId
    ) throws IOException;
}
```

[Server Components](./server-components.md)
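
Resolving a block location means mapping a file name onto one of the executor's registered local directories and one of its subdirectories. The sketch below shows one way this could work, assuming a hash-based layout in which the file name selects both the local directory and a two-digit hexadecimal subdirectory. The class `BlockFileLayoutSketch`, its method names, and the hashing scheme are assumptions for illustration, not the confirmed implementation of `ExternalShuffleBlockResolver`.

```java
import java.io.File;

/** Hypothetical sketch of a hash-based on-disk layout for shuffle files. */
final class BlockFileLayoutSketch {
    /** Hash that is never negative, so it can be used safely with the % operator. */
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h == Integer.MIN_VALUE ? 0 : Math.abs(h);
    }

    /** Picks a local dir and a hex-named subdir from the file name's hash. */
    static File resolve(String[] localDirs, int subDirsPerLocalDir, String fileName) {
        int hash = nonNegativeHash(fileName);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), fileName);
    }
}
```

With a single local directory `/tmp/spark-shuffle` and 64 subdirectories, every file in this sketch lands under `/tmp/spark-shuffle/<00..3f>/`. The same file name always resolves to the same path, so a server using such a layout needs only the registered `ExecutorShuffleInfo` to find the file.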

### Network Protocol Messages

Protocol message classes for communication between shuffle clients and servers, including executor registration, block requests, and data transfer.

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
    public ByteBuffer toByteBuffer();

    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public ExecutorShuffleInfo(
        String[] localDirs,
        int subDirsPerLocalDir,
        String shuffleManager
    );
}
```

[Protocol Messages](./protocol-messages.md)

### Security and Authentication

SASL-based security mechanisms for authenticating shuffle clients with external shuffle services, including secret management and secure communication.

```java { .api }
public class ShuffleSecretManager implements SecretKeyHolder {
    public ShuffleSecretManager();

    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSecretKey(String appId);
}
```

[Security](./security.md)
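
The three methods above suggest a per-application secret registry keyed by `appId`. A minimal in-memory sketch of that idea follows. `InMemorySecretRegistry` is a hypothetical stand-in rather than the library's `ShuffleSecretManager`, and keeping the first secret when an application registers twice is a choice made for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a per-application secret store; not the library class. */
final class InMemorySecretRegistry {
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    /** Stores the secret for an application; an existing registration is left unchanged. */
    void registerApp(String appId, String shuffleSecret) {
        secrets.putIfAbsent(appId, shuffleSecret);
    }

    /** Forgets the application's secret, e.g. when the application finishes. */
    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    /** Returns the secret for the application, or null if it is not registered. */
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```

A concurrent map is used because registrations and lookups would typically come from different network threads.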

## Types

```java { .api }
public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public enum BlockTransferMessage.Type {
    OPEN_BLOCKS(0),
    UPLOAD_BLOCK(1),
    REGISTER_EXECUTOR(2),
    STREAM_HANDLE(3),
    REGISTER_DRIVER(4);
}
```
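
The numeric IDs on `BlockTransferMessage.Type` lend themselves to a type-tagged encoding, where a decoder reads the tag first and then dispatches to the matching message class. The sketch below illustrates that pattern with a one-byte tag followed by an opaque payload. `TypeTagSketch` and its methods are hypothetical, and the exact wire layout used by the library is an assumption here.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of one-byte type tagging; not the library's encoder. */
final class TypeTagSketch {
    enum Type {
        OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);

        final byte id;

        Type(int id) {
            this.id = (byte) id;
        }

        /** Looks up the type for a tag byte, rejecting unknown tags. */
        static Type fromId(byte id) {
            for (Type t : values()) {
                if (t.id == id) return t;
            }
            throw new IllegalArgumentException("Unknown message type: " + id);
        }
    }

    /** Writes the type tag followed by the payload and returns a buffer ready for reading. */
    static ByteBuffer frame(Type type, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
        buf.put(type.id).put(payload);
        buf.flip();
        return buf;
    }

    /** Reads the type tag without advancing the buffer's position. */
    static Type peekType(ByteBuffer framed) {
        return Type.fromId(framed.get(framed.position()));
    }
}
```

Rejecting unknown tags up front lets a receiver fail fast on messages from an incompatible peer instead of misreading the payload.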