# Transport Setup

Core factory and setup functionality for creating transport clients and servers. TransportContext serves as the main entry point for all networking operations in Apache Spark.
## Capabilities
### TransportContext

Main factory class for creating transport clients and servers with Netty pipeline management.
```java { .api }
/**
 * TransportContext manages the lifecycle and creation of transport clients and servers.
 * It sets up Netty pipelines with proper handlers for the Spark networking protocol.
 *
 * @param conf Transport configuration settings
 * @param rpcHandler Handler for processing RPC requests
 * @param closeIdleConnections Whether to close idle connections automatically
 */
public class TransportContext {
  public TransportContext(TransportConf conf, RpcHandler rpcHandler);
  public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

  /** Creates a client factory for managing outbound connections */
  public TransportClientFactory createClientFactory();

  /** Creates a client factory with custom bootstrap handlers */
  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

  /** Creates a server on any available port */
  public TransportServer createServer();

  /** Creates a server on the specified port */
  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

  /** Creates a server bound to a specific host and port */
  public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

  /** Creates a server on any available port with custom bootstrap handlers */
  public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

  /** Initializes the Netty pipeline for a channel */
  public TransportChannelHandler initializePipeline(SocketChannel channel);

  /** Initializes the Netty pipeline with a channel-specific RPC handler */
  public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

  /** Returns the transport configuration */
  public TransportConf getConf();
}
```
**Basic Setup Example:**

```java
import java.util.ArrayList;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create configuration from system properties; "shuffle" is the module name,
// so relative keys resolve to spark.shuffle.* properties
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

// Create a no-op RPC handler for basic transport
RpcHandler handler = new NoOpRpcHandler();

// Create the transport context
TransportContext context = new TransportContext(conf, handler);

// Now you can create clients and servers from this context
TransportServer server = context.createServer(8080, new ArrayList<>());
TransportClientFactory clientFactory = context.createClientFactory();
```
**Setup with Custom Configuration:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.util.MapConfigProvider;

// Create a custom configuration
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.shuffle.io.mode", "NIO");
configMap.put("spark.shuffle.io.preferDirectBufs", "true");
configMap.put("spark.shuffle.io.connectionTimeout", "120s");

TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
TransportContext context = new TransportContext(conf, handler);
```
**Setup with Bootstrap Handlers:**

```java
import java.util.Arrays;

import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SaslServerBootstrap;

// Create SASL authentication bootstraps
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);

// Create a server with SASL authentication
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));

// Create a client factory with SASL authentication
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
```
### Bootstrap Interfaces

Interfaces for customizing client and server channel initialization.
```java { .api }
/**
 * Interface for customizing client channel initialization.
 * Implementations can add custom handlers to the Netty pipeline.
 */
public interface TransportClientBootstrap {
  /**
   * Customizes the client channel after it is created but before it is used.
   *
   * @param client The transport client instance
   * @param channel The Netty channel to customize
   */
  void doBootstrap(TransportClient client, Channel channel);
}

/**
 * Interface for customizing server channel initialization.
 * Implementations can modify the RPC handler or add custom pipeline handlers.
 */
public interface TransportServerBootstrap {
  /**
   * Customizes the server channel and potentially wraps the RPC handler.
   *
   * @param channel The Netty channel to customize
   * @param rpcHandler The current RPC handler
   * @return The RPC handler to use (may be the original or a wrapper)
   */
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
```
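The server-side bootstrap contract is effectively a decorator over the RPC handler: each bootstrap may return the handler it was given or a wrapper around it, and the server threads each bootstrap's result into the next. Below is a self-contained sketch of that chaining; the `RpcHandler` and `TransportServerBootstrap` here are simplified string-based stand-ins for illustration, not the real Spark classes:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical string-based stand-in for the real RpcHandler. */
interface RpcHandler {
    String receive(String message);
}

/** Hypothetical stand-in mirroring the TransportServerBootstrap contract. */
interface TransportServerBootstrap {
    RpcHandler doBootstrap(Object channel, RpcHandler rpcHandler);
}

public class BootstrapChainSketch {
    /** A bootstrap that wraps the handler to enforce a toy authentication check. */
    static class AuthBootstrap implements TransportServerBootstrap {
        @Override
        public RpcHandler doBootstrap(Object channel, RpcHandler rpcHandler) {
            return message -> {
                if (!message.startsWith("auth:")) {
                    return "rejected";
                }
                // Strip the token and delegate to the wrapped handler.
                return rpcHandler.receive(message.substring("auth:".length()));
            };
        }
    }

    /** Applies each bootstrap in order, feeding each one the previous result. */
    static RpcHandler applyBootstraps(Object channel, RpcHandler base,
                                      List<? extends TransportServerBootstrap> bootstraps) {
        RpcHandler handler = base;
        for (TransportServerBootstrap bootstrap : bootstraps) {
            handler = bootstrap.doBootstrap(channel, handler);
        }
        return handler;
    }

    public static void main(String[] args) {
        RpcHandler echo = message -> "echo:" + message;
        RpcHandler wrapped = applyBootstraps(new Object(), echo,
                Arrays.asList(new AuthBootstrap()));
        System.out.println(wrapped.receive("auth:ping")); // echo:ping
        System.out.println(wrapped.receive("ping"));      // rejected
    }
}
```

This wrap-and-delegate shape is the same pattern the SASL bootstrap follows: it returns a handler that gates messages until the handshake completes, then delegates to the original.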
### Configuration Integration

The transport setup integrates closely with the configuration system to control networking behavior. The keys below are relative: TransportConf resolves each one against the module name it was constructed with, so a module of `shuffle` turns `io.mode` into the full property `spark.shuffle.io.mode`.
**Key Configuration Properties:**

- `io.mode`: Network IO mode (`NIO` or `EPOLL`)
- `io.preferDirectBufs`: Whether to prefer direct ByteBuffers
- `io.connectionTimeout`: Connection timeout (accepts time strings such as `120s`)
- `io.numConnectionsPerPeer`: Number of connections per remote peer
- `serverThreads`: Number of server worker threads
- `clientThreads`: Number of client worker threads

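The module-prefix resolution described above can be sketched in a few lines. This `ConfKeySketch` is a hypothetical stand-in, not the real TransportConf; it only shows how a relative key and a module name combine into a full property name:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in showing TransportConf-style key resolution; not the real class. */
public class ConfKeySketch {
    private final String module;
    private final Map<String, String> settings;

    public ConfKeySketch(String module, Map<String, String> settings) {
        this.module = module;
        this.settings = settings;
    }

    /** Resolves a relative key like "io.mode" to "spark.<module>.io.mode". */
    public String get(String key, String defaultValue) {
        return settings.getOrDefault("spark." + module + "." + key, defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("spark.shuffle.io.mode", "EPOLL");

        ConfKeySketch conf = new ConfKeySketch("shuffle", map);
        System.out.println(conf.get("io.mode", "NIO"));               // EPOLL
        System.out.println(conf.get("io.connectionTimeout", "120s")); // 120s (falls back to default)
    }
}
```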
**Example with Custom Configuration:**

```java
// Configure for high-throughput scenarios
Map<String, String> config = new HashMap<>();
config.put("spark.network.io.mode", "EPOLL");              // Use epoll on Linux
config.put("spark.network.io.preferDirectBufs", "true");   // Prefer direct buffers
config.put("spark.network.io.numConnectionsPerPeer", "5"); // More connections per peer
config.put("spark.network.serverThreads", "8");            // More server threads

TransportConf conf = new TransportConf("network", new MapConfigProvider(config));
TransportContext context = new TransportContext(conf, rpcHandler, true); // Close idle connections
```
### Error Handling

Transport setup operations can throw exceptions that should be handled appropriately: server creation fails if the port is already in use or the bind address is invalid, and client creation throws IOException when the remote host is unreachable or refuses the connection.
**Common Setup Exceptions:**

```java
try {
  TransportServer server = context.createServer("localhost", 8080, bootstraps);
  System.out.println("Server started on port: " + server.getPort());
} catch (Exception e) {
  // Handle server creation failure (port in use, binding issues, etc.)
  System.err.println("Failed to create server: " + e.getMessage());
}

try {
  TransportClient client = clientFactory.createClient("remote-host", 9090);
  System.out.println("Connected to: " + client.getSocketAddress());
} catch (Exception e) {
  // Handle connection failure (host unreachable, connection refused, etc.)
  System.err.println("Failed to connect: " + e.getMessage());
}
```
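Since TransportServer, TransportClient, and TransportClientFactory all implement java.io.Closeable, setup code typically releases them with try-with-resources, which guarantees close in reverse order of acquisition even when the body throws. A self-contained sketch of that cleanup order, using hypothetical stand-in resources rather than the Spark classes:

```java
/** Hypothetical stand-in resources illustrating try-with-resources cleanup order. */
public class CleanupSketch {
    static final StringBuilder LOG = new StringBuilder();

    static class Resource implements AutoCloseable {
        Resource(String name) {
            this.name = name;
            LOG.append("open:").append(name).append(' ');
        }

        @Override
        public void close() {
            LOG.append("close:").append(name).append(' ');
        }

        private final String name;
    }

    public static void main(String[] args) {
        // Resources are closed in reverse order of acquisition,
        // even if the body throws before finishing.
        try (Resource server = new Resource("server");
             Resource factory = new Resource("factory")) {
            LOG.append("use ");
        }
        System.out.println(LOG.toString().trim());
        // open:server open:factory use close:factory close:server
    }
}
```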