0
# Transport Layer
1
2
Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and comprehensive resource management for Apache Spark's distributed architecture.
3
4
## Capabilities
5
6
### Transport Context
7
8
Central factory for creating transport servers and client factories with consistent configuration and RPC handler setup.
9
10
```java { .api }
11
/**
12
* Central context for creating transport servers, client factories, and Netty channel pipelines
13
*/
14
public class TransportContext {
15
/**
16
* Create a new TransportContext with the given configuration and RPC handler
17
* @param conf Transport configuration settings
18
* @param rpcHandler Handler for processing RPC messages
19
*/
20
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
21
22
/**
23
* Create a new TransportContext with connection idle timeout control
24
* @param conf Transport configuration settings
25
* @param rpcHandler Handler for processing RPC messages
26
* @param closeIdleConnections Whether to close idle connections automatically
27
*/
28
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
29
30
/**
31
* Create a client factory with custom bootstrap configurations
32
* @param bootstraps List of client bootstrap configurations for channel setup
33
* @return TransportClientFactory for creating clients
34
*/
35
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
36
37
/**
38
* Create a client factory with default configuration
39
* @return TransportClientFactory for creating clients
40
*/
41
public TransportClientFactory createClientFactory();
42
43
/**
44
* Create a server bound to specific port with custom bootstrap configurations
45
* @param port Port to bind server to
46
* @param bootstraps List of server bootstrap configurations
47
* @return TransportServer instance
48
*/
49
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
50
51
/**
52
* Create a server bound to specific host and port with custom bootstrap configurations
53
* @param host Host address to bind server to
54
* @param port Port to bind server to
55
* @param bootstraps List of server bootstrap configurations
56
* @return TransportServer instance
57
*/
58
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
59
60
/**
61
* Create a server with custom bootstrap configurations on any available port
62
* @param bootstraps List of server bootstrap configurations
63
* @return TransportServer instance
64
*/
65
public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
66
67
/**
68
* Create a server with default configuration on any available port
69
* @return TransportServer instance
70
*/
71
public TransportServer createServer();
72
73
/**
74
* Initialize a Netty channel pipeline with transport handlers
75
* @param channel Socket channel to initialize
76
* @return TransportChannelHandler for the channel
77
*/
78
public TransportChannelHandler initializePipeline(SocketChannel channel);
79
80
/**
81
* Initialize a Netty channel pipeline with custom RPC handler
82
* @param channel Socket channel to initialize
83
* @param channelRpcHandler Custom RPC handler for this channel
84
* @return TransportChannelHandler for the channel
85
*/
86
public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);
87
88
/**
89
* Get the transport configuration
90
* @return TransportConf instance
91
*/
92
public TransportConf getConf();
93
}
94
```
95
96
**Usage Examples:**
97
98
```java
99
import org.apache.spark.network.TransportContext;
100
import org.apache.spark.network.server.RpcHandler;
101
import org.apache.spark.network.util.TransportConf;
102
103
// Basic setup
104
TransportConf conf = new TransportConf("myapp", configProvider);
105
RpcHandler rpcHandler = new MyRpcHandler();
106
TransportContext context = new TransportContext(conf, rpcHandler);
107
108
// Create server
109
TransportServer server = context.createServer(8080);
110
111
// Create client factory
112
TransportClientFactory factory = context.createClientFactory();
113
114
// With authentication
115
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
116
new SaslServerBootstrap(conf, secretKeyHolder)
117
);
118
TransportServer authServer = context.createServer(8081, serverBootstraps);
119
```
120
121
### Transport Client
122
123
Thread-safe client for fetching consecutive chunks of pre-negotiated streams and sending RPCs with comprehensive callback support.
124
125
```java { .api }
126
/**
127
* Client for fetching consecutive chunks of pre-negotiated streams and sending RPCs
128
* Thread-safe and supports concurrent operations
129
*/
130
public class TransportClient {
131
/**
132
* Create a new TransportClient with the given channel and response handler
133
* @param channel Netty channel for communication
134
* @param handler Response handler for processing server responses
135
*/
136
public TransportClient(Channel channel, TransportResponseHandler handler);
137
138
/**
139
* Get the underlying Netty channel
140
* @return Channel instance
141
*/
142
public Channel getChannel();
143
144
/**
145
* Check if the client connection is active
146
* @return true if connection is active, false otherwise
147
*/
148
public boolean isActive();
149
150
/**
151
* Get the remote socket address
152
* @return SocketAddress of remote peer
153
*/
154
public SocketAddress getSocketAddress();
155
156
/**
157
* Get the authenticated client ID when authentication is enabled
158
* @return String client ID or null if not authenticated
159
*/
160
public String getClientId();
161
162
/**
163
* Set the authenticated client ID
164
* @param id Client ID to set
165
*/
166
public void setClientId(String id);
167
168
/**
169
* Fetch a specific chunk from a stream asynchronously
170
* @param streamId ID of the stream to fetch from
171
* @param chunkIndex Index of the chunk to fetch
172
* @param callback Callback to handle chunk reception or failure
173
*/
174
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
175
176
/**
177
* Request to stream data with given stream ID
178
* @param streamId ID of the stream to request
179
* @param callback Callback to handle streaming data
180
*/
181
public void stream(String streamId, StreamCallback callback);
182
183
/**
184
* Send an RPC message asynchronously
185
* @param message Message payload as ByteBuffer
186
* @param callback Callback to handle RPC response or failure
187
* @return Request ID for tracking
188
*/
189
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
190
191
/**
192
* Upload streaming data with metadata
193
* @param meta Metadata for the upload
194
* @param data Data to upload
195
* @param callback Callback to handle upload response
196
* @return Request ID for tracking
197
*/
198
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
199
200
/**
201
* Send an RPC message synchronously with timeout
202
* @param message Message payload as ByteBuffer
203
* @param timeoutMs Timeout in milliseconds
204
* @return Response ByteBuffer
205
* @throws RuntimeException if timeout or other error occurs
206
*/
207
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
208
209
/**
210
* Send a one-way message that expects no response
211
* @param message Message payload as ByteBuffer
212
*/
213
public void send(ByteBuffer message);
214
215
/**
216
* Remove a pending RPC request
217
* @param requestId ID of the request to remove
218
*/
219
public void removeRpcRequest(long requestId);
220
221
/**
222
* Mark the channel as timed out, preventing further requests
223
*/
224
public void timeOut();
225
226
/**
227
* Close the client connection and clean up resources
228
*/
229
public void close();
230
}
231
```
232
233
**Usage Examples:**
234
235
```java
236
// Async RPC
237
ByteBuffer request = ByteBuffer.wrap("hello".getBytes());
238
client.sendRpc(request, new RpcResponseCallback() {
239
@Override
240
public void onSuccess(ByteBuffer response) {
241
System.out.println("Response: " + new String(response.array()));
242
}
243
244
@Override
245
public void onFailure(Throwable e) {
246
System.err.println("RPC failed: " + e.getMessage());
247
}
248
});
249
250
// Sync RPC with timeout
251
try {
252
ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout
253
System.out.println("Sync response: " + new String(response.array()));
254
} catch (RuntimeException e) {
255
System.err.println("Sync RPC failed: " + e.getMessage());
256
}
257
258
// Chunk fetching
259
client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
260
@Override
261
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
262
// Process chunk data
263
System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
264
}
265
266
@Override
267
public void onFailure(int chunkIndex, Throwable e) {
268
System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
269
}
270
});
271
272
// One-way message
273
ByteBuffer notification = ByteBuffer.wrap("status_update".getBytes());
274
client.send(notification);
275
```
276
277
### Transport Client Factory
278
279
Factory for creating TransportClient instances with connection pooling, retry logic, and resource management.
280
281
```java { .api }
282
/**
283
* Factory for creating TransportClient instances with connection pooling
284
*/
285
public class TransportClientFactory {
286
/**
287
* Create a new TransportClientFactory
288
* @param context Transport context for configuration
289
* @param clientBootstraps List of client bootstrap configurations
290
*/
291
public TransportClientFactory(TransportContext context, List<TransportClientBootstrap> clientBootstraps);
292
293
/**
294
* Get all metrics for monitoring connection pool and performance
295
* @return MetricSet containing all factory metrics
296
*/
297
public MetricSet getAllMetrics();
298
299
/**
300
* Create a pooled client connection to the specified remote host and port
301
* @param remoteHost Hostname or IP address to connect to
302
* @param remotePort Port number to connect to
303
* @return TransportClient instance from connection pool
304
* @throws IOException if connection fails
305
* @throws InterruptedException if connection is interrupted
306
*/
307
public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException;
308
309
/**
310
* Create an unmanaged client connection (not pooled)
311
* @param remoteHost Hostname or IP address to connect to
312
* @param remotePort Port number to connect to
313
* @return TransportClient instance not managed by pool
314
* @throws IOException if connection fails
315
* @throws InterruptedException if connection is interrupted
316
*/
317
public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException, InterruptedException;
318
319
/**
320
* Close the factory and all pooled connections
321
*/
322
public void close();
323
}
324
```
325
326
**Usage Examples:**
327
328
```java
329
// Create factory with auth bootstrap
330
List<TransportClientBootstrap> bootstraps = Arrays.asList(
331
new SaslClientBootstrap(conf, appId, secretKeyHolder)
332
);
333
TransportClientFactory factory = new TransportClientFactory(context, bootstraps);
334
335
// Create pooled client (recommended for most use cases)
336
TransportClient client = factory.createClient("spark-worker-1", 7337);
337
338
// Create unmanaged client (for special cases)
339
TransportClient unmanagedClient = factory.createUnmanagedClient("spark-worker-2", 7337);
340
341
// Monitor connection pool
342
MetricSet metrics = factory.getAllMetrics();
343
344
// Cleanup
345
client.close();
346
unmanagedClient.close();
347
factory.close();
348
```
349
350
### Transport Server
351
352
Server for handling incoming connections and requests with pluggable RPC handlers and bootstrap configurations.
353
354
```java { .api }
355
/**
356
* Server for handling incoming connections and requests
357
*/
358
public class TransportServer {
359
/**
360
* Create a new TransportServer
361
* @param context Transport context with configuration and RPC handler
362
* @param hostToBind Host address to bind server to (null for all interfaces)
363
* @param portToBind Port to bind server to (0 for any available port)
364
* @param appRpcHandler RPC handler for processing application messages
365
* @param bootstraps List of server bootstrap configurations
366
*/
367
public TransportServer(TransportContext context, String hostToBind, int portToBind,
368
RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);
369
370
/**
371
* Get the port the server is listening on
372
* @return Port number
373
*/
374
public int getPort();
375
376
/**
377
* Shut down the server and clean up resources
378
*/
379
public void close();
380
}
381
```
382
383
**Usage Examples:**
384
385
```java
386
// Basic server
387
TransportServer server = new TransportServer(context, null, 0, rpcHandler, Collections.emptyList());
388
int port = server.getPort();
389
System.out.println("Server listening on port: " + port);
390
391
// Server with authentication
392
List<TransportServerBootstrap> bootstraps = Arrays.asList(
393
new SaslServerBootstrap(conf, secretKeyHolder)
394
);
395
TransportServer authServer = new TransportServer(context, "localhost", 8080, rpcHandler, bootstraps);
396
397
// Cleanup
398
server.close();
399
authServer.close();
400
```
401
402
### Bootstrap Interfaces
403
404
Interfaces for customizing client and server channel initialization, supporting authentication, encryption, and other channel setup requirements.
405
406
```java { .api }
407
/**
408
* Bootstrap interface for client-side channel initialization
409
*/
410
public interface TransportClientBootstrap {
411
/**
412
* Initialize a client channel with custom configuration
413
* @param client The transport client instance
414
* @param channel The Netty channel to bootstrap
415
* @throws RuntimeException if bootstrap fails
416
*/
417
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
418
}
419
420
/**
421
* Bootstrap interface for server-side channel initialization
422
*/
423
public interface TransportServerBootstrap {
424
/**
425
* Initialize a server channel and return the RPC handler to use
426
* @param channel The Netty channel to bootstrap
427
* @param rpcHandler The default RPC handler
428
* @return RPC handler to use for this channel (may be wrapped or replaced)
429
*/
430
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
431
}
432
```
433
434
## Channel Handler
435
436
Low-level Netty channel handler for the transport protocol, typically used internally by the transport layer.
437
438
```java { .api }
439
/**
440
* Netty channel handler for transport protocol
441
*/
442
public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
443
/**
444
* Create a new TransportChannelHandler
445
* @param client Transport client for this channel
446
* @param requestHandler Handler for processing requests
447
* @param closeIdleConnections Whether to close idle connections
448
* @param streamInterceptor Optional interceptor for stream frames
449
*/
450
public TransportChannelHandler(TransportClient client, TransportRequestHandler requestHandler,
451
boolean closeIdleConnections, TransportFrameDecoder.Interceptor streamInterceptor);
452
453
/**
454
* Get the transport client for this channel
455
* @return TransportClient instance
456
*/
457
public TransportClient getClient();
458
}
459
```