# Transport Layer

The transport layer provides the core networking infrastructure for Apache Spark's distributed communication. It centers on `TransportContext`, which creates client factories and servers, carries the transport configuration, and sets up Netty channel pipelines.

## Capabilities

### TransportContext

Central context class that manages the lifecycle of transport clients and servers, handles configuration, and provides factory methods for creating network components.

```java { .api }
/**
 * Contains context to create TransportServer, TransportClientFactory, and to setup
 * Netty Channel pipelines. Handles both control-plane RPCs and data-plane chunk fetching.
 */
public class TransportContext implements Closeable {
    /**
     * Create a TransportContext with default settings
     * @param conf Transport configuration
     * @param rpcHandler Handler for RPC messages
     */
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);

    /**
     * Create a TransportContext with idle connection control
     * @param conf Transport configuration
     * @param rpcHandler Handler for RPC messages
     * @param closeIdleConnections Whether to close idle connections
     */
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

    /**
     * Create a TransportContext with full configuration options
     * @param conf Transport configuration
     * @param rpcHandler Handler for RPC messages
     * @param closeIdleConnections Whether to close idle connections
     * @param isClientOnly Whether this context is client-only
     */
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);

    /**
     * Create a client factory for establishing connections
     * @return TransportClientFactory instance
     */
    public TransportClientFactory createClientFactory();

    /**
     * Create a client factory with bootstrap configurations
     * @param bootstraps List of client bootstrap configurations
     * @return TransportClientFactory instance
     */
    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

    /**
     * Create a server on a random available port
     * @return TransportServer instance
     */
    public TransportServer createServer();

    /**
     * Create a server with bootstrap configurations
     * @param bootstraps List of server bootstrap configurations
     * @return TransportServer instance
     */
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

    /**
     * Create a server on a specific port
     * @param port Port number to bind to
     * @param bootstraps List of server bootstrap configurations
     * @return TransportServer instance
     */
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

    /**
     * Create a server on a specific host and port
     * @param host Host to bind to
     * @param port Port number to bind to
     * @param bootstraps List of server bootstrap configurations
     * @return TransportServer instance
     */
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

    /**
     * Initialize Netty channel pipeline for a socket channel
     * @param channel Socket channel to initialize
     * @return TransportChannelHandler for the pipeline
     */
    public TransportChannelHandler initializePipeline(SocketChannel channel);

    /**
     * Initialize Netty channel pipeline with custom RPC handler
     * @param channel Socket channel to initialize
     * @param channelRpcHandler Custom RPC handler for this channel
     * @return TransportChannelHandler for the pipeline
     */
    public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

    /**
     * Get the transport configuration
     * @return TransportConf instance
     */
    public TransportConf getConf();

    /**
     * Get counter for registered connections
     * @return Counter for monitoring connections
     */
    public Counter getRegisteredConnections();

    /**
     * Close the context and clean up resources
     */
    public void close();
}
```

**Usage Examples:**

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Basic context setup: "myapp" is the module name used to scope configuration keys
TransportConf conf = new TransportConf("myapp", new MapConfigProvider(Collections.emptyMap()));
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

// Create a server (random available port) and a client factory
TransportServer server = context.createServer();
TransportClientFactory clientFactory = context.createClientFactory();

// Server with specific port and bootstraps
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(/* bootstrap implementations */);
TransportServer serverWithBootstraps = context.createServer(8080, serverBootstraps);

// Client factory with bootstraps
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(/* bootstrap implementations */);
TransportClientFactory factoryWithBootstraps = context.createClientFactory(clientBootstraps);

// Cleanup: close client factories and servers first, then the context
factoryWithBootstraps.close();
clientFactory.close();
serverWithBootstraps.close();
server.close();
context.close();
```
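
Because `TransportContext` implements `Closeable`, cleanup can also be expressed with try-with-resources. The sketch below is a minimal illustration of the closing order only: `NamedResource` is a hypothetical stand-in (not a Spark class) so that the example runs without Spark on the classpath, and it assumes the server and client factory are closeable in the same way as the context.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    /** Hypothetical stand-in for a closeable transport component. */
    static final class NamedResource implements Closeable {
        private final String name;
        private final List<String> log;

        NamedResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            // Record the close so the ordering can be inspected afterwards.
            log.add("close " + name);
        }
    }

    /** Opens three resources and returns the order in which events happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Resources are closed in reverse declaration order when the block exits.
        try (NamedResource context = new NamedResource("context", log);
             NamedResource server = new NamedResource("server", log);
             NamedResource clientFactory = new NamedResource("clientFactory", log)) {
            log.add("work");
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [work, close clientFactory, close server, close context]
        System.out.println(run());
    }
}
```

Declaring the context first means it is closed last, after the components created from it.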

### TransportChannelHandler

Channel handler that processes both requests and responses in the Netty pipeline, managing the communication protocol between clients and servers.

```java { .api }
/**
 * Channel handler that processes both requests and responses
 */
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
    /**
     * Get the client associated with this channel
     * @return TransportClient instance
     */
    public TransportClient getClient();

    /**
     * Get request timeout in nanoseconds
     * @return Timeout value in nanoseconds
     */
    public long getRequestTimeoutNs();

    /**
     * Add fetch request to track
     * @param streamChunkId Stream chunk identifier
     * @param callback Callback for chunk reception
     */
    public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback);

    /**
     * Add RPC request to track
     * @param requestId Request identifier
     * @param callback Callback for RPC response
     */
    public void addRpcRequest(long requestId, RpcResponseCallback callback);

    /**
     * Add stream callback
     * @param streamId Stream identifier
     * @param callback Stream callback
     */
    public void addStreamCallback(String streamId, StreamCallback callback);
}
```
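
The tracking methods listed above imply a bookkeeping scheme in which each outstanding request is stored with its callback until a response arrives. The following is a minimal sketch of that idea, not Spark's actual implementation: `Tracker`, `handleResponse`, and `failOutstandingRequests` are hypothetical names, and `RpcCallback` is a simplified stand-in that passes a `String` where the real callback would receive a byte buffer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RequestTrackingSketch {
    /** Simplified stand-in for an RPC response callback. */
    interface RpcCallback {
        void onSuccess(String response);
        void onFailure(Throwable cause);
    }

    /** Hypothetical tracker that matches responses to callbacks by request id. */
    static final class Tracker {
        private final Map<Long, RpcCallback> outstandingRpcs = new ConcurrentHashMap<>();

        /** Registers a callback before the request is written to the channel. */
        void addRpcRequest(long requestId, RpcCallback callback) {
            outstandingRpcs.put(requestId, callback);
        }

        /** Completes and forgets the request; returns false for unknown ids. */
        boolean handleResponse(long requestId, String response) {
            RpcCallback callback = outstandingRpcs.remove(requestId);
            if (callback == null) {
                return false;
            }
            callback.onSuccess(response);
            return true;
        }

        /** Fails every request still waiting, e.g. when the channel closes. */
        void failOutstandingRequests(Throwable cause) {
            for (Long requestId : outstandingRpcs.keySet()) {
                RpcCallback callback = outstandingRpcs.remove(requestId);
                if (callback != null) {
                    callback.onFailure(cause);
                }
            }
        }

        int numOutstandingRequests() {
            return outstandingRpcs.size();
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        tracker.addRpcRequest(7L, new RpcCallback() {
            @Override
            public void onSuccess(String response) {
                System.out.println("rpc 7 succeeded: " + response);
            }

            @Override
            public void onFailure(Throwable cause) {
                System.out.println("rpc 7 failed: " + cause.getMessage());
            }
        });
        tracker.handleResponse(7L, "pong");                        // invokes onSuccess
        System.out.println(tracker.handleResponse(99L, "stray"));  // prints false
    }
}
```

Removing the entry before invoking the callback ensures each request completes at most once, even if a duplicate response arrives.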

### Channel Configuration Types

```java { .api }
/**
 * Bootstrap interface for setting up TransportClient instances
 */
public interface TransportClientBootstrap {
    /**
     * Perform client bootstrap setup
     * @param client Transport client to configure
     * @param channel Netty channel for the connection
     * @throws RuntimeException if bootstrap fails
     */
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

/**
 * Bootstrap interface for setting up server instances
 */
public interface TransportServerBootstrap {
    /**
     * Perform server bootstrap setup
     * @param channel Netty channel for the connection
     * @param rpcHandler RPC handler for the channel
     * @return Configured RPC handler (may be wrapped or replaced)
     */
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
```
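
The "may be wrapped or replaced" contract of the server bootstrap can be sketched as a chain of decorators around the base RPC handler. To stay runnable without Spark or Netty, the example declares its own minimal `Channel`, `RpcHandler`, and `ServerBootstrap` stand-ins; `TokenCheckingBootstrap`, its token scheme, and `applyBootstraps` are hypothetical, and applying bootstraps in list order is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

final class ServerBootstrapSketch {
    /** Stand-in for a Netty channel; only an id is modeled. */
    interface Channel {
        String id();
    }

    /** Simplified stand-in for an RPC handler: one message in, one reply out. */
    interface RpcHandler {
        String receive(String message);
    }

    /** Stand-in mirroring the shape of TransportServerBootstrap.doBootstrap. */
    interface ServerBootstrap {
        RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
    }

    /** Hypothetical bootstrap that rejects messages lacking a "token:" prefix. */
    static final class TokenCheckingBootstrap implements ServerBootstrap {
        private final String token;

        TokenCheckingBootstrap(String token) {
            this.token = token;
        }

        @Override
        public RpcHandler doBootstrap(Channel channel, RpcHandler delegate) {
            String prefix = token + ":";
            // Return a wrapper; the original handler only sees accepted messages.
            return message -> {
                if (!message.startsWith(prefix)) {
                    return "rejected on " + channel.id();
                }
                return delegate.receive(message.substring(prefix.length()));
            };
        }
    }

    /** Applies bootstraps in list order, each one wrapping the previous handler. */
    static RpcHandler applyBootstraps(Channel channel, RpcHandler base, List<ServerBootstrap> bootstraps) {
        RpcHandler handler = base;
        for (ServerBootstrap bootstrap : bootstraps) {
            handler = bootstrap.doBootstrap(channel, handler);
        }
        return handler;
    }

    public static void main(String[] args) {
        Channel channel = () -> "channel-1";
        RpcHandler echo = message -> "echo " + message;
        List<ServerBootstrap> bootstraps = Arrays.asList(new TokenCheckingBootstrap("secret"));
        RpcHandler guarded = applyBootstraps(channel, echo, bootstraps);
        System.out.println(guarded.receive("secret:ping")); // prints "echo ping"
        System.out.println(guarded.receive("ping"));        // prints "rejected on channel-1"
    }
}
```

Returning a new handler rather than mutating the one passed in keeps each bootstrap independent, which is one reason authentication and encryption layers can be composed per channel.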

## Integration Notes

The transport layer integrates closely with:

- **Configuration**: Uses `TransportConf` for network settings like timeouts, thread pools, and buffer preferences
- **Security**: Supports bootstrap mechanisms for authentication (SASL, Spark auth) and encryption
- **Protocol**: Works with the message protocol layer for encoding/decoding network communications
- **Server Framework**: Provides the foundation for RPC handlers and stream managers
- **Utilities**: Leverages Netty utilities for channel creation, thread management, and memory allocation

The transport layer is designed to be the foundation that other Spark components build upon, providing reliable, configurable, and secure network communication capabilities.