# Apache Spark Network-Common

Apache Spark Network-Common provides the foundational networking layer for Apache Spark's distributed computing engine. It implements a high-performance transport abstraction on top of Netty and covers client-server communication end to end: RPC handling, data streaming, authentication (SASL and custom protocols), encryption, and connection management.

## Package Information

- **Package Name**: spark-network-common_2.11
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-network-common_2.11
- **Installation**: `<dependency><groupId>org.apache.spark</groupId><artifactId>spark-network-common_2.11</artifactId><version>2.4.8</version></dependency>`

## Core Imports

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
```

## Basic Usage

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.MapConfigProvider;

// Configure transport (an empty map leaves every setting at its default)
Map<String, String> configMap = new HashMap<>();
TransportConf conf = new TransportConf("myapp", new MapConfigProvider(configMap));

// Create RPC handler (MyRpcHandler stands for your own RpcHandler subclass)
RpcHandler rpcHandler = new MyRpcHandler();

// Set up transport context
TransportContext context = new TransportContext(conf, rpcHandler);

// Create server
TransportServer server = context.createServer(0); // 0 = any available port
int port = server.getPort();

// Create client factory and client
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);

// Send RPC (asynchronous: the callback fires when the response arrives)
ByteBuffer message = ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8));
client.sendRpc(message, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Copy the readable bytes; response.array() fails on direct or read-only buffers
        byte[] bytes = new byte[response.remaining()];
        response.get(bytes);
        System.out.println("Response received: " + new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Cleanup (in real code, wait for the callback before closing)
client.close();
server.close();
clientFactory.close();
```

## Architecture

Apache Spark Network-Common is built around several key components:

- **Transport Context**: Central factory (`TransportContext`) for creating servers and client factories with consistent configuration
- **Client-Server Model**: Asynchronous client (`TransportClient`) and server (`TransportServer`) implementations with connection pooling
- **RPC Framework**: Pluggable RPC handlers with support for bidirectional communication and one-way messages
- **Stream Management**: Efficient streaming data transfer with chunk-based fetching and zero-copy I/O
- **Authentication Layer**: Pluggable authentication via SASL and custom protocols with encryption support
- **Buffer Abstraction**: Unified buffer interface (`ManagedBuffer`) with multiple backing implementations (file, memory, Netty)
- **Protocol Stack**: Complete message protocol with encoding/decoding and frame-based transport

## Capabilities

### Transport Layer

Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and resource management.

```java { .api }
public class TransportContext {
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
    public TransportClientFactory createClientFactory();
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer();
}

public class TransportClient {
    public Channel getChannel();
    public boolean isActive();
    public SocketAddress getSocketAddress();
    public String getClientId();
    public void setClientId(String id);
    public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
    public void send(ByteBuffer message);
    public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
    public void stream(String streamId, StreamCallback callback);
    public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
    public void removeRpcRequest(long requestId);
    public void timeOut();
    public void close();
}

public class TransportServer {
    public int getPort();
    public void close();
}
```

[Transport Layer](./transport.md)
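
The client API offers both a callback-based `sendRpc` and a blocking `sendRpcSync`. The sketch below shows one way a blocking call with a timeout can be layered on top of a callback-based send, using only the JDK. `SyncRpcSketch`, `ResponseCallback`, and `AsyncSender` are hypothetical stand-ins introduced for illustration; they are not library types, and this is not necessarily how the library implements `sendRpcSync`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Stand-alone sketch: turning a callback-based send into a blocking call with a timeout. */
final class SyncRpcSketch {

    /** Local stand-in with the same shape as RpcResponseCallback; not the library type. */
    interface ResponseCallback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    /** Hypothetical asynchronous sender, standing in for a call such as sendRpc. */
    interface AsyncSender {
        void send(ByteBuffer message, ResponseCallback callback);
    }

    /** Blocks until the callback fires or the timeout elapses. */
    static ByteBuffer sendSync(AsyncSender sender, ByteBuffer message, long timeoutMs)
            throws TimeoutException, ExecutionException, InterruptedException {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        sender.send(message, new ResponseCallback() {
            @Override
            public void onSuccess(ByteBuffer response) {
                // Copy the payload so the caller can read it after the callback returns.
                ByteBuffer copy = ByteBuffer.allocate(response.remaining());
                copy.put(response.duplicate());
                copy.flip();
                result.complete(copy);
            }

            @Override
            public void onFailure(Throwable e) {
                result.completeExceptionally(e);
            }
        });
        return result.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Demo helper: sends text to a sender that echoes its input and decodes the reply. */
    static String echo(String text, long timeoutMs) throws Exception {
        AsyncSender echoSender = (msg, cb) -> cb.onSuccess(msg);
        ByteBuffer request = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        ByteBuffer reply = sendSync(echoSender, request, timeoutMs);
        return StandardCharsets.UTF_8.decode(reply).toString();
    }
}
```

A failure reported through `onFailure` surfaces to the blocked caller as an `ExecutionException`, while a callback that never fires surfaces as a `TimeoutException`, so the two cases stay distinguishable.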

### Buffer Management

Unified buffer abstraction with multiple backing implementations for efficient memory and file-based data handling.

```java { .api }
public abstract class ManagedBuffer {
    public abstract long size();
    public abstract ByteBuffer nioByteBuffer() throws IOException;
    public abstract InputStream createInputStream() throws IOException;
    public abstract ManagedBuffer retain();
    public abstract ManagedBuffer release();
    public abstract Object convertToNetty() throws IOException;
}

public final class FileSegmentManagedBuffer extends ManagedBuffer {
    public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
    public File getFile();
    public long getOffset();
    public long getLength();
}
```

[Buffer Management](./buffers.md)
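
The `retain()` and `release()` pair on `ManagedBuffer` follows the usual reference-counting pattern. As a minimal sketch of that pattern in general, assuming nothing about any particular backing implementation (some may treat these calls as no-ops), the hypothetical `RefCountedBuffer` class below frees its data once the last reference is released.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-alone sketch of retain/release reference counting; not the library's implementation. */
final class RefCountedBuffer {
    // The creator holds the first reference.
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile ByteBuffer data;

    RefCountedBuffer(ByteBuffer data) {
        this.data = data;
    }

    /** Number of readable bytes; fails once the buffer has been released. */
    long size() {
        return live().remaining();
    }

    /** Read-only view, so callers cannot disturb the shared position or contents. */
    ByteBuffer nioByteBuffer() {
        return live().asReadOnlyBuffer();
    }

    /** Adds a reference; only allowed while at least one reference is still held. */
    RefCountedBuffer retain() {
        int current;
        do {
            current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("buffer already released");
            }
        } while (!refCount.compareAndSet(current, current + 1));
        return this;
    }

    /** Drops a reference and frees the data when the count reaches zero. */
    RefCountedBuffer release() {
        int current;
        do {
            current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("released more often than retained");
            }
        } while (!refCount.compareAndSet(current, current - 1));
        if (current == 1) {
            data = null; // last reference gone
        }
        return this;
    }

    int refCount() {
        return refCount.get();
    }

    boolean isReleased() {
        return refCount.get() == 0;
    }

    private ByteBuffer live() {
        ByteBuffer current = data;
        if (current == null) {
            throw new IllegalStateException("buffer already released");
        }
        return current;
    }
}
```

The rule of thumb this illustrates: every component that keeps a buffer beyond the current call retains it, and releases it exactly once when done.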

### Protocol Handling

Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication.

```java { .api }
public interface Message extends Encodable {
}

public interface Encodable {
    int encodedLength();
    void encode(ByteBuf buf);
}

public final class RpcRequest extends AbstractMessage implements RequestMessage {
    public RpcRequest(long requestId, ManagedBuffer message);
}

public final class RpcResponse extends AbstractResponseMessage implements ResponseMessage {
    public RpcResponse(long requestId, ManagedBuffer message);
}
```

[Protocol Handling](./protocol.md)
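
The `Encodable` contract pairs a size query with a write step, so a caller can allocate exactly the space a message needs before encoding it. The sketch below illustrates that pairing with `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so it runs without dependencies. `SimpleEncodable` and `TextRequest` are hypothetical, and the byte layout is an assumption made for the example, not the library's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Stand-alone sketch of the encodedLength()/encode() pairing; the layout is illustrative only. */
final class EncodableSketch {

    /** Local stand-in for the Encodable contract, using ByteBuffer instead of Netty's ByteBuf. */
    interface SimpleEncodable {
        int encodedLength();
        void encode(ByteBuffer buf);
    }

    /** Hypothetical message: an 8-byte request id followed by a length-prefixed UTF-8 payload. */
    static final class TextRequest implements SimpleEncodable {
        final long requestId;
        final String text;
        private final byte[] utf8;

        TextRequest(long requestId, String text) {
            this.requestId = requestId;
            this.text = text;
            this.utf8 = text.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public int encodedLength() {
            // id (8 bytes) + payload length prefix (4 bytes) + payload bytes
            return Long.BYTES + Integer.BYTES + utf8.length;
        }

        @Override
        public void encode(ByteBuffer buf) {
            buf.putLong(requestId);
            buf.putInt(utf8.length);
            buf.put(utf8);
        }

        /** Reads the fields back in the same order encode() wrote them. */
        static TextRequest decode(ByteBuffer buf) {
            long id = buf.getLong();
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            return new TextRequest(id, new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /** Allocates exactly encodedLength() bytes, encodes, and returns a buffer ready for reading. */
    static ByteBuffer toBuffer(SimpleEncodable message) {
        ByteBuffer buf = ByteBuffer.allocate(message.encodedLength());
        message.encode(buf);
        buf.flip();
        return buf;
    }
}
```

If `encodedLength()` under-reports, `encode` overflows the buffer; if it over-reports, the receiver sees trailing garbage. Keeping both methods next to each other in one class makes that invariant easier to maintain.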

### Authentication

Pluggable authentication system supporting SASL and custom authentication protocols with encryption.

```java { .api }
public class SaslClientBootstrap implements TransportClientBootstrap {
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public class SaslServerBootstrap implements TransportServerBootstrap {
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

public interface SecretKeyHolder {
    String getSaslUser(String appId);
    String getSecretKey(String appId);
}
```

[Authentication](./authentication.md)
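
A `SecretKeyHolder` resolves an application id to the credentials used during the SASL exchange. The sketch below is a minimal in-memory version, with the two-method interface redeclared locally so the example compiles on its own. `InMemorySecretKeyHolder`, its `register` and `unregister` methods, and the choice of using the application id as the SASL user name are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-alone sketch of a map-backed secret holder; names and policy are illustrative. */
final class AuthSketch {

    /** Local copy of the two-method contract, so the example needs no library on the classpath. */
    interface SecretKeyHolder {
        String getSaslUser(String appId);
        String getSecretKey(String appId);
    }

    /** Hypothetical holder that maps application ids to shared secrets in memory. */
    static final class InMemorySecretKeyHolder implements SecretKeyHolder {
        private final Map<String, String> secrets = new ConcurrentHashMap<>();

        /** Stores or replaces the secret for an application. */
        void register(String appId, String secret) {
            secrets.put(appId, secret);
        }

        /** Forgets the secret, for example when the application finishes. */
        void unregister(String appId) {
            secrets.remove(appId);
        }

        @Override
        public String getSaslUser(String appId) {
            // Assumption of this sketch: the SASL user name is simply the application id.
            return appId;
        }

        @Override
        public String getSecretKey(String appId) {
            // Returns null for unknown applications; callers must treat that as "not authorized".
            return secrets.get(appId);
        }
    }
}
```

In a real deployment the secret would come from a secure store rather than a plain in-process map.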
195
196
### Stream Management
197
198
Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead.
199
200
```java { .api }
201
public abstract class StreamManager {
202
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
203
public abstract ManagedBuffer openStream(String streamId);
204
public void connectionTerminated(Channel channel);
205
public void checkAuthorization(TransportClient client, long streamId);
206
}
207
208
public class OneForOneStreamManager extends StreamManager {
209
public OneForOneStreamManager();
210
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
211
}
212
```
213
214
[Stream Management](./streaming.md)
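
Chunk-based fetching means a stream is registered once and then read piece by piece through a `(streamId, chunkIndex)` pair. The sketch below models that idea with plain JDK types. `ChunkStoreSketch` and its methods are hypothetical, and the sketch deliberately leaves out what a real stream manager also has to handle, such as authorization and cleanup when a connection terminates.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Stand-alone sketch of chunk-addressed streams; not the library's stream manager. */
final class ChunkStoreSketch {
    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, List<ByteBuffer>> streams = new ConcurrentHashMap<>();

    /** Splits data into chunks of at most chunkSize bytes and registers them under a new id. */
    long registerStream(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int length = Math.min(chunkSize, data.length - offset);
            // wrap() shares the backing array, so splitting copies no bytes.
            chunks.add(ByteBuffer.wrap(data, offset, length).asReadOnlyBuffer());
        }
        long streamId = nextStreamId.getAndIncrement();
        streams.put(streamId, chunks);
        return streamId;
    }

    int chunkCount(long streamId) {
        return lookup(streamId).size();
    }

    /** Returns an independent read-only view of one chunk. */
    ByteBuffer getChunk(long streamId, int chunkIndex) {
        List<ByteBuffer> chunks = lookup(streamId);
        if (chunkIndex < 0 || chunkIndex >= chunks.size()) {
            throw new IndexOutOfBoundsException("chunk " + chunkIndex + " of stream " + streamId);
        }
        return chunks.get(chunkIndex).duplicate();
    }

    /** Drops all chunks of a stream once the consumer is done with it. */
    void closeStream(long streamId) {
        streams.remove(streamId);
    }

    private List<ByteBuffer> lookup(long streamId) {
        List<ByteBuffer> chunks = streams.get(streamId);
        if (chunks == null) {
            throw new IllegalArgumentException("unknown stream " + streamId);
        }
        return chunks;
    }
}
```

Because each chunk is only a view over the original array, the memory cost of registering a stream stays close to the size of the data itself, whatever the chunk size.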
215
216
### Configuration
217
218
Comprehensive configuration system with performance tuning options for connection management, I/O settings, and security parameters.
219
220
```java { .api }
221
public class TransportConf {
222
public TransportConf(String module, ConfigProvider conf);
223
public int connectionTimeoutMs();
224
public int numConnectionsPerPeer();
225
public int serverThreads();
226
public int clientThreads();
227
public int receiveBuf();
228
public int sendBuf();
229
public boolean encryptionEnabled();
230
public String cipherTransformation();
231
}
232
```
233
234
[Configuration and Utilities](./configuration.md)
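
`TransportConf` reads its settings from a string-keyed provider scoped by module name. The sketch below shows the general pattern of typed lookups with defaults over such a provider. `ConfigSketch`, `MapProvider`, and `ioKey` are hypothetical, and the `spark.<module>.io.<setting>` key layout is an assumption of this example; check the configuration reference for the exact key names before relying on them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/** Stand-alone sketch of typed, defaulted lookups over a string-keyed configuration map. */
final class ConfigSketch {

    /** Local stand-in for a map-backed provider with a get(String) contract. */
    static final class MapProvider {
        private final Map<String, String> props;

        MapProvider(Map<String, String> props) {
            // Defensive copy: later changes to the caller's map do not leak in.
            this.props = new HashMap<>(props);
        }

        /** Strict lookup: a missing key is an error. */
        String get(String name) {
            String value = props.get(name);
            if (value == null) {
                throw new NoSuchElementException(name);
            }
            return value;
        }

        /** Lenient lookup: a missing key yields the default. */
        String get(String name, String defaultValue) {
            return props.getOrDefault(name, defaultValue);
        }

        int getInt(String name, int defaultValue) {
            return Integer.parseInt(get(name, Integer.toString(defaultValue)));
        }

        boolean getBoolean(String name, boolean defaultValue) {
            return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
        }
    }

    /** Builds a module-scoped key. The "spark.<module>.io." layout is an assumption. */
    static String ioKey(String module, String setting) {
        return "spark." + module + ".io." + setting;
    }
}
```

For example, `ioKey("myapp", "serverThreads")` yields `spark.myapp.io.serverThreads`, which a caller could place in the map passed to the provider.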

## Types

```java { .api }
// Core callback interfaces
public interface RpcResponseCallback {
    void onSuccess(ByteBuffer response);
    void onFailure(Throwable e);
}

public interface ChunkReceivedCallback {
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    void onFailure(int chunkIndex, Throwable e);
}

public interface StreamCallback {
    void onData(String streamId, ByteBuffer buf) throws IOException;
    void onComplete(String streamId) throws IOException;
    void onFailure(String streamId, Throwable cause) throws IOException;
}

// Bootstrap interfaces
public interface TransportClientBootstrap {
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public interface TransportServerBootstrap {
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

// Configuration provider
public abstract class ConfigProvider {
    public abstract String get(String name);
}

public class MapConfigProvider extends ConfigProvider {
    public MapConfigProvider(Map<String, String> props);
}
```
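
A `StreamCallback` receives a stream as a sequence of `onData` calls followed by either `onComplete` or `onFailure`. The sketch below shows one way to collect such a stream into a single byte array. The interface is redeclared locally so the example compiles on its own, `Collector` is a hypothetical name, and the sketch assumes each buffer is only valid during the `onData` call, so it copies the bytes out immediately.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/** Stand-alone sketch: gathering streamed data delivered through callbacks. */
final class StreamCollectorSketch {

    /** Local copy of the StreamCallback shape, so the example needs no library on the classpath. */
    interface StreamCallback {
        void onData(String streamId, ByteBuffer buf) throws IOException;
        void onComplete(String streamId) throws IOException;
        void onFailure(String streamId, Throwable cause) throws IOException;
    }

    /** Accumulates all received bytes and completes a future when the stream ends. */
    static final class Collector implements StreamCallback {
        private final ByteArrayOutputStream received = new ByteArrayOutputStream();
        private final CompletableFuture<byte[]> result = new CompletableFuture<>();

        @Override
        public void onData(String streamId, ByteBuffer buf) {
            // Copy out right away: the buffer is assumed valid only during this call.
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            received.write(bytes, 0, bytes.length);
        }

        @Override
        public void onComplete(String streamId) {
            result.complete(received.toByteArray());
        }

        @Override
        public void onFailure(String streamId, Throwable cause) {
            result.completeExceptionally(cause);
        }

        /** Completes with the full payload, or exceptionally if the stream failed. */
        CompletableFuture<byte[]> result() {
            return result;
        }
    }
}
```

Buffering everything in memory suits small streams only; for large transfers the same callback shape can write each piece to a file or channel instead.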