# Apache Spark Network Common

Apache Spark Network Common provides the core networking infrastructure for Apache Spark: a Netty-based transport layer for communication between nodes in a Spark cluster. It includes transport contexts for managing network connections, managed buffers for zero-copy transfers, client and server communication protocols, cryptographic support with a forward-secure authentication protocol, SASL-based authentication, and a key-value database layer for shuffle data.

## Package Information

- **Package Name**: org.apache.spark:spark-network-common_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**:
  ```xml
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-common_2.12</artifactId>
    <version>3.5.6</version>
  </dependency>
  ```
- **Gradle**: `implementation 'org.apache.spark:spark-network-common_2.12:3.5.6'`

## Core Imports

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
```

## Basic Usage

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// The statements below belong inside a method; createClient declares
// checked exceptions that the enclosing method must handle or rethrow.

// Configure transport layer
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "120s");
ConfigProvider provider = new MapConfigProvider(config);
TransportConf conf = new TransportConf("test", provider);

// Create transport context (main entry point).
// NoOpRpcHandler does not handle RPCs, so the call below should end in
// onFailure; supply your own RpcHandler to return real responses.
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

// Create and start server; with no port given, ask the server which port it bound
TransportServer server = context.createServer();
int port = server.getPort();

// Create client factory and connect to server
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);

// Send RPC message (asynchronous: the callback is invoked later)
ByteBuffer message = ByteBuffer.wrap("Hello, Spark!".getBytes(StandardCharsets.UTF_8));
client.sendRpc(message, new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // Copy the readable bytes; response.array() fails for direct or read-only buffers
    byte[] bytes = new byte[response.remaining()];
    response.get(bytes);
    System.out.println("Response: " + new String(bytes, StandardCharsets.UTF_8));
  }

  @Override
  public void onFailure(Throwable e) {
    System.err.println("RPC failed: " + e.getMessage());
  }
});

// Cleanup. In real code, wait for the callback first (or use sendRpcSync),
// otherwise the connection may close before the response arrives.
client.close();
clientFactory.close();
server.close();
context.close();
```

## Architecture

Apache Spark Network Common is built around several key components:

- **Transport Layer**: Core networking functionality with `TransportContext` as the main entry point for creating clients and servers
- **Client-Server Model**: `TransportClient` for client operations and `TransportServer` for server operations
- **Message Protocol**: Typed messages for RPC, streaming, and chunk fetching
- **Buffer Management**: Zero-copy buffer operations through `ManagedBuffer` implementations
- **Security Layer**: Authentication (SASL) and encryption (AES-CTR/GCM) for secure communication
- **Configuration System**: Configuration management through the `ConfigProvider` pattern
- **Database Support**: Shuffle database functionality using LevelDB and RocksDB backends

## Capabilities

### Transport Context

Main entry point for creating transport clients and servers. It sets up the Netty pipeline and applies the network configuration.

```java { .api }
public final class TransportContext implements Closeable {
  public TransportContext(TransportConf conf, RpcHandler rpcHandler);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
  public TransportClientFactory createClientFactory();
  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer();
  public void close();
}
```

[Transport Context](./transport-context.md)

### Client Operations

Thread-safe client for fetching data chunks, sending RPCs, and streaming data.

```java { .api }
public class TransportClient implements Closeable {
  public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
  public void stream(String streamId, StreamCallback callback);
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
  public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
  public void send(ByteBuffer message);
  public boolean isActive();
  public void close();
}
```

[Client Operations](./client-operations.md)

### Server Operations

Server-side functionality for handling client connections, processing RPC requests, and managing data streams.

```java { .api }
public class TransportServer implements Closeable {
  public int getPort();
  public MetricSet getAllMetrics();
  public Counter getRegisteredConnections();
  public void close();
}

public abstract class RpcHandler {
  public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
  public abstract StreamManager getStreamManager();
  public void channelActive(TransportClient client);
  public void channelInactive(TransportClient client);
  public void exceptionCaught(Throwable cause, TransportClient client);
}
```

[Server Operations](./server-operations.md)

### Buffer Management

Zero-copy buffer abstraction with several backing implementations, so data can be transferred without extra in-memory copies.

```java { .api }
public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}
```

[Buffer Management](./buffer-management.md)
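
The `retain()` and `release()` pair above suggests a reference-counting contract. The sketch below illustrates that idea with plain JDK classes. `RefCountedBufferSketch` is a hypothetical stand-in written for this example, not a Spark class, and actual `ManagedBuffer` implementations may manage their lifetime differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer; not part of Spark.
final class RefCountedBufferSketch {
  private final ByteBuffer data;
  // The creator holds the first reference.
  private final AtomicInteger refCount = new AtomicInteger(1);

  RefCountedBufferSketch(byte[] bytes) {
    this.data = ByteBuffer.wrap(bytes);
  }

  long size() {
    return data.remaining();
  }

  // Hands out a read-only view with its own position, so readers do not interfere.
  ByteBuffer nioByteBuffer() {
    if (refCount.get() <= 0) {
      throw new IllegalStateException("buffer already released");
    }
    return data.asReadOnlyBuffer();
  }

  RefCountedBufferSketch retain() {
    refCount.incrementAndGet();
    return this;
  }

  RefCountedBufferSketch release() {
    if (refCount.decrementAndGet() < 0) {
      throw new IllegalStateException("released more times than retained");
    }
    return this;
  }

  int refCount() {
    return refCount.get();
  }

  // Two consumers share one buffer; each releases its own reference when done.
  static String demo() {
    RefCountedBufferSketch buf =
        new RefCountedBufferSketch("shuffle-block".getBytes(StandardCharsets.UTF_8));
    buf.retain(); // a second consumer takes a reference
    ByteBuffer view = buf.nioByteBuffer();
    byte[] out = new byte[view.remaining()];
    view.get(out);
    buf.release(); // second consumer done
    buf.release(); // creator done
    return new String(out, StandardCharsets.UTF_8) + ":" + buf.refCount();
  }
}
```

The rule of thumb this models: every `retain()` needs a matching `release()`, and a buffer should not be read after its last reference is gone.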

### Message Protocol

Typed message protocol covering RPC, streaming, and chunk fetching.

```java { .api }
public interface Message extends Encodable {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

  enum Type {
    ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
    RpcRequest, RpcResponse, RpcFailure,
    StreamRequest, StreamResponse, StreamFailure,
    OneWayMessage, UploadStream,
    MergedBlockMetaRequest, MergedBlockMetaSuccess,
    User
  }
}
```

[Message Protocol](./message-protocol.md)
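
To make the idea of a typed, length-aware message concrete, here is a minimal framing sketch using only `java.nio.ByteBuffer`. The layout, the type tags, and the `FrameSketch` class are assumptions made for illustration. This is not Spark's wire format, and the real `Encodable` works on Netty's `ByteBuf` rather than `ByteBuffer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative framing only; layout and type ids are assumptions, not Spark's protocol.
final class FrameSketch {
  enum Type {
    RPC_REQUEST((byte) 1), RPC_RESPONSE((byte) 2);

    final byte id;

    Type(byte id) {
      this.id = id;
    }

    static Type fromId(byte id) {
      for (Type t : values()) {
        if (t.id == id) {
          return t;
        }
      }
      throw new IllegalArgumentException("unknown type id " + id);
    }
  }

  // Assumed layout: [4-byte body length][1-byte type][8-byte request id][body]
  static int encodedLength(byte[] body) {
    return 4 + 1 + 8 + body.length;
  }

  static ByteBuffer encode(Type type, long requestId, byte[] body) {
    ByteBuffer buf = ByteBuffer.allocate(encodedLength(body));
    buf.putInt(body.length).put(type.id).putLong(requestId).put(body);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  // Reads the fields back in the order they were written.
  static String decode(ByteBuffer frame) {
    int length = frame.getInt();
    Type type = Type.fromId(frame.get());
    long requestId = frame.getLong();
    byte[] body = new byte[length];
    frame.get(body);
    return type + "#" + requestId + ":" + new String(body, StandardCharsets.UTF_8);
  }
}
```

Knowing the encoded length up front, as `encodedLength()` does in the `Encodable` listing, lets the sender allocate the buffer once instead of growing it while writing.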

### Security and Authentication

SASL authentication and AES encryption for secure network communication.

```java { .api }
public interface TransportCipher {
  String getKeyId() throws GeneralSecurityException;
  void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

public interface SecretKeyHolder {
  String getSaslUser(String appId);
  String getSecretKey(String appId);
}
```

[Security and Authentication](./security-authentication.md)

### Shuffle Database

Key-value storage for shuffle data, backed by LevelDB or RocksDB.

```java { .api }
public interface DB extends Closeable {
  void put(byte[] key, byte[] value) throws IOException;
  byte[] get(byte[] key) throws IOException;
  void delete(byte[] key) throws IOException;
  DBIterator iterator();
}

public enum DBBackend {
  LEVELDB, ROCKSDB;

  public String fileName(String prefix);
  public static DBBackend byName(String value);
}
```

[Shuffle Database](./shuffle-database.md)
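
The `DB` interface is a byte-array key-value store. The in-memory sketch below mirrors its `put`/`get`/`delete` shape to show one detail that is easy to miss with `byte[]` keys. `InMemoryDbSketch` and the key format are hypothetical, and the sketch stands in for neither the LevelDB nor the RocksDB backend.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in with the same put/get/delete shape as DB.
final class InMemoryDbSketch {
  // byte[] compares by identity, so keys are wrapped in ByteBuffer,
  // whose equals() and hashCode() compare content.
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  void put(byte[] key, byte[] value) {
    // Defensive copies keep later caller-side mutation out of the store.
    store.put(ByteBuffer.wrap(key.clone()), value.clone());
  }

  byte[] get(byte[] key) {
    byte[] value = store.get(ByteBuffer.wrap(key));
    return value == null ? null : value.clone();
  }

  void delete(byte[] key) {
    store.remove(ByteBuffer.wrap(key));
  }

  static String demo() {
    InMemoryDbSketch db = new InMemoryDbSketch();
    byte[] key = "app-1/exec-7".getBytes(StandardCharsets.UTF_8); // hypothetical key format
    db.put(key, "localDirs=/tmp/a".getBytes(StandardCharsets.UTF_8));

    // A different array with equal content still finds the entry.
    byte[] sameContent = "app-1/exec-7".getBytes(StandardCharsets.UTF_8);
    String found = new String(db.get(sameContent), StandardCharsets.UTF_8);

    db.delete(key);
    return found + "|" + (db.get(key) == null ? "deleted" : "present");
  }
}
```

A stand-in like this can be handy in unit tests that should not touch the file system, provided the code under test depends only on the key-value operations.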

### Configuration Management

Provider-based configuration for transport layer settings and network parameters.

```java { .api }
public class TransportConf {
  public TransportConf(String module, ConfigProvider conf);

  public String ioMode();
  public boolean preferDirectBufs();
  public int connectionTimeoutMs();
  public int numConnectionsPerPeer();
  public int serverThreads();
  public int clientThreads();
  public boolean encryptionEnabled();
  public boolean saslEncryption();
}

public abstract class ConfigProvider {
  public abstract String get(String name);
  public String get(String name, String defaultValue);
  public boolean getBoolean(String name, boolean defaultValue);
  public int getInt(String name, int defaultValue);
  public long getLong(String name, long defaultValue);
}
```

[Configuration Management](./configuration-management.md)
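
The provider pattern in the listing above has one abstract lookup and several typed helpers layered on top of it. The sketch below reproduces that shape with JDK classes only. `Provider` and `MapProvider` are stand-ins written for this example, the `spark.example.*` keys are made up, and signalling a missing key with `NoSuchElementException` is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Stand-in for the provider pattern; these are not the Spark classes.
final class ConfigProviderSketch {
  abstract static class Provider {
    // Assumed contract for this sketch: throw NoSuchElementException when the key is absent.
    abstract String get(String name);

    String get(String name, String defaultValue) {
      try {
        return get(name);
      } catch (NoSuchElementException e) {
        return defaultValue;
      }
    }

    // Typed helpers parse the string form and fall back to the default.
    int getInt(String name, int defaultValue) {
      return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    boolean getBoolean(String name, boolean defaultValue) {
      return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
    }
  }

  static final class MapProvider extends Provider {
    private final Map<String, String> values;

    MapProvider(Map<String, String> values) {
      this.values = new HashMap<>(values);
    }

    @Override
    String get(String name) {
      String value = values.get(name);
      if (value == null) {
        throw new NoSuchElementException(name);
      }
      return value;
    }
  }

  static String demo() {
    Map<String, String> settings = new HashMap<>();
    settings.put("spark.example.io.clientThreads", "4"); // hypothetical key
    Provider provider = new MapProvider(settings);
    return provider.getInt("spark.example.io.clientThreads", 0)
        + "/" + provider.getInt("spark.example.io.serverThreads", 8)
        + "/" + provider.getBoolean("spark.example.flag", true);
  }
}
```

Because only `get(String)` is abstract, a new configuration source (a properties file, environment variables) needs just one method, and every typed helper works with it unchanged.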

## Types

### Core Interfaces

```java { .api }
// java.io.Closeable from the JDK, shown for reference
public interface Closeable {
  void close() throws IOException;
}

// ByteBuf is Netty's io.netty.buffer.ByteBuf
public interface Encodable {
  int encodedLength();
  void encode(ByteBuf buf);
}
```

### Callback Interfaces

```java { .api }
public interface BaseResponseCallback {
  void onFailure(Throwable e);
}

public interface RpcResponseCallback extends BaseResponseCallback {
  void onSuccess(ByteBuffer response);
}

public interface ChunkReceivedCallback {
  void onSuccess(int chunkIndex, ManagedBuffer buffer);
  void onFailure(int chunkIndex, Throwable e);
}

public interface StreamCallback {
  void onData(String streamId, ByteBuffer buf) throws IOException;
  void onComplete(String streamId) throws IOException;
  void onFailure(String streamId, Throwable cause) throws IOException;
}

public interface StreamCallbackWithID extends StreamCallback {
  String getID();
}
```
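
Callback interfaces like these are often easier to compose once bridged to a `CompletableFuture`. The sketch below shows one way to do that. It declares its own local `RpcResponseCallback` with the same two methods so it compiles without Spark on the classpath; `CallbackFutureSketch` and `completing` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical adapter from a success/failure callback to a CompletableFuture.
final class CallbackFutureSketch {
  // Local copy of the callback shape, so this sketch does not depend on Spark.
  interface RpcResponseCallback {
    void onSuccess(ByteBuffer response);

    void onFailure(Throwable e);
  }

  static RpcResponseCallback completing(CompletableFuture<String> future) {
    return new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // Copy the bytes inside the callback: a conservative choice in case
        // the buffer is reused after the callback returns.
        byte[] bytes = new byte[response.remaining()];
        response.get(bytes);
        future.complete(new String(bytes, StandardCharsets.UTF_8));
      }

      @Override
      public void onFailure(Throwable e) {
        future.completeExceptionally(e);
      }
    };
  }

  // Simulates a response arriving by invoking the callback directly.
  static String demo() {
    CompletableFuture<String> future = new CompletableFuture<>();
    completing(future).onSuccess(ByteBuffer.wrap("pong".getBytes(StandardCharsets.UTF_8)));
    return future.join();
  }
}
```

With an adapter of this kind, the caller can wait with a timeout or chain further steps on the future, rather than nesting logic inside anonymous callback classes.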

### Bootstrap Interfaces

```java { .api }
public interface TransportClientBootstrap {
  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
```

### Enumeration Types

```java { .api }
public enum IOMode {
  NIO, EPOLL
}
```

### Exception Classes

```java { .api }
public class ChunkFetchFailureException extends RuntimeException {
  public ChunkFetchFailureException(String errorMsg, Throwable cause);
  public ChunkFetchFailureException(String errorMsg);
}

public class SaslTimeoutException extends RuntimeException {
  // Standard RuntimeException constructors
}

public class BlockPushNonFatalFailure extends RuntimeException {
  // Standard RuntimeException constructors
}
```