or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-utilities.mdindex.mdmessage-protocol.mdsasl-authentication.mdserver-operations.mdtransport-setup.md

transport-setup.mddocs/

0

# Transport Setup

1

2

Core factory and setup functionality for creating transport clients and servers. TransportContext serves as the main entry point for all networking operations in Apache Spark.

3

4

## Capabilities

5

6

### TransportContext

7

8

Main factory class for creating transport clients and servers with Netty pipeline management.

9

10

```java { .api }

11

/**

12

* TransportContext manages the lifecycle and creation of transport clients and servers.

13

* It sets up Netty pipelines with proper handlers for the Spark networking protocol.

14

*

15

* @param conf Transport configuration settings

16

* @param rpcHandler Handler for processing RPC requests

17

* @param closeIdleConnections Whether to close idle connections automatically

18

*/

19

public class TransportContext {

20

public TransportContext(TransportConf conf, RpcHandler rpcHandler);

21

public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

22

23

/** Creates a client factory for managing outbound connections */

24

public TransportClientFactory createClientFactory();

25

26

/** Creates a client factory with custom bootstrap handlers */

27

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

28

29

/** Creates a server on any available port */

30

public TransportServer createServer();

31

32

/** Creates a server on the specified port */

33

public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

34

35

/** Creates a server bound to specific host and port */

36

public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

37

38

/** Creates a server with custom bootstrap handlers */

39

public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

40

41

/** Initializes Netty pipeline for a channel */

42

public TransportChannelHandler initializePipeline(SocketChannel channel);

43

44

/** Initializes Netty pipeline with custom RPC handler */

45

public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

46

47

/** Gets the transport configuration */

48

public TransportConf getConf();

49

}

50

```

51

52

**Basic Setup Example:**

53

54

```java

55

import org.apache.spark.network.TransportContext;

56

import org.apache.spark.network.server.NoOpRpcHandler;

57

import org.apache.spark.network.util.TransportConf;

58

import org.apache.spark.network.util.SystemPropertyConfigProvider;

59

60

// Create configuration from system properties

61

TransportConf conf = new TransportConf("spark.shuffle", new SystemPropertyConfigProvider());

62

63

// Create a no-op RPC handler for basic transport

64

RpcHandler handler = new NoOpRpcHandler();

65

66

// Create transport context

67

TransportContext context = new TransportContext(conf, handler);

68

69

// Now you can create clients and servers from this context

70

TransportServer server = context.createServer(8080, new ArrayList<>());

71

TransportClientFactory clientFactory = context.createClientFactory();

72

```

73

74

**Setup with Custom Configuration:**

75

76

```java

77

import org.apache.spark.network.util.MapConfigProvider;

78

import java.util.HashMap;

79

import java.util.Map;

80

81

// Create custom configuration

82

Map<String, String> configMap = new HashMap<>();

83

configMap.put("spark.shuffle.io.mode", "NIO");

84

configMap.put("spark.shuffle.io.preferDirectBufs", "true");

85

configMap.put("spark.shuffle.io.connectionTimeout", "120s");

86

87

TransportConf conf = new TransportConf("spark.shuffle", new MapConfigProvider(configMap));

88

TransportContext context = new TransportContext(conf, handler);

89

```

90

91

**Setup with Bootstrap Handlers:**

92

93

```java

94

import org.apache.spark.network.sasl.SaslServerBootstrap;

95

import org.apache.spark.network.sasl.SaslClientBootstrap;

96

import java.util.Arrays;

97

98

// Create SASL authentication bootstraps

99

SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);

100

SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);

101

102

// Create server with SASL authentication

103

TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));

104

105

// Create client factory with SASL authentication

106

TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));

107

```

108

109

### Bootstrap Interfaces

110

111

Interfaces for customizing client and server channel initialization.

112

113

```java { .api }

114

/**

115

* Interface for customizing client channel initialization.

116

* Implementations can add custom handlers to the Netty pipeline.

117

*/

118

public interface TransportClientBootstrap {

119

/**

120

* Customizes the client channel after it's created but before it's used.

121

*

122

* @param client The transport client instance

123

* @param channel The Netty channel to customize

124

*/

125

void doBootstrap(TransportClient client, Channel channel);

126

}

127

128

/**

129

* Interface for customizing server channel initialization.

130

* Implementations can modify the RPC handler or add custom pipeline handlers.

131

*/

132

public interface TransportServerBootstrap {

133

/**

134

* Customizes the server channel and potentially wraps the RPC handler.

135

*

136

* @param channel The Netty channel to customize

137

* @param rpcHandler The current RPC handler

138

* @return The RPC handler to use (may be the original or a wrapper)

139

*/

140

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

141

}

142

```

143

144

### Configuration Integration

145

146

The transport setup integrates closely with the configuration system to control networking behavior.

147

148

**Key Configuration Properties:**

149

150

- `io.mode`: Network IO mode (NIO or EPOLL)

151

- `io.preferDirectBufs`: Whether to use direct ByteBuffers

152

- `io.connectionTimeout`: Connection timeout in milliseconds

153

- `io.numConnectionsPerPeer`: Maximum connections per remote peer

154

- `serverThreads`: Number of server worker threads

155

- `clientThreads`: Number of client worker threads

156

157

**Example with Custom Configuration:**

158

159

```java

160

// Configure for high-throughput scenarios

161

Map<String, String> config = new HashMap<>();

162

config.put("spark.network.io.mode", "EPOLL"); // Use EPOLL on Linux

163

config.put("spark.network.io.preferDirectBufs", "true"); // Use direct buffers

164

config.put("spark.network.io.numConnectionsPerPeer", "5"); // More connections per peer

165

config.put("spark.network.serverThreads", "8"); // More server threads

166

167

TransportConf conf = new TransportConf("spark.network", new MapConfigProvider(config));

168

TransportContext context = new TransportContext(conf, rpcHandler, true); // Close idle connections

169

```

170

171

### Error Handling

172

173

Transport setup operations can throw various exceptions that should be handled appropriately.

174

175

**Common Setup Exceptions:**

176

177

```java

178

try {

179

TransportServer server = context.createServer("localhost", 8080, bootstraps);

180

System.out.println("Server started on port: " + server.getPort());

181

} catch (Exception e) {

182

// Handle server creation failure (port in use, binding issues, etc.)

183

System.err.println("Failed to create server: " + e.getMessage());

184

}

185

186

try {

187

TransportClient client = clientFactory.createClient("remote-host", 9090);

188

System.out.println("Connected to: " + client.getSocketAddress());

189

} catch (Exception e) {

190

// Handle connection failure (host unreachable, connection refused, etc.)

191

System.err.println("Failed to connect: " + e.getMessage());

192

}

193

```