or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md buffers.md configuration.md index.md protocol.md streaming.md transport.md

index.md docs/

0

# Apache Spark Network-Common

1

2

Apache Spark Network-Common provides the foundational networking layer for Apache Spark's distributed computing engine. It implements a high-performance transport abstraction built on Netty, offering comprehensive client-server communication capabilities including RPC handling, data streaming, authentication (including SASL and custom auth protocols), encryption, and connection management.

3

4

## Package Information

5

6

- **Package Name**: spark-network-common_2.11

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Group ID**: org.apache.spark

10

- **Artifact ID**: spark-network-common_2.11

11

- **Installation**: `<dependency><groupId>org.apache.spark</groupId><artifactId>spark-network-common_2.11</artifactId><version>2.4.8</version></dependency>`

12

13

## Core Imports

14

15

```java

16

import org.apache.spark.network.TransportContext;

17

import org.apache.spark.network.client.TransportClient;

18

import org.apache.spark.network.client.TransportClientFactory;

19

import org.apache.spark.network.server.TransportServer;

20

import org.apache.spark.network.server.RpcHandler;

21

import org.apache.spark.network.util.TransportConf;

22

```

23

24

## Basic Usage

25

26

```java

27

import org.apache.spark.network.TransportContext;

28

import org.apache.spark.network.client.TransportClient;

29

import org.apache.spark.network.client.TransportClientFactory;

30

import org.apache.spark.network.server.TransportServer;

31

import org.apache.spark.network.server.RpcHandler;

32

import org.apache.spark.network.util.TransportConf;

33

import org.apache.spark.network.util.MapConfigProvider;

34

35

// Configure transport (configMap is a Map<String, String> of configuration properties)

36

TransportConf conf = new TransportConf("myapp", new MapConfigProvider(configMap));

37

38

// Create RPC handler

39

RpcHandler rpcHandler = new MyRpcHandler();

40

41

// Set up transport context

42

TransportContext context = new TransportContext(conf, rpcHandler);

43

44

// Create server

45

TransportServer server = context.createServer(0); // 0 = any available port

46

int port = server.getPort();

47

48

// Create client factory and client

49

TransportClientFactory clientFactory = context.createClientFactory();

50

TransportClient client = clientFactory.createClient("localhost", port);

51

52

// Send RPC

53

ByteBuffer message = ByteBuffer.wrap("Hello".getBytes());

54

client.sendRpc(message, new RpcResponseCallback() {

55

@Override

56

public void onSuccess(ByteBuffer response) {

57

System.out.println("Response received: " + new String(response.array()));

58

}

59

60

@Override

61

public void onFailure(Throwable e) {

62

System.err.println("RPC failed: " + e.getMessage());

63

}

64

});

65

66

// Cleanup

67

client.close();

68

server.close();

69

clientFactory.close();

70

```

71

72

## Architecture

73

74

Apache Spark Network-Common is built around several key components:

75

76

- **Transport Context**: Central factory (`TransportContext`) for creating servers and client factories with consistent configuration

77

- **Client-Server Model**: Asynchronous client (`TransportClient`) and server (`TransportServer`) implementations with connection pooling

78

- **RPC Framework**: Pluggable RPC handlers with support for bidirectional communication and one-way messages

79

- **Stream Management**: Efficient streaming data transfer with chunk-based fetching and zero-copy I/O

80

- **Authentication Layer**: Pluggable authentication via SASL and custom protocols with encryption support

81

- **Buffer Abstraction**: Unified buffer interface (`ManagedBuffer`) with multiple backing implementations (file, memory, Netty)

82

- **Protocol Stack**: Complete message protocol with encoding/decoding and frame-based transport

83

84

## Capabilities

85

86

### Transport Layer

87

88

Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and resource management.

89

90

```java { .api }

91

public class TransportContext {

92

public TransportContext(TransportConf conf, RpcHandler rpcHandler);

93

public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

94

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

95

public TransportClientFactory createClientFactory();

96

public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

97

public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

98

public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

99

public TransportServer createServer();

100

}

101

102

public class TransportClient {

103

public Channel getChannel();

104

public boolean isActive();

105

public SocketAddress getSocketAddress();

106

public String getClientId();

107

public void setClientId(String id);

108

public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

109

public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

110

public void send(ByteBuffer message);

111

public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);

112

public void stream(String streamId, StreamCallback callback);

113

public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);

114

public void removeRpcRequest(long requestId);

115

public void timeOut();

116

public void close();

117

}

118

119

public class TransportServer {

120

public int getPort();

121

public void close();

122

}

123

```

124

125

[Transport Layer](./transport.md)

126

127

### Buffer Management

128

129

Unified buffer abstraction with multiple backing implementations for efficient memory and file-based data handling.

130

131

```java { .api }

132

public abstract class ManagedBuffer {

133

public abstract long size();

134

public abstract ByteBuffer nioByteBuffer() throws IOException;

135

public abstract InputStream createInputStream() throws IOException;

136

public abstract ManagedBuffer retain();

137

public abstract ManagedBuffer release();

138

public abstract Object convertToNetty() throws IOException;

139

}

140

141

public final class FileSegmentManagedBuffer extends ManagedBuffer {

142

public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);

143

public File getFile();

144

public long getOffset();

145

public long getLength();

146

}

147

```

148

149

[Buffer Management](./buffers.md)

150

151

### Protocol Handling

152

153

Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication.

154

155

```java { .api }

156

public interface Message extends Encodable {

157

}

158

159

public interface Encodable {

160

int encodedLength();

161

void encode(ByteBuf buf);

162

}

163

164

public final class RpcRequest extends AbstractMessage implements RequestMessage {

165

public RpcRequest(long requestId, ManagedBuffer message);

166

}

167

168

public final class RpcResponse extends AbstractResponseMessage implements ResponseMessage {

169

public RpcResponse(long requestId, ManagedBuffer message);

170

}

171

```

172

173

[Protocol Handling](./protocol.md)

174

175

### Authentication

176

177

Pluggable authentication system supporting SASL and custom authentication protocols with encryption.

178

179

```java { .api }

180

public class SaslClientBootstrap implements TransportClientBootstrap {

181

public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

182

}

183

184

public class SaslServerBootstrap implements TransportServerBootstrap {

185

public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

186

}

187

188

public interface SecretKeyHolder {

189

String getSaslUser(String appId);

190

String getSecretKey(String appId);

191

}

192

```

193

194

[Authentication](./authentication.md)

195

196

### Stream Management

197

198

Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead.

199

200

```java { .api }

201

public abstract class StreamManager {

202

public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

203

public abstract ManagedBuffer openStream(String streamId);

204

public void connectionTerminated(Channel channel);

205

public void checkAuthorization(TransportClient client, long streamId);

206

}

207

208

public class OneForOneStreamManager extends StreamManager {

209

public OneForOneStreamManager();

210

public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);

211

}

212

```

213

214

[Stream Management](./streaming.md)

215

216

### Configuration

217

218

Comprehensive configuration system with performance tuning options for connection management, I/O settings, and security parameters.

219

220

```java { .api }

221

public class TransportConf {

222

public TransportConf(String module, ConfigProvider conf);

223

public int connectionTimeoutMs();

224

public int numConnectionsPerPeer();

225

public int serverThreads();

226

public int clientThreads();

227

public int receiveBuf();

228

public int sendBuf();

229

public boolean encryptionEnabled();

230

public String cipherTransformation();

231

}

232

```

233

234

[Configuration and Utilities](./configuration.md)

235

236

## Types

237

238

```java { .api }

239

// Core callback interfaces

240

public interface RpcResponseCallback {

241

void onSuccess(ByteBuffer response);

242

void onFailure(Throwable e);

243

}

244

245

public interface ChunkReceivedCallback {

246

void onSuccess(int chunkIndex, ManagedBuffer buffer);

247

void onFailure(int chunkIndex, Throwable e);

248

}

249

250

public interface StreamCallback {

251

void onData(String streamId, ByteBuffer buf) throws IOException;

252

void onComplete(String streamId) throws IOException;

253

void onFailure(String streamId, Throwable cause) throws IOException;

254

}

255

256

// Bootstrap interfaces

257

public interface TransportClientBootstrap {

258

void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

259

}

260

261

public interface TransportServerBootstrap {

262

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

263

}

264

265

// Configuration provider

266

public abstract class ConfigProvider {

267

public abstract String get(String name);

268

}

269

270

public class MapConfigProvider extends ConfigProvider {

271

public MapConfigProvider(Map<String, String> props);

272

}

273

```