# Apache Spark Network Common

Apache Spark Network Common is the Netty-based networking library that Spark uses for communication inside a cluster, including its RPC and block-transfer services. It provides client-server transport, buffer management, wire-protocol message definitions, authentication, and encryption.

## Package Information

- **Package Name**: spark-network-common_2.12
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Version**: 3.0.1
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-common_2.12</artifactId>
  <version>3.0.1</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
```

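## Basic Usage

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create transport configuration ("example" is the module name used in config keys)
TransportConf conf = new TransportConf("example", new MapConfigProvider(Collections.emptyMap()));

// NoOpRpcHandler rejects every RPC, so the server needs a handler that actually replies
RpcHandler echoHandler = new RpcHandler() {
  private final StreamManager streamManager = new OneForOneStreamManager();

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    // Reply with a fresh buffer; the request buffer may be released once receive() returns
    callback.onSuccess(JavaUtils.stringToBytes("Echo: " + JavaUtils.bytesToString(message)));
  }

  @Override
  public StreamManager getStreamManager() {
    return streamManager;
  }
};

// Create transport context, then start a server on any free port
TransportContext context = new TransportContext(conf, echoHandler);
TransportServer server = context.createServer();
int port = server.getPort();

// Create client factory and connect to server (throws IOException / InterruptedException)
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);

// Send RPC request asynchronously; the latch keeps the example alive until the reply arrives
CountDownLatch done = new CountDownLatch(1);
client.sendRpc(JavaUtils.stringToBytes("Hello, Spark!"), new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    System.out.println("Received response: " + JavaUtils.bytesToString(response));
    done.countDown();
  }

  @Override
  public void onFailure(Throwable e) {
    System.err.println("RPC failed: " + e.getMessage());
    done.countDown();
  }
});
done.await(10, TimeUnit.SECONDS);

// Cleanup
client.close();
clientFactory.close();
server.close();
context.close();
```
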
## Architecture

Apache Spark Network Common is built around the following components:

- **Transport Layer**: `TransportContext`, `TransportClient`, and `TransportServer` provide the core client-server abstractions on top of Netty
- **Buffer Management**: the `ManagedBuffer` hierarchy gives one interface over data backed by file segments, NIO buffers, or Netty buffers
- **Protocol Layer**: on-the-wire message formats, encoded and decoded through the `Message` interface and its concrete implementations
- **Security Framework**: authentication via SASL or Spark's own authentication protocol, with optional channel encryption (AES-based in the Spark protocol)
- **Server Framework**: pluggable RPC handlers and stream managers for plugging in application logic
- **Utilities**: configuration management, I/O utilities, and Netty integration helpers

## Capabilities

### Transport Context

Core networking context: it holds the configuration and the application's `RpcHandler`, creates `TransportClientFactory` and `TransportServer` instances, and sets up the Netty channel pipeline for each connection.

```java { .api }
public class TransportContext implements Closeable {
  public TransportContext(TransportConf conf, RpcHandler rpcHandler);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);
  public TransportClientFactory createClientFactory();
  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
  public TransportServer createServer();
  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
  public TransportChannelHandler initializePipeline(SocketChannel channel);
  public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);
  public TransportConf getConf();
  public Counter getRegisteredConnections();
  public void close();
}
```

[Transport Layer](./transport-layer.md)
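
One job of that pipeline is splitting the incoming byte stream back into individual messages. The JDK-only sketch below shows the idea, assuming each frame starts with an 8-byte big-endian length that counts the length field itself (based on a reading of Spark's `TransportFrameDecoder`; check the source before relying on it). `FrameSplitter` is a hypothetical class, not part of Spark, and it copies bytes where the real decoder works on Netty `ByteBuf`s.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed frame splitting; not Spark's TransportFrameDecoder.
// Assumes every frame starts with an 8-byte big-endian length that includes the prefix itself.
final class FrameSplitter {
  private static final int LENGTH_SIZE = 8;
  // Bytes received so far that do not yet form a complete frame.
  private ByteBuffer pending = ByteBuffer.allocate(0);

  /** Feeds newly received bytes and returns every frame body that is now complete. */
  List<byte[]> feed(byte[] chunk) {
    ByteBuffer combined = ByteBuffer.allocate(pending.remaining() + chunk.length);
    combined.put(pending).put(chunk);
    combined.flip();
    List<byte[]> frames = new ArrayList<>();
    while (combined.remaining() >= LENGTH_SIZE) {
      long frameLength = combined.getLong(combined.position()); // peek without consuming
      if (frameLength < LENGTH_SIZE) {
        throw new IllegalStateException("Invalid frame length: " + frameLength);
      }
      if (combined.remaining() < frameLength) {
        break; // frame incomplete: wait for more bytes
      }
      combined.position(combined.position() + LENGTH_SIZE);
      byte[] body = new byte[(int) (frameLength - LENGTH_SIZE)];
      combined.get(body);
      frames.add(body);
    }
    pending = combined.slice(); // keep the unconsumed tail for the next call
    return frames;
  }

  /** Builds one frame: the length prefix (counting itself) followed by the body. */
  static byte[] frame(byte[] body) {
    return ByteBuffer.allocate(LENGTH_SIZE + body.length)
        .putLong(LENGTH_SIZE + body.length)
        .put(body)
        .array();
  }
}
```

Because a single network read can end in the middle of a frame, the decoder has to buffer partial input; that is why `feed` may return no frames at all.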

### Buffer Management

Buffer abstractions that expose file segments, NIO `ByteBuffer`s, and Netty `ByteBuf`s through one interface, with `retain()`/`release()` reference counting.

```java { .api }
public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}

public final class FileSegmentManagedBuffer extends ManagedBuffer {
  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
  public File getFile();
  public long getOffset();
  public long getLength();
}
```

[Buffer Management](./buffer-management.md)
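
The segment-view idea behind `FileSegmentManagedBuffer` (`length` bytes starting at `offset` in a file) and the `retain()`/`release()` contract can be modeled with plain JDK classes. `SegmentBuffer` below is a hypothetical simplification, not Spark's implementation: it reads the segment eagerly with a positional `FileChannel.read` and keeps an explicit reference count, whereas concrete `ManagedBuffer` subclasses may implement reference counting differently.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a file-segment buffer with reference counting (not Spark's class).
final class SegmentBuffer {
  private final Path file;
  private final long offset;
  private final long length;
  // Starts at 1: the creator holds the first reference.
  private final AtomicInteger refCnt = new AtomicInteger(1);

  SegmentBuffer(Path file, long offset, long length) {
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  long size() {
    return length;
  }

  /** Reads exactly [offset, offset + length) of the file into a heap ByteBuffer. */
  ByteBuffer nioByteBuffer() throws IOException {
    ensureLive();
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      ByteBuffer buf = ByteBuffer.allocate((int) length);
      long pos = offset;
      while (buf.hasRemaining()) {
        int n = channel.read(buf, pos); // positional read: the channel position is not changed
        if (n < 0) {
          throw new IOException("Reached EOF before reading the full segment");
        }
        pos += n;
      }
      buf.flip();
      return buf;
    }
  }

  SegmentBuffer retain() {
    ensureLive();
    refCnt.incrementAndGet();
    return this;
  }

  SegmentBuffer release() {
    if (refCnt.decrementAndGet() < 0) {
      throw new IllegalStateException("Released more times than retained");
    }
    return this;
  }

  int refCount() {
    return refCnt.get();
  }

  private void ensureLive() {
    if (refCnt.get() <= 0) {
      throw new IllegalStateException("Buffer already released");
    }
  }
}
```

A positional read leaves the channel's own position untouched, so several segments of the same file can be read independently.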

### Client Communications

Client handle, obtained from `TransportClientFactory.createClient`, for sending RPC requests, fetching data chunks, and streaming data. Operations are asynchronous and report results through callbacks.

```java { .api }
public class TransportClient implements Closeable {
  public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
  public void stream(String streamId, StreamCallback callback);
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
  public void send(ByteBuffer message);
  public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
  public void removeRpcRequest(long requestId);
  public boolean isActive();
  public void close();
}

public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}
```

[Client Communications](./client-communications.md)
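
Because `sendRpc` returns immediately with a request ID, the client must match each later response to its callback by that ID. The sketch below models this bookkeeping with a concurrent map. `PendingRpcs` and its nested `Callback` (shaped like `RpcResponseCallback`) are hypothetical, and the sequential ID counter is an assumption of the sketch, not how Spark generates request IDs.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of client-side RPC bookkeeping; not Spark's implementation.
final class PendingRpcs {
  // Same shape as RpcResponseCallback.
  interface Callback {
    void onSuccess(ByteBuffer response);
    void onFailure(Throwable e);
  }

  private final AtomicLong nextId = new AtomicLong();
  private final Map<Long, Callback> outstanding = new ConcurrentHashMap<>();

  /** Registers a callback and returns the request ID to send on the wire. */
  long register(Callback callback) {
    long id = nextId.incrementAndGet();
    outstanding.put(id, callback);
    return id;
  }

  /** Called when a response arrives; unknown or already-completed IDs are ignored. */
  void onResponse(long requestId, ByteBuffer body) {
    Callback cb = outstanding.remove(requestId);
    if (cb != null) {
      cb.onSuccess(body);
    }
  }

  /** Called when a failure message arrives for a request. */
  void onFailure(long requestId, String error) {
    Callback cb = outstanding.remove(requestId);
    if (cb != null) {
      cb.onFailure(new RuntimeException(error));
    }
  }

  /** Similar in spirit to removeRpcRequest: stop waiting for a response. */
  void remove(long requestId) {
    outstanding.remove(requestId);
  }

  /** Fails every outstanding request, e.g. when the connection closes. */
  void failAll(Throwable cause) {
    for (Long id : outstanding.keySet()) {
      Callback cb = outstanding.remove(id);
      if (cb != null) {
        cb.onFailure(cause);
      }
    }
  }

  int size() {
    return outstanding.size();
  }
}
```

Removing the entry before invoking the callback means each callback runs at most once, even when a late duplicate response races with a connection failure.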
163
164
### Protocol Messages
165
166
Standardized on-the-wire message formats for RPC requests/responses, chunk fetching, streaming, and error handling with efficient encoding/decoding.
167
168
```java { .api }
169
public interface Message extends Encodable {
170
Type type();
171
ManagedBuffer body();
172
boolean isBodyInFrame();
173
174
enum Type {
175
ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
176
RpcRequest, RpcResponse, RpcFailure,
177
StreamRequest, StreamResponse, StreamFailure,
178
OneWayMessage, UploadStream, User
179
}
180
}
181
182
public class RpcRequest implements RequestMessage {
183
public RpcRequest(long requestId, ManagedBuffer message);
184
public long requestId();
185
}
186
187
public class RpcResponse implements ResponseMessage {
188
public RpcResponse(long requestId, ManagedBuffer message);
189
public long requestId();
190
}
191
```
192
193
[Protocol Messages](./protocol-messages.md)
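
To make the wire format concrete, here is a JDK-only sketch of one possible layout for an RPC request frame. The layout used (an 8-byte frame length that counts itself, a 1-byte message type where `RpcRequest` is assumed to be 3, an 8-byte request ID, a 4-byte body length, then the body) reflects a reading of Spark's `MessageEncoder` and `RpcRequest.encode`. Treat it as an assumption and confirm it against the source. `RpcFrames` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder/decoder for an RpcRequest-style frame (not Spark code).
final class RpcFrames {
  static final byte RPC_REQUEST_TYPE = 3; // assumed type ID of Message.Type.RpcRequest

  // frame length (8) + type (1) + request ID (8) + body length (4)
  static final int HEADER_LENGTH = 8 + 1 + 8 + 4;

  static ByteBuffer encode(long requestId, byte[] body) {
    ByteBuffer frame = ByteBuffer.allocate(HEADER_LENGTH + body.length);
    frame.putLong(HEADER_LENGTH + body.length); // frame length includes this field
    frame.put(RPC_REQUEST_TYPE);
    frame.putLong(requestId);
    frame.putInt(body.length);
    frame.put(body);
    frame.flip();
    return frame;
  }

  /** Decodes a frame produced by encode(); returns "requestId:body" for easy inspection. */
  static String decode(ByteBuffer frame) {
    long frameLength = frame.getLong();
    if (frameLength != frame.limit()) {
      throw new IllegalArgumentException("Frame length mismatch");
    }
    byte type = frame.get();
    if (type != RPC_REQUEST_TYPE) {
      throw new IllegalArgumentException("Unexpected message type " + type);
    }
    long requestId = frame.getLong();
    byte[] body = new byte[frame.getInt()];
    frame.get(body);
    return requestId + ":" + new String(body, StandardCharsets.UTF_8);
  }
}
```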
194
195
### Authentication & Security
196
197
Comprehensive security framework supporting both SASL authentication and Spark's custom authentication protocol, with AES encryption for secure data transmission.
198
199
```java { .api }
200
public interface SecretKeyHolder {
201
String getSaslUser(String appId);
202
String getSecretKey(String appId);
203
}
204
205
public class SaslClientBootstrap implements TransportClientBootstrap {
206
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
207
public void doBootstrap(TransportClient client, Channel channel);
208
}
209
210
public class AuthClientBootstrap implements TransportClientBootstrap {
211
public AuthClientBootstrap(TransportConf conf, String appId, String appKey);
212
public void doBootstrap(TransportClient client, Channel channel);
213
}
214
```
215
216
[Authentication & Security](./authentication-security.md)
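
A `SecretKeyHolder` only has to map an application ID to a SASL user name and a shared secret. The in-memory sketch below declares a local `KeyHolder` interface with the same two methods so it compiles without the Spark jar. `KeyHolder` and `InMemoryKeyHolder` are hypothetical names; real code would implement `org.apache.spark.network.sasl.SecretKeyHolder` instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in with the same methods as org.apache.spark.network.sasl.SecretKeyHolder.
interface KeyHolder {
  String getSaslUser(String appId);
  String getSecretKey(String appId);
}

// Hypothetical in-memory holder: one shared user name, one secret per application.
final class InMemoryKeyHolder implements KeyHolder {
  private final String saslUser;
  private final Map<String, String> secrets = new ConcurrentHashMap<>();

  InMemoryKeyHolder(String saslUser) {
    this.saslUser = saslUser;
  }

  void register(String appId, String secret) {
    secrets.put(appId, secret);
  }

  void unregister(String appId) {
    secrets.remove(appId);
  }

  @Override
  public String getSaslUser(String appId) {
    return saslUser;
  }

  @Override
  public String getSecretKey(String appId) {
    // This sketch returns null for applications it does not know about.
    return secrets.get(appId);
  }
}
```

Returning `null` for an unknown application keeps the sketch simple. This sketch does not show how the SASL and Spark authentication code react to a missing secret.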
217
218
### Server Framework
219
220
Server-side infrastructure with pluggable RPC handlers, stream managers, and bootstrap mechanisms for customizing server behavior and processing client requests.
221
222
```java { .api }
223
public abstract class RpcHandler {
224
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
225
public abstract StreamManager getStreamManager();
226
public void channelActive(TransportClient client);
227
public void channelInactive(TransportClient client);
228
}
229
230
public abstract class StreamManager {
231
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
232
public abstract ManagedBuffer openStream(String streamId);
233
public void connectionTerminated(Channel channel);
234
}
235
236
public class TransportServer implements Closeable {
237
public int getPort();
238
public void close();
239
}
240
```
241
242
[Server Framework](./server-framework.md)
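
The `getChunk(streamId, chunkIndex)` contract implies a server-side registry of streams, each an ordered list of chunks. The sketch below models such a registry with JDK types only, using `ByteBuffer` in place of `ManagedBuffer`. `ChunkRegistry` is hypothetical and only loosely follows the idea of Spark's `OneForOneStreamManager`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical chunk registry illustrating the getChunk(streamId, chunkIndex) contract.
final class ChunkRegistry {
  private final AtomicLong nextStreamId = new AtomicLong();
  private final Map<Long, List<ByteBuffer>> streams = new ConcurrentHashMap<>();

  /** Registers an ordered list of chunks and returns the stream ID clients will fetch from. */
  long registerStream(List<ByteBuffer> chunks) {
    long id = nextStreamId.incrementAndGet();
    streams.put(id, Collections.unmodifiableList(new ArrayList<>(chunks)));
    return id;
  }

  /** Returns a read-only view of one chunk, or throws if the stream or index is unknown. */
  ByteBuffer getChunk(long streamId, int chunkIndex) {
    List<ByteBuffer> chunks = streams.get(streamId);
    if (chunks == null) {
      throw new IllegalStateException("Unknown stream " + streamId);
    }
    if (chunkIndex < 0 || chunkIndex >= chunks.size()) {
      throw new IllegalStateException(
          "Chunk index " + chunkIndex + " is out of range for stream " + streamId);
    }
    // asReadOnlyBuffer() gives each caller its own position/limit and blocks writes to shared bytes.
    return chunks.get(chunkIndex).asReadOnlyBuffer();
  }

  /** Drops a stream, e.g. when the connection that opened it terminates. */
  void removeStream(long streamId) {
    streams.remove(streamId);
  }
}
```

Returning an independent view per fetch lets the same chunk be served to several requests without one reader's position affecting another.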
243
244
### Configuration & Utilities
245
246
Configuration management, utility functions, and Netty integration helpers providing foundation services for the networking layer.
247
248
```java { .api }
249
public class TransportConf {
250
public TransportConf(String module, ConfigProvider conf);
251
public String getModuleName();
252
public int connectionTimeoutMs();
253
public int numConnectionsPerPeer();
254
public int serverThreads();
255
public int clientThreads();
256
public boolean preferDirectBufs();
257
}
258
259
public enum ByteUnit {
260
BYTE, KiB, MiB, GiB, TiB, PiB;
261
262
public long convertFrom(long d, ByteUnit u);
263
public long toBytes(long d);
264
}
265
```
266
267
[Configuration & Utilities](./configuration-utilities.md)
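
`ByteUnit` constants are powers of 1024, so a conversion is a multiplication or division by the ratio between two units. The enum below is a simplified, hypothetical copy named `SizeUnit` (not the Spark class) that illustrates `convertFrom` and `toBytes` semantics; its overflow check and truncating division are choices made for this sketch.

```java
// Simplified, hypothetical copy of the ByteUnit idea: each constant stores its size in bytes.
enum SizeUnit {
  BYTE(1L), KiB(1L << 10), MiB(1L << 20), GiB(1L << 30), TiB(1L << 40), PiB(1L << 50);

  private final long multiplier;

  SizeUnit(long multiplier) {
    this.multiplier = multiplier;
  }

  /** Converts d units of this unit into unit u (truncating when u is larger). */
  long convertTo(long d, SizeUnit u) {
    if (multiplier > u.multiplier) {
      long ratio = multiplier / u.multiplier;
      if (d > Long.MAX_VALUE / ratio) {
        throw new ArithmeticException("Overflow converting " + d + " " + name() + " to " + u);
      }
      return d * ratio;
    }
    // Divide by the ratio instead of multiplying first, which avoids overflow.
    return d / (u.multiplier / multiplier);
  }

  /** Converts d units of u into this unit. */
  long convertFrom(long d, SizeUnit u) {
    return u.convertTo(d, this);
  }

  long toBytes(long d) {
    return convertTo(d, BYTE);
  }
}
```

Because conversion to a larger unit divides, `SizeUnit.GiB.convertFrom(1536, SizeUnit.MiB)` yields 1 rather than 1.5.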
268
269
## Core Types
270
271
```java { .api }
272
public interface Encodable {
273
int encodedLength();
274
void encode(ByteBuf buf);
275
}
276
277
public interface TransportClientBootstrap {
278
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
279
}
280
281
public interface TransportServerBootstrap {
282
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
283
}
284
285
public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
286
// Netty channel handler for transport layer
287
}
288
289
// From io.netty.channel.socket
290
public interface SocketChannel extends Channel {
291
// Netty socket channel interface
292
}
293
294
// From com.codahale.metrics
295
public class Counter {
296
public long getCount();
297
}
298
```
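
A `TransportServerBootstrap` receives the current `RpcHandler` and returns the handler to use for that channel. This lets a bootstrap such as SASL wrap the application's handler. The sketch below shows that decorator chain with local stand-in types: `Handler`, `ServerBootstrap`, and `ServerChain` are hypothetical, and a `String` replaces the Netty `Channel`. It assumes the server applies bootstraps in list order, each wrapping the previous result.

```java
import java.util.List;

// Local stand-ins for RpcHandler and TransportServerBootstrap (hypothetical, simplified).
interface Handler {
  String receive(String message);
}

interface ServerBootstrap {
  Handler doBootstrap(String channel, Handler handler);
}

final class ServerChain {
  /** Threads the application's handler through each bootstrap in order. */
  static Handler build(String channel, Handler appHandler, List<ServerBootstrap> bootstraps) {
    Handler current = appHandler;
    for (ServerBootstrap bootstrap : bootstraps) {
      current = bootstrap.doBootstrap(channel, current);
    }
    return current;
  }

  /** Example bootstrap: wraps the delegate and tags every reply it passes through. */
  static ServerBootstrap tagging(String tag) {
    return (channel, delegate) -> message -> tag + "(" + delegate.receive(message) + ")";
  }
}
```

The last bootstrap in the list produces the outermost wrapper, so it sees each message first.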