tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-network-common_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-common-2-12@3.5.0

# Apache Spark Network Common

Apache Spark Network Common provides the core networking infrastructure for Apache Spark, implementing a high-performance transport layer built on Netty for efficient inter-node communication in distributed Spark clusters. It includes comprehensive networking components such as transport contexts for managing network connections, buffer management for zero-copy operations, client-server communication protocols, cryptographic support with forward-secure authentication protocols, SASL-based authentication mechanisms, and specialized shuffle database functionality.

## Package Information

- **Package Name**: org.apache.spark:spark-network-common_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**:
  ```xml
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-network-common_2.12</artifactId>
      <version>3.5.6</version>
  </dependency>
  ```
- **Gradle**: `implementation 'org.apache.spark:spark-network-common_2.12:3.5.6'`

## Core Imports

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
```

## Basic Usage

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Configure transport layer
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "120s");
ConfigProvider provider = new MapConfigProvider(config);
TransportConf conf = new TransportConf("test", provider);

// Create transport context (main entry point).
// NoOpRpcHandler does not process RPCs, so expect onFailure below;
// pass your own RpcHandler to return a response.
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

// Create and start server
TransportServer server = context.createServer();
int port = server.getPort();

// Create client factory and connect to server
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);

// Send RPC message (asynchronous: the callback runs when the reply arrives)
ByteBuffer message = ByteBuffer.wrap("Hello, Spark!".getBytes(StandardCharsets.UTF_8));
client.sendRpc(message, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Decode through the charset: array() is unsupported on direct buffers
        System.out.println("Response: " + StandardCharsets.UTF_8.decode(response));
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Cleanup (in real code, wait for the callback before closing the client)
client.close();
clientFactory.close();
server.close();
context.close();
```
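The `ByteBuffer` handed to `onSuccess` may be a direct or read-only buffer, and `array()` throws on those instead of returning the bytes. The following is a minimal JDK-only sketch of a decoding helper that works for either kind. `BufferText` and `decodeUtf8` are hypothetical names used for illustration and are not part of the Spark API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of spark-network-common.
final class BufferText {
    private BufferText() {}

    /** Decodes the remaining bytes of buf as UTF-8 without moving its position. */
    static String decodeUtf8(ByteBuffer buf) {
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer is left untouched.
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        direct.put("Hello".getBytes(StandardCharsets.UTF_8));
        direct.flip();
        System.out.println(decodeUtf8(direct));
    }
}
```

Because the helper decodes a duplicate, the same response buffer can still be read again by later code.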

## Architecture

Apache Spark Network Common is built around several key components:

- **Transport Layer**: Core networking functionality with `TransportContext` as the main entry point for creating clients and servers
- **Client-Server Model**: `TransportClient` for client operations and `TransportServer` for server operations
- **Message Protocol**: Comprehensive message protocol system for different types of network communication
- **Buffer Management**: Zero-copy buffer operations through `ManagedBuffer` implementations
- **Security Layer**: Authentication (SASL) and encryption (AES-CTR/GCM) capabilities for secure communication
- **Configuration System**: Flexible configuration management through `ConfigProvider` pattern
- **Database Support**: Specialized shuffle database functionality using LevelDB and RocksDB backends

## Capabilities

### Transport Context

Main entry point for creating transport clients and servers, managing Netty pipeline setup and network configuration.

```java { .api }
public final class TransportContext implements Closeable {
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);

    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
    public TransportClientFactory createClientFactory();
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer();
    public void close();
}
```

[Transport Context](./transport-context.md)

### Client Operations

High-performance client functionality for fetching data chunks, sending RPCs, and streaming data with full thread safety.

```java { .api }
public class TransportClient implements Closeable {
    public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
    public void stream(String streamId, StreamCallback callback);
    public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
    public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
    public void send(ByteBuffer message);
    public boolean isActive();
    public void close();
}
```

[Client Operations](./client-operations.md)

### Server Operations

Server-side functionality for handling client connections, processing RPC requests, and managing data streams.

```java { .api }
public class TransportServer implements Closeable {
    public int getPort();
    public MetricSet getAllMetrics();
    public Counter getRegisteredConnections();
    public void close();
}

public abstract class RpcHandler {
    public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public abstract StreamManager getStreamManager();
    public void channelActive(TransportClient client);
    public void channelInactive(TransportClient client);
    public void exceptionCaught(Throwable cause, TransportClient client);
}
```

[Server Operations](./server-operations.md)

### Buffer Management

Zero-copy buffer management system with different backing implementations for efficient memory usage and data transfer.

```java { .api }
public abstract class ManagedBuffer {
    public abstract long size();
    public abstract ByteBuffer nioByteBuffer() throws IOException;
    public abstract InputStream createInputStream() throws IOException;
    public abstract ManagedBuffer retain();
    public abstract ManagedBuffer release();
    public abstract Object convertToNetty() throws IOException;
}
```

[Buffer Management](./buffer-management.md)
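The `retain()` and `release()` pair on `ManagedBuffer` points to reference-counted ownership: each holder retains the buffer before use and releases it afterwards. The sketch below illustrates that idea with a JDK-only stand-in. `CountedBuffer` is a hypothetical class, and whether a given `ManagedBuffer` implementation actually counts references this way is an assumption rather than something this page confirms.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in illustrating retain/release; not a Spark class.
final class CountedBuffer {
    // A new buffer starts with one reference, owned by its creator.
    private final AtomicInteger refCount = new AtomicInteger(1);

    /** Adds a reference; fails if the buffer has already been freed. */
    CountedBuffer retain() {
        int current;
        do {
            current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("buffer already freed");
            }
        } while (!refCount.compareAndSet(current, current + 1));
        return this;
    }

    /** Drops a reference; the buffer counts as freed once the count reaches zero. */
    CountedBuffer release() {
        int current;
        do {
            current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("buffer already freed");
            }
        } while (!refCount.compareAndSet(current, current - 1));
        return this;
    }

    boolean isFreed() {
        return refCount.get() == 0;
    }
}
```

A typical pattern is to call `retain()` before handing the buffer to another thread and `release()` in a `finally` block once that thread is done with it.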

### Message Protocol

Comprehensive message protocol system for different types of network communication including RPC, streaming, and chunk fetching.

```java { .api }
public interface Message extends Encodable {
    Type type();
    ManagedBuffer body();
    boolean isBodyInFrame();

    enum Type {
        ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
        RpcRequest, RpcResponse, RpcFailure,
        StreamRequest, StreamResponse, StreamFailure,
        OneWayMessage, UploadStream,
        MergedBlockMetaRequest, MergedBlockMetaSuccess,
        User
    }
}
```

[Message Protocol](./message-protocol.md)

### Security and Authentication

Comprehensive security layer including SASL authentication and AES encryption for secure network communication.

```java { .api }
public interface TransportCipher {
    String getKeyId() throws GeneralSecurityException;
    void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

public interface SecretKeyHolder {
    String getSaslUser(String appId);
    String getSecretKey(String appId);
}
```

[Security and Authentication](./security-authentication.md)

### Shuffle Database

Specialized database functionality for handling shuffle data storage using LevelDB and RocksDB backends.

```java { .api }
public interface DB extends Closeable {
    void put(byte[] key, byte[] value) throws IOException;
    byte[] get(byte[] key) throws IOException;
    void delete(byte[] key) throws IOException;
    DBIterator iterator();
}

public enum DBBackend {
    LEVELDB, ROCKSDB;

    public String fileName(String prefix);
    public static DBBackend byName(String value);
}
```

[Shuffle Database](./shuffle-database.md)
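To make the role of `fileName` and `byName` concrete, here is a small JDK-only stand-in for the backend enum. `SketchBackend` is hypothetical, and both the `.ldb`/`.rdb` suffixes and the case-insensitive lookup are assumptions made for illustration; check the library source before relying on either.

```java
import java.util.Locale;

// Hypothetical stand-in, not the Spark enum: suffixes and lookup rule are assumptions.
enum SketchBackend {
    LEVELDB(".ldb"),
    ROCKSDB(".rdb");

    private final String fileSuffix;

    SketchBackend(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    /** Builds a backend-specific file name from a shared prefix. */
    String fileName(String prefix) {
        return prefix + fileSuffix;
    }

    /** Resolves a backend from a configuration string, ignoring case. */
    static SketchBackend byName(String value) {
        return valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```

Under these assumptions, `SketchBackend.byName("rocksdb").fileName("state")` yields `state.rdb`, so a single configuration value selects both the engine and the on-disk file name.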

### Configuration Management

Flexible configuration system with provider pattern for managing transport layer settings and network parameters.

```java { .api }
public class TransportConf {
    public TransportConf(String module, ConfigProvider conf);

    public String ioMode();
    public boolean preferDirectBufs();
    public int connectionTimeoutMs();
    public int numConnectionsPerPeer();
    public int serverThreads();
    public int clientThreads();
    public boolean encryptionEnabled();
    public boolean saslEncryption();
}

public abstract class ConfigProvider {
    public abstract String get(String name);
    public String get(String name, String defaultValue);
    public boolean getBoolean(String name, boolean defaultValue);
    public int getInt(String name, int defaultValue);
    public long getLong(String name, long defaultValue);
}
```

[Configuration Management](./configuration-management.md)
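The provider pattern above boils down to one abstract lookup plus typed helpers that fall back to a default. The sketch below shows that shape with JDK-only stand-ins. `SketchConfigProvider` and `SketchMapProvider` are hypothetical classes, and signalling a missing key with `NoSuchElementException` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-ins for the provider pattern; not the Spark classes.
abstract class SketchConfigProvider {
    /** Returns the value for name, or throws NoSuchElementException if it is unset. */
    abstract String get(String name);

    String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }

    int getInt(String name, int defaultValue) {
        return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    boolean getBoolean(String name, boolean defaultValue) {
        return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
    }
}

final class SketchMapProvider extends SketchConfigProvider {
    private final Map<String, String> values;

    SketchMapProvider(Map<String, String> values) {
        // Defensive copy so later changes to the caller's map are not observed.
        this.values = new HashMap<>(values);
    }

    @Override
    String get(String name) {
        String value = values.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }
}
```

Keeping the typed helpers in the base class means a new backing store only has to implement the single `get(String)` lookup.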

## Types

### Core Interfaces

```java { .api }
public interface Closeable {
    void close() throws IOException;
}

public interface Encodable {
    int encodedLength();
    void encode(ByteBuf buf);
}
```
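The `Encodable` contract pairs a size query with a write, so a caller can allocate exactly `encodedLength()` bytes before calling `encode`. The sketch below shows that contract for a length-prefixed string. It uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so that it compiles without dependencies; `SketchEncodable` and `StringField` are hypothetical names, and the length-prefixed layout is an assumption chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Encodable contract, using ByteBuffer instead of ByteBuf.
interface SketchEncodable {
    int encodedLength();
    void encode(ByteBuffer buf);
}

final class StringField implements SketchEncodable {
    private final byte[] utf8;

    StringField(String value) {
        this.utf8 = value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public int encodedLength() {
        // 4-byte length prefix followed by the UTF-8 payload.
        return Integer.BYTES + utf8.length;
    }

    @Override
    public void encode(ByteBuffer buf) {
        buf.putInt(utf8.length);
        buf.put(utf8);
    }

    /** Reads one length-prefixed string written by encode. */
    static String decode(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A round trip allocates `encodedLength()` bytes, calls `encode`, flips the buffer, and passes it to `decode`.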

### Callback Interfaces

```java { .api }
public interface BaseResponseCallback {
    void onFailure(Throwable e);
}

public interface RpcResponseCallback extends BaseResponseCallback {
    void onSuccess(ByteBuffer response);
}

public interface ChunkReceivedCallback {
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    void onFailure(int chunkIndex, Throwable e);
}

public interface StreamCallback {
    void onData(String streamId, ByteBuffer buf) throws IOException;
    void onComplete(String streamId) throws IOException;
    void onFailure(String streamId, Throwable cause) throws IOException;
}

public interface StreamCallbackWithID extends StreamCallback {
    String getID();
}
```
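Callback-style results are often easier to compose once bridged to a `CompletableFuture`. The sketch below adapts the `RpcResponseCallback` shape listed above. The two interfaces are redeclared locally as stand-ins so the example compiles without Spark on the classpath, and `FutureCallback` is a hypothetical helper. Copying the response before completing the future is a defensive assumption, in case the transport reuses the buffer after the callback returns.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Local stand-ins mirroring the signatures listed above; not the Spark types.
interface BaseResponseCallback {
    void onFailure(Throwable e);
}

interface RpcResponseCallback extends BaseResponseCallback {
    void onSuccess(ByteBuffer response);
}

// Hypothetical adapter that exposes the callback result as a future.
final class FutureCallback implements RpcResponseCallback {
    private final CompletableFuture<ByteBuffer> future = new CompletableFuture<>();

    CompletableFuture<ByteBuffer> future() {
        return future;
    }

    @Override
    public void onSuccess(ByteBuffer response) {
        // Copy the remaining bytes so the result stays valid after this method returns.
        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
        copy.put(response.duplicate());
        copy.flip();
        future.complete(copy);
    }

    @Override
    public void onFailure(Throwable e) {
        future.completeExceptionally(e);
    }
}
```

With the real library, an instance would be passed as the callback argument of `sendRpc`, and the caller would then wait on or chain from `future()`.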

### Bootstrap Interfaces

```java { .api }
public interface TransportClientBootstrap {
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public interface TransportServerBootstrap {
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
```

### Enumeration Types

```java { .api }
public enum IOMode {
    NIO, EPOLL
}
```

### Exception Classes

```java { .api }
public class ChunkFetchFailureException extends RuntimeException {
    public ChunkFetchFailureException(String errorMsg, Throwable cause);
    public ChunkFetchFailureException(String errorMsg);
}

public class SaslTimeoutException extends RuntimeException {
    // Standard RuntimeException constructors
}

public class BlockPushNonFatalFailure extends RuntimeException {
    // Standard RuntimeException constructors
}
```