# Apache Spark Network Common

Apache Spark Network Common provides the core networking infrastructure for Apache Spark. It implements a Netty-based transport layer that Spark components use to communicate across cluster nodes, with support for RPC, chunk fetching, streaming, SASL authentication, and buffer management.

## Package Information

- **Package Name**: spark-network-common_2.10
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.spark
- **Version**: 1.6.3
- **Installation**: Include as a Maven dependency (`org.apache.spark:spark-network-common_2.10:1.6.3`)

## Core Imports

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;
```
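
## Basic Usage

```java
import java.util.Collections;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class BasicUsage {
  public static void main(String[] args) throws Exception {
    // Create transport configuration; "shuffle" module settings are read from JVM system properties
    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

    // Create RPC handler (NoOpRpcHandler is a placeholder that does not process RPCs)
    RpcHandler rpcHandler = new NoOpRpcHandler();

    // Create transport context: the factory for both clients and servers
    TransportContext context = new TransportContext(conf, rpcHandler);

    // Start the server first so the client below has something to connect to
    TransportServer server =
        context.createServer(8080, Collections.<TransportServerBootstrap>emptyList());

    // Create client factory for outbound connections
    TransportClientFactory clientFactory = context.createClientFactory();

    // Connect to the server; clients from createClient are pooled by the factory
    TransportClient client = clientFactory.createClient("localhost", server.getPort());

    // ... use client.sendRpc(...), client.fetchChunk(...), or client.stream(...) here ...

    // Closing the factory closes its pooled clients; then stop the server
    clientFactory.close();
    server.close();
  }
}
```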

## Architecture

Apache Spark Network Common is built around several key architectural components:

- **Transport Layer**: Core networking abstraction with `TransportContext` serving as the main factory for clients and servers
- **Client/Server Model**: Asymmetric design with `TransportClient` for outbound connections and `TransportServer` for inbound connections
- **Message Protocol**: Type-safe message system with specific message types for RPC, chunk fetching, and streaming
- **Buffer Management**: Unified buffer abstraction (`ManagedBuffer`) supporting memory, file, and Netty ByteBuf backends
- **Security Framework**: SASL-based authentication and encryption with pluggable secret key providers
- **Configuration System**: Centralized configuration with multiple provider implementations

## Capabilities

### Transport Context and Setup

`TransportContext` is the entry point for this module: it combines a `TransportConf` with an `RpcHandler` and creates the `TransportClientFactory` and `TransportServer` instances used for outbound and inbound connections.

```java { .api }
public class TransportContext {
  public TransportContext(TransportConf conf, RpcHandler rpcHandler);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

  public TransportClientFactory createClientFactory();
  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

  public TransportServer createServer();
  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
  public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
}
```

[Transport Setup](./transport-setup.md)

### Client Operations

Client-side networking functionality for establishing connections, sending RPC requests, and fetching data chunks from remote servers.

```java { .api }
public class TransportClient implements Closeable {
  public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
  public void stream(String streamId, StreamCallback callback);
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
  public void send(ByteBuffer message);
}

public class TransportClientFactory implements Closeable {
  public TransportClient createClient(String remoteHost, int remotePort);
  public TransportClient createUnmanagedClient(String remoteHost, int remotePort);
}
```

[Client Operations](./client-operations.md)
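
`sendRpc`, `sendRpcSync`, and `send` all carry a `java.nio.ByteBuffer`, so the caller decides how requests are serialized. As a minimal JDK-only sketch, the hypothetical `RpcPayloads` helper below (not part of spark-network-common) encodes a string request as UTF-8 and decodes a response without disturbing the buffer's position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of spark-network-common) for string RPC payloads. */
final class RpcPayloads {
  private RpcPayloads() {}

  /** Wraps the UTF-8 bytes of a string in a buffer positioned at 0, ready to send. */
  static ByteBuffer encode(String text) {
    return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
  }

  /** Decodes the remaining bytes as UTF-8 without moving the caller's buffer position. */
  static String decode(ByteBuffer buf) {
    ByteBuffer view = buf.duplicate();  // shares content, has its own position and limit
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

With a connected client, `RpcPayloads.decode(client.sendRpcSync(RpcPayloads.encode("ping"), 5000))` would yield the server's reply as a string, assuming the server answers in UTF-8.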

### Server Operations

Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams.

```java { .api }
public class TransportServer implements Closeable {
  public int getPort();
  public void close();
}

public abstract class RpcHandler {
  public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
  public abstract StreamManager getStreamManager();
}

public abstract class StreamManager {
  public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
}
```

[Server Operations](./server-operations.md)
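
A `StreamManager` maps a `(streamId, chunkIndex)` pair to the bytes a client requested with `fetchChunk`. The JDK-only sketch below models that lookup with a hypothetical `InMemoryChunkRegistry` that returns `ByteBuffer`s; with the real API you would wrap each chunk in a `NioManagedBuffer` and return it from `getChunk`. Spark's own `OneForOneStreamManager` fills this role in practice and, as we understand it, also requires chunks to be fetched in order, which this sketch does not enforce.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory registry mirroring the getChunk(streamId, chunkIndex) contract. */
final class InMemoryChunkRegistry {
  private final AtomicLong nextStreamId = new AtomicLong(0);
  private final Map<Long, List<ByteBuffer>> streams = new ConcurrentHashMap<>();

  /** Registers chunks under a fresh stream id that a client would later pass to fetchChunk. */
  long register(List<ByteBuffer> chunks) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new ArrayList<>(chunks));  // defensive copy of the chunk list
    return id;
  }

  /** Returns a read-only view of one chunk, or throws if the stream or index is unknown. */
  ByteBuffer getChunk(long streamId, int chunkIndex) {
    List<ByteBuffer> chunks = streams.get(streamId);
    if (chunks == null) {
      throw new IllegalStateException("Unknown stream " + streamId);
    }
    if (chunkIndex < 0 || chunkIndex >= chunks.size()) {
      throw new IllegalStateException(
          "Chunk " + chunkIndex + " out of range for stream " + streamId);
    }
    return chunks.get(chunkIndex).asReadOnlyBuffer();  // fresh view, so repeated reads work
  }

  /** Drops a stream, e.g. when the client that opened it disconnects. */
  void unregister(long streamId) {
    streams.remove(streamId);
  }
}
```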

### Message Protocol

Type-safe message protocol for network communication, including RPC requests/responses, chunk fetching, and streaming operations.

```java { .api }
public interface Message {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

  enum Type {
    ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
    RpcRequest, RpcResponse, RpcFailure,
    StreamRequest, StreamResponse, StreamFailure,
    OneWayMessage, User
  }
}

public class RpcRequest extends AbstractMessage implements RequestMessage {
  public final long requestId;
  public RpcRequest(long requestId, ManagedBuffer message);
}

public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
  public final StreamChunkId streamChunkId;
  public ChunkFetchRequest(StreamChunkId streamChunkId);
}
```

[Message Protocol](./message-protocol.md)
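
To make the protocol concrete, here is a JDK-only sketch of the frame layout we understand Spark 1.6's encoder to produce: an 8-byte frame length that counts itself, a 1-byte message type id (`ChunkFetchRequest` = 0 and `RpcRequest` = 3, following the enum order above), the message's own header fields, then the body. The per-message layouts (a request id plus a 4-byte body size for `RpcRequest`; the `StreamChunkId`'s long stream id and int chunk index for `ChunkFetchRequest`) are assumptions to verify against the source before relying on them, and `FrameSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical encoder sketching the assumed Spark 1.6 frame layout (not Spark's code). */
final class FrameSketch {
  static final int LENGTH_FIELD = 8;          // frame length is a long that includes itself
  static final byte CHUNK_FETCH_REQUEST = 0;  // assumed type ids, following the Type enum order
  static final byte RPC_REQUEST = 3;

  /** [frameLength:long][type:byte][requestId:long][bodySize:int][body] */
  static ByteBuffer rpcRequest(long requestId, byte[] body) {
    int headerLength = LENGTH_FIELD + 1 + 8 + 4;
    ByteBuffer frame = ByteBuffer.allocate(headerLength + body.length);  // big-endian by default
    frame.putLong(headerLength + body.length);
    frame.put(RPC_REQUEST);
    frame.putLong(requestId);
    frame.putInt(body.length);
    frame.put(body);
    frame.flip();  // ready for reading from position 0
    return frame;
  }

  /** [frameLength:long][type:byte][streamId:long][chunkIndex:int], with no body. */
  static ByteBuffer chunkFetchRequest(long streamId, int chunkIndex) {
    int length = LENGTH_FIELD + 1 + 8 + 4;
    ByteBuffer frame = ByteBuffer.allocate(length);
    frame.putLong(length);
    frame.put(CHUNK_FETCH_REQUEST);
    frame.putLong(streamId);
    frame.putInt(chunkIndex);
    frame.flip();
    return frame;
  }
}
```

Because the length prefix covers the whole frame, a decoder can read 8 bytes, subtract 8, and know exactly how many bytes belong to the current message before parsing its type.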
159
160
### Buffer Management
161
162
Unified buffer management system providing abstractions over different buffer types including memory, file segments, and Netty ByteBufs.
163
164
```java { .api }
165
public abstract class ManagedBuffer {
166
public abstract long size();
167
public abstract ByteBuffer nioByteBuffer() throws IOException;
168
public abstract InputStream createInputStream() throws IOException;
169
public abstract ManagedBuffer retain();
170
public abstract ManagedBuffer release();
171
public abstract Object convertToNetty() throws IOException;
172
}
173
174
public class NioManagedBuffer extends ManagedBuffer {
175
public NioManagedBuffer(ByteBuffer buf);
176
}
177
178
public class FileSegmentManagedBuffer extends ManagedBuffer {
179
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
180
}
181
```
182
183
[Buffer Management](./buffer-management.md)
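
A `FileSegmentManagedBuffer` describes a region of a file (an offset and a length) rather than holding the bytes. The JDK-only sketch below shows what materializing such a segment into a `ByteBuffer` involves; `FileSegments.read` is a hypothetical helper, and the real class may memory-map large segments instead of copying them, depending on configuration.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical helper: copy `length` bytes starting at `offset` out of a file (JDK only). */
final class FileSegments {
  static ByteBuffer read(File file, long offset, long length) throws IOException {
    if (offset < 0 || length < 0 || length > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("bad segment: offset=" + offset + ", length=" + length);
    }
    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
         FileChannel channel = raf.getChannel()) {
      if (offset + length > channel.size()) {
        throw new IOException("segment extends past the end of " + file);
      }
      ByteBuffer buf = ByteBuffer.allocate((int) length);
      long position = offset;
      while (buf.hasRemaining()) {
        int n = channel.read(buf, position);  // positional read; channel position is unchanged
        if (n < 0) {
          throw new IOException("unexpected EOF reading " + file);
        }
        position += n;
      }
      buf.flip();  // expose exactly the bytes that were read
      return buf;
    }
  }
}
```

Keeping only `(file, offset, length)` until the bytes are needed is what lets a file-backed buffer be sent without first loading the whole segment into memory.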

### SASL Authentication

Security framework providing SASL-based authentication and encryption for secure network communication.

```java { .api }
public interface SecretKeyHolder {
  String getSaslUser(String appId);
  String getSecretKey(String appId);
}

public class SaslClientBootstrap implements TransportClientBootstrap {
  public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
}

public class SaslServerBootstrap implements TransportServerBootstrap {
  public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
}
```

[SASL Authentication](./sasl-authentication.md)
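
Spark's SASL support is built on the JDK's `javax.security.sasl` API with the DIGEST-MD5 mechanism, with a `SecretKeyHolder` supplying the user name and shared secret. The sketch below runs a DIGEST-MD5 handshake between a JDK `SaslClient` and `SaslServer` in one process to show what the bootstraps negotiate. It does not use Spark's classes or wire format; `DigestHandshake`, the protocol name `"spark-sketch"`, the realm `"default"`, and the map of secrets are illustrative assumptions.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.security.auth.callback.*;
import javax.security.sasl.*;

/** Hypothetical in-process DIGEST-MD5 handshake using only the JDK (not Spark's SASL classes). */
final class DigestHandshake {
  private static final String MECH = "DIGEST-MD5";
  private static final String PROTOCOL = "spark-sketch";  // assumption: any name both sides share
  private static final String REALM = "default";          // server name, offered as the realm
  private static final Map<String, String> PROPS = Collections.singletonMap(Sasl.QOP, "auth");

  /** Returns true if a client holding (user, secret) authenticates against serverSecrets. */
  static boolean authenticate(String user, String secret, Map<String, String> serverSecrets) {
    SaslClient client = null;
    SaslServer server = null;
    try {
      client = Sasl.createSaslClient(new String[] {MECH}, null, PROTOCOL, REALM, PROPS,
          callbacks -> handleClient(callbacks, user, secret));
      server = Sasl.createSaslServer(MECH, PROTOCOL, REALM, PROPS,
          callbacks -> handleServer(callbacks, serverSecrets));
      if (client == null || server == null) {
        throw new IllegalStateException(MECH + " is not available in this JDK");
      }
      byte[] token = new byte[0];  // DIGEST-MD5 has no initial client response
      for (int round = 0; round < 5 && !(server.isComplete() && client.isComplete()); round++) {
        byte[] challenge = server.evaluateResponse(token);  // throws SaslException on a bad digest
        token = client.evaluateChallenge(challenge);
      }
      return server.isComplete() && client.isComplete();
    } catch (SaslException e) {
      return false;  // wrong secret, unknown user, or protocol error
    } finally {
      try {
        if (client != null) client.dispose();
        if (server != null) server.dispose();
      } catch (SaslException ignored) {
        // best-effort cleanup
      }
    }
  }

  /** Client side: supplies the user name, the shared secret, and the realm the server offered. */
  private static void handleClient(Callback[] callbacks, String user, String secret)
      throws UnsupportedCallbackException {
    for (Callback cb : callbacks) {
      if (cb instanceof NameCallback) {
        ((NameCallback) cb).setName(user);
      } else if (cb instanceof PasswordCallback) {
        ((PasswordCallback) cb).setPassword(secret.toCharArray());
      } else if (cb instanceof RealmCallback) {
        RealmCallback rc = (RealmCallback) cb;
        rc.setText(rc.getDefaultText());
      } else {
        throw new UnsupportedCallbackException(cb);
      }
    }
  }

  /** Server side: looks up the secret for the user name the client sent. */
  private static void handleServer(Callback[] callbacks, Map<String, String> secrets)
      throws IOException, UnsupportedCallbackException {
    String user = null;
    for (Callback cb : callbacks) {
      if (cb instanceof NameCallback) {
        NameCallback nc = (NameCallback) cb;
        user = nc.getDefaultName();  // authentication ID received from the client
        nc.setName(user);
      } else if (cb instanceof PasswordCallback) {
        String secret = (user == null) ? null : secrets.get(user);
        if (secret == null) {
          throw new IOException("No secret for user " + user);
        }
        ((PasswordCallback) cb).setPassword(secret.toCharArray());
      } else if (cb instanceof RealmCallback) {
        RealmCallback rc = (RealmCallback) cb;
        rc.setText(rc.getDefaultText());
      } else if (cb instanceof AuthorizeCallback) {
        AuthorizeCallback ac = (AuthorizeCallback) cb;
        String authz = ac.getAuthorizationID();
        ac.setAuthorized(authz == null || authz.equals(ac.getAuthenticationID()));
        if (ac.isAuthorized()) ac.setAuthorizedID(ac.getAuthenticationID());
      } else {
        throw new UnsupportedCallbackException(cb);
      }
    }
  }
}
```

The secret itself never crosses the wire: both sides derive a digest from it, so a mismatched secret surfaces as a `SaslException` on the server's second `evaluateResponse` call.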

### Configuration and Utilities

Configuration classes for transport settings, plus utility classes for Netty integration and general-purpose Java helpers.

```java { .api }
public class TransportConf {
  public TransportConf(String module, ConfigProvider conf);

  public String ioMode();
  public boolean preferDirectBufs();
  public int connectionTimeoutMs();
  public int numConnectionsPerPeer();
  public int serverThreads();
  public int clientThreads();
}

public abstract class ConfigProvider {
  public abstract String get(String name);
  public String get(String name, String defaultValue);
  public int getInt(String name, int defaultValue);
  public boolean getBoolean(String name, boolean defaultValue);
}
```

[Configuration and Utilities](./configuration-utilities.md)
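
`ConfigProvider` only requires `get(name)`; the typed getters fall back to the supplied default when a key is missing. As we understand it, `TransportConf` scopes its keys by module as `spark.<module>.<suffix>`, so the `"shuffle"` module reads settings such as `spark.shuffle.io.mode`. The JDK-only sketch below mirrors that contract with a hypothetical `MapBackedConfig` class and `confKey` helper; the assumption that a missing key makes `get(name)` throw `NoSuchElementException` should be checked against the 1.6 source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical Map-backed provider mirroring ConfigProvider's get/default pattern (JDK only). */
final class MapBackedConfig {
  private final Map<String, String> values;

  MapBackedConfig(Map<String, String> values) {
    this.values = new HashMap<>(values);
  }

  /** Assumed module-scoped key format, e.g. confKey("shuffle", "io.mode"). */
  static String confKey(String module, String suffix) {
    return "spark." + module + "." + suffix;
  }

  /** Like ConfigProvider.get(name): a missing key is an error. */
  String get(String name) {
    String value = values.get(name);
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  /** Falls back to defaultValue when the key is missing. */
  String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}
```

With the library on the classpath, the equivalent is to subclass `ConfigProvider`, implement `get(String)`, and pass the instance to `new TransportConf("shuffle", provider)`.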

## Types

### Core Interfaces

```java { .api }
public interface RequestMessage extends Message {}
public interface ResponseMessage extends Message {}

public interface ChunkReceivedCallback {
  void onSuccess(int chunkIndex, ManagedBuffer buffer);
  void onFailure(int chunkIndex, Throwable e);
}

public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}

public interface StreamCallback {
  void onData(String streamId, ByteBuffer buf);
  void onComplete(String streamId);
  void onFailure(String streamId, Throwable cause);
}

public interface TransportClientBootstrap {
  void doBootstrap(TransportClient client, Channel channel);
}

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
```
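
The callback interfaces above are asynchronous. A blocking call such as `sendRpcSync(message, timeoutMs)` can be layered on top by completing a future from the callback and waiting with a timeout. The JDK-only sketch below shows that pattern; `ResponseCallback` is a hypothetical stand-in with the same two methods as `RpcResponseCallback` (so it compiles without Spark), and `BlockingCalls.await` is likewise a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical stand-in with the same shape as RpcResponseCallback. */
interface ResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}

/** Hypothetical helper that turns a callback-style request into a blocking call. */
final class BlockingCalls {
  /**
   * Starts an async request via `sender`, then waits up to timeoutMs for the callback.
   * A failure reported through onFailure surfaces as an ExecutionException.
   */
  static ByteBuffer await(Consumer<ResponseCallback> sender, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
    sender.accept(new ResponseCallback() {
      @Override public void onSuccess(ByteBuffer response) {
        // Copy the bytes: the transport may release the original buffer after this returns.
        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
        copy.put(response.duplicate());
        copy.flip();
        future.complete(copy);
      }

      @Override public void onFailure(Throwable e) {
        future.completeExceptionally(e);
      }
    });
    return future.get(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```

The copy in `onSuccess` is a deliberate design choice: as we understand it, the transport may release the underlying response buffer once the callback returns, so a caller that outlives the callback should hold its own bytes.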

### Stream and Chunk Identifiers

```java { .api }
public class StreamChunkId {
  public final long streamId;
  public final int chunkIndex;

  public StreamChunkId(long streamId, int chunkIndex);
  public String toString();
  public boolean equals(Object other);
  public int hashCode();
}
```

### Configuration Enums

```java { .api }
public enum IOMode {
  NIO, EPOLL
}

public enum ByteUnit {
  BYTE, KiB, MiB, GiB, TiB, PiB;

  public long convertFrom(long d, ByteUnit u);
  public long convertTo(long d, ByteUnit u);
  public double toBytes(long d);
  public long toKiB(long d);
  public long toMiB(long d);
  public long toGiB(long d);
  public long toTiB(long d);
  public long toPiB(long d);
}
```
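
`ByteUnit` converts between binary (power-of-1024) units: `convertTo(d, u)` expresses `d` of this unit in unit `u`, and `convertFrom(d, u)` expresses `d` of unit `u` in this unit, so `ByteUnit.MiB.convertTo(1, ByteUnit.KiB)` should be 1024. The JDK-only sketch below reimplements that arithmetic under the hypothetical name `SketchByteUnit` so the semantics can be checked without Spark; the overflow check and the truncating division for small-to-large conversions are our assumptions about the real enum's edge cases.

```java
/** Hypothetical re-implementation of power-of-1024 unit conversion (JDK only). */
enum SketchByteUnit {
  BYTE(1L), KiB(1L << 10), MiB(1L << 20), GiB(1L << 30), TiB(1L << 40), PiB(1L << 50);

  private final long multiplier;  // bytes per unit

  SketchByteUnit(long multiplier) {
    this.multiplier = multiplier;
  }

  /** Expresses d of this unit in unit u; smaller-to-larger conversions truncate. */
  long convertTo(long d, SketchByteUnit u) {
    if (multiplier > u.multiplier) {
      long ratio = multiplier / u.multiplier;
      if (Long.MAX_VALUE / ratio < d) {
        throw new IllegalArgumentException(d + " " + name() + " overflows a long in " + u.name());
      }
      return d * ratio;
    }
    return d / (u.multiplier / multiplier);  // divide first so d * multiplier cannot overflow
  }

  /** Expresses d of unit u in this unit. */
  long convertFrom(long d, SketchByteUnit u) {
    return u.convertTo(d, this);
  }
}
```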

### Exceptions

```java { .api }
public class ChunkFetchFailureException extends RuntimeException {
  public ChunkFetchFailureException(String errorMsg, Throwable cause);
  public ChunkFetchFailureException(String errorMsg);
}
```