or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdbuffer-management.mdclient-communications.mdconfiguration-utilities.mdindex.mdprotocol-messages.mdserver-framework.mdtransport-layer.md

transport-layer.mddocs/

0

# Transport Layer

1

2

The transport layer provides the core networking infrastructure for Apache Spark's distributed communication. It centers around `TransportContext` which manages client and server instances, handles configuration, and sets up Netty channel pipelines.

3

4

## Capabilities

5

6

### TransportContext

7

8

Central context class that manages the lifecycle of transport clients and servers, handles configuration, and provides factory methods for creating network components.

9

10

```java { .api }

11

/**

12

* Contains context to create TransportServer, TransportClientFactory, and to setup

13

* Netty Channel pipelines. Handles both control-plane RPCs and data-plane chunk fetching.

14

*/

15

public class TransportContext implements Closeable {

16

/**

17

* Create a TransportContext with default settings

18

* @param conf Transport configuration

19

* @param rpcHandler Handler for RPC messages

20

*/

21

public TransportContext(TransportConf conf, RpcHandler rpcHandler);

22

23

/**

24

* Create a TransportContext with idle connection control

25

* @param conf Transport configuration

26

* @param rpcHandler Handler for RPC messages

27

* @param closeIdleConnections Whether to close idle connections

28

*/

29

public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

30

31

/**

32

* Create a TransportContext with full configuration options

33

* @param conf Transport configuration

34

* @param rpcHandler Handler for RPC messages

35

* @param closeIdleConnections Whether to close idle connections

36

* @param isClientOnly Whether this context is client-only

37

*/

38

public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);

39

40

/**

41

* Create a client factory for establishing connections

42

* @return TransportClientFactory instance

43

*/

44

public TransportClientFactory createClientFactory();

45

46

/**

47

* Create a client factory with bootstrap configurations

48

* @param bootstraps List of client bootstrap configurations

49

* @return TransportClientFactory instance

50

*/

51

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

52

53

/**

54

* Create a server on a random available port

55

* @return TransportServer instance

56

*/

57

public TransportServer createServer();

58

59

/**

60

* Create a server with bootstrap configurations

61

* @param bootstraps List of server bootstrap configurations

62

* @return TransportServer instance

63

*/

64

public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

65

66

/**

67

* Create a server on a specific port

68

* @param port Port number to bind to

69

* @param bootstraps List of server bootstrap configurations

70

* @return TransportServer instance

71

*/

72

public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

73

74

/**

75

* Create a server on a specific host and port

76

* @param host Host to bind to

77

* @param port Port number to bind to

78

* @param bootstraps List of server bootstrap configurations

79

* @return TransportServer instance

80

*/

81

public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

82

83

/**

84

* Initialize Netty channel pipeline for a socket channel

85

* @param channel Socket channel to initialize

86

* @return TransportChannelHandler for the pipeline

87

*/

88

public TransportChannelHandler initializePipeline(SocketChannel channel);

89

90

/**

91

* Initialize Netty channel pipeline with custom RPC handler

92

* @param channel Socket channel to initialize

93

* @param channelRpcHandler Custom RPC handler for this channel

94

* @return TransportChannelHandler for the pipeline

95

*/

96

public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

97

98

/**

99

* Get the transport configuration

100

* @return TransportConf instance

101

*/

102

public TransportConf getConf();

103

104

/**

105

* Get counter for registered connections

106

* @return Counter for monitoring connections

107

*/

108

public Counter getRegisteredConnections();

109

110

/**

111

* Close the context and clean up resources

112

*/

113

public void close();

114

}

115

```

116

117

**Usage Examples:**

118

119

```java

120

import org.apache.spark.network.TransportContext;

121

import org.apache.spark.network.server.NoOpRpcHandler;

122

import org.apache.spark.network.util.MapConfigProvider;

123

import org.apache.spark.network.util.TransportConf;

124

125

// Basic context setup

126

TransportConf conf = new TransportConf("myapp", new MapConfigProvider(Collections.emptyMap()));

127

TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

128

129

// Create server and client factory

130

TransportServer server = context.createServer();

131

TransportClientFactory clientFactory = context.createClientFactory();

132

133

// Server with specific port and bootstraps

134

List<TransportServerBootstrap> serverBootstraps = Arrays.asList(/* bootstrap implementations */);

135

TransportServer serverWithBootstraps = context.createServer(8080, serverBootstraps);

136

137

// Client factory with bootstraps

138

List<TransportClientBootstrap> clientBootstraps = Arrays.asList(/* bootstrap implementations */);

139

TransportClientFactory factoryWithBootstraps = context.createClientFactory(clientBootstraps);

140

141

// Cleanup

142

context.close();

143

```

144

145

### TransportChannelHandler

146

147

Channel handler that processes both requests and responses in the Netty pipeline, managing the communication protocol between clients and servers.

148

149

```java { .api }

150

/**

151

* Channel handler that processes both requests and responses

152

*/

153

public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {

154

/**

155

* Get the client associated with this channel

156

* @return TransportClient instance

157

*/

158

public TransportClient getClient();

159

160

/**

161

* Get request timeout in milliseconds

162

* @return Timeout value

163

*/

164

public long getRequestTimeoutNs();

165

166

/**

167

* Add fetch request to track

168

* @param streamChunkId Stream chunk identifier

169

* @param callback Callback for chunk reception

170

*/

171

public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback);

172

173

/**

174

* Add RPC request to track

175

* @param requestId Request identifier

176

* @param callback Callback for RPC response

177

*/

178

public void addRpcRequest(long requestId, RpcResponseCallback callback);

179

180

/**

181

* Add stream callback

182

* @param streamId Stream identifier

183

* @param callback Stream callback

184

*/

185

public void addStreamCallback(String streamId, StreamCallback callback);

186

}

187

```

188

189

### Channel Configuration Types

190

191

```java { .api }

192

/**

193

* Bootstrap interface for setting up TransportClient instances

194

*/

195

public interface TransportClientBootstrap {

196

/**

197

* Perform client bootstrap setup

198

* @param client Transport client to configure

199

* @param channel Netty channel for the connection

200

* @throws RuntimeException if bootstrap fails

201

*/

202

void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

203

}

204

205

/**

206

* Bootstrap interface for setting up server instances

207

*/

208

public interface TransportServerBootstrap {

209

/**

210

* Perform server bootstrap setup

211

* @param channel Netty channel for the connection

212

* @param rpcHandler RPC handler for the channel

213

* @return Configured RPC handler (may be wrapped or replaced)

214

*/

215

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

216

}

217

```

218

219

## Integration Notes

220

221

The transport layer integrates closely with:

222

223

- **Configuration**: Uses `TransportConf` for network settings like timeouts, thread pools, and buffer preferences

224

- **Security**: Supports bootstrap mechanisms for authentication (SASL, Spark auth) and encryption

225

- **Protocol**: Works with the message protocol layer for encoding/decoding network communications

226

- **Server Framework**: Provides the foundation for RPC handlers and stream managers

227

- **Utilities**: Leverages Netty utilities for channel creation, thread management, and memory allocation

228

229

The transport layer is designed to be the foundation that other Spark components build upon, providing reliable, configurable, and secure network communication capabilities.