or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md buffers.md configuration.md index.md protocol.md streaming.md transport.md

docs/transport.md

0

# Transport Layer

1

2

Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and comprehensive resource management for Apache Spark's distributed architecture.

3

4

## Capabilities

5

6

### Transport Context

7

8

Central factory for creating transport servers and client factories with consistent configuration and RPC handler setup.

9

10

```java { .api }

11

/**

12

* Central context for creating transport servers, client factories, and Netty channel pipelines

13

*/

14

public class TransportContext {

15

/**

16

* Create a new TransportContext with the given configuration and RPC handler

17

* @param conf Transport configuration settings

18

* @param rpcHandler Handler for processing RPC messages

19

*/

20

public TransportContext(TransportConf conf, RpcHandler rpcHandler);

21

22

/**

23

* Create a new TransportContext with connection idle timeout control

24

* @param conf Transport configuration settings

25

* @param rpcHandler Handler for processing RPC messages

26

* @param closeIdleConnections Whether to close idle connections automatically

27

*/

28

public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);

29

30

/**

31

* Create a client factory with custom bootstrap configurations

32

* @param bootstraps List of client bootstrap configurations for channel setup

33

* @return TransportClientFactory for creating clients

34

*/

35

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);

36

37

/**

38

* Create a client factory with default configuration

39

* @return TransportClientFactory for creating clients

40

*/

41

public TransportClientFactory createClientFactory();

42

43

/**

44

* Create a server bound to specific port with custom bootstrap configurations

45

* @param port Port to bind server to

46

* @param bootstraps List of server bootstrap configurations

47

* @return TransportServer instance

48

*/

49

public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);

50

51

/**

52

* Create a server bound to specific host and port with custom bootstrap configurations

53

* @param host Host address to bind server to

54

* @param port Port to bind server to

55

* @param bootstraps List of server bootstrap configurations

56

* @return TransportServer instance

57

*/

58

public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);

59

60

/**

61

* Create a server with custom bootstrap configurations on any available port

62

* @param bootstraps List of server bootstrap configurations

63

* @return TransportServer instance

64

*/

65

public TransportServer createServer(List<TransportServerBootstrap> bootstraps);

66

67

/**

68

* Create a server with default configuration on any available port

69

* @return TransportServer instance

70

*/

71

public TransportServer createServer();

72

73

/**

74

* Initialize a Netty channel pipeline with transport handlers

75

* @param channel Socket channel to initialize

76

* @return TransportChannelHandler for the channel

77

*/

78

public TransportChannelHandler initializePipeline(SocketChannel channel);

79

80

/**

81

* Initialize a Netty channel pipeline with custom RPC handler

82

* @param channel Socket channel to initialize

83

* @param channelRpcHandler Custom RPC handler for this channel

84

* @return TransportChannelHandler for the channel

85

*/

86

public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

87

88

/**

89

* Get the transport configuration

90

* @return TransportConf instance

91

*/

92

public TransportConf getConf();

93

}

94

```

95

96

**Usage Examples:**

97

98

```java

99

import org.apache.spark.network.TransportContext;

100

import org.apache.spark.network.server.RpcHandler;

101

import org.apache.spark.network.util.TransportConf;

102

103

// Basic setup

104

TransportConf conf = new TransportConf("myapp", configProvider);

105

RpcHandler rpcHandler = new MyRpcHandler();

106

TransportContext context = new TransportContext(conf, rpcHandler);

107

108

// Create server

109

TransportServer server = context.createServer(8080, Collections.emptyList());

110

111

// Create client factory

112

TransportClientFactory factory = context.createClientFactory();

113

114

// With authentication

115

List<TransportServerBootstrap> serverBootstraps = Arrays.asList(

116

new SaslServerBootstrap(conf, secretKeyHolder)

117

);

118

TransportServer authServer = context.createServer(8081, serverBootstraps);

119

```

120

121

### Transport Client

122

123

Thread-safe client for fetching consecutive chunks of pre-negotiated streams and sending RPCs with comprehensive callback support.

124

125

```java { .api }

126

/**

127

* Client for fetching consecutive chunks of pre-negotiated streams and sending RPCs

128

* Thread-safe and supports concurrent operations

129

*/

130

public class TransportClient {

131

/**

132

* Create a new TransportClient with the given channel and response handler

133

* @param channel Netty channel for communication

134

* @param handler Response handler for processing server responses

135

*/

136

public TransportClient(Channel channel, TransportResponseHandler handler);

137

138

/**

139

* Get the underlying Netty channel

140

* @return Channel instance

141

*/

142

public Channel getChannel();

143

144

/**

145

* Check if the client connection is active

146

* @return true if connection is active, false otherwise

147

*/

148

public boolean isActive();

149

150

/**

151

* Get the remote socket address

152

* @return SocketAddress of remote peer

153

*/

154

public SocketAddress getSocketAddress();

155

156

/**

157

* Get the authenticated client ID when authentication is enabled

158

* @return String client ID or null if not authenticated

159

*/

160

public String getClientId();

161

162

/**

163

* Set the authenticated client ID

164

* @param id Client ID to set

165

*/

166

public void setClientId(String id);

167

168

/**

169

* Fetch a specific chunk from a stream asynchronously

170

* @param streamId ID of the stream to fetch from

171

* @param chunkIndex Index of the chunk to fetch

172

* @param callback Callback to handle chunk reception or failure

173

*/

174

public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);

175

176

/**

177

* Request to stream data with given stream ID

178

* @param streamId ID of the stream to request

179

* @param callback Callback to handle streaming data

180

*/

181

public void stream(String streamId, StreamCallback callback);

182

183

/**

184

* Send an RPC message asynchronously

185

* @param message Message payload as ByteBuffer

186

* @param callback Callback to handle RPC response or failure

187

* @return Request ID for tracking

188

*/

189

public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

190

191

/**

192

* Upload streaming data with metadata

193

* @param meta Metadata for the upload

194

* @param data Data to upload

195

* @param callback Callback to handle upload response

196

* @return Request ID for tracking

197

*/

198

public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);

199

200

/**

201

* Send an RPC message synchronously with timeout

202

* @param message Message payload as ByteBuffer

203

* @param timeoutMs Timeout in milliseconds

204

* @return Response ByteBuffer

205

* @throws RuntimeException if timeout or other error occurs

206

*/

207

public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

208

209

/**

210

* Send a one-way message that expects no response

211

* @param message Message payload as ByteBuffer

212

*/

213

public void send(ByteBuffer message);

214

215

/**

216

* Remove a pending RPC request

217

* @param requestId ID of the request to remove

218

*/

219

public void removeRpcRequest(long requestId);

220

221

/**

222

* Mark the channel as timed out, preventing further requests

223

*/

224

public void timeOut();

225

226

/**

227

* Close the client connection and clean up resources

228

*/

229

public void close();

230

}

231

```

232

233

**Usage Examples:**

234

235

```java

236

// Async RPC

237

ByteBuffer request = ByteBuffer.wrap("hello".getBytes());

238

client.sendRpc(request, new RpcResponseCallback() {

239

@Override

240

public void onSuccess(ByteBuffer response) {

241

System.out.println("Response: " + new String(response.array()));

242

}

243

244

@Override

245

public void onFailure(Throwable e) {

246

System.err.println("RPC failed: " + e.getMessage());

247

}

248

});

249

250

// Sync RPC with timeout

251

try {

252

ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout

253

System.out.println("Sync response: " + new String(response.array()));

254

} catch (RuntimeException e) {

255

System.err.println("Sync RPC failed: " + e.getMessage());

256

}

257

258

// Chunk fetching

259

client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {

260

@Override

261

public void onSuccess(int chunkIndex, ManagedBuffer buffer) {

262

// Process chunk data

263

System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());

264

}

265

266

@Override

267

public void onFailure(int chunkIndex, Throwable e) {

268

System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());

269

}

270

});

271

272

// One-way message

273

ByteBuffer notification = ByteBuffer.wrap("status_update".getBytes());

274

client.send(notification);

275

```

276

277

### Transport Client Factory

278

279

Factory for creating TransportClient instances with connection pooling, retry logic, and resource management.

280

281

```java { .api }

282

/**

283

* Factory for creating TransportClient instances with connection pooling

284

*/

285

public class TransportClientFactory {

286

/**

287

* Create a new TransportClientFactory

288

* @param context Transport context for configuration

289

* @param clientBootstraps List of client bootstrap configurations

290

*/

291

public TransportClientFactory(TransportContext context, List<TransportClientBootstrap> clientBootstraps);

292

293

/**

294

* Get all metrics for monitoring connection pool and performance

295

* @return MetricSet containing all factory metrics

296

*/

297

public MetricSet getAllMetrics();

298

299

/**

300

* Create a pooled client connection to the specified remote host and port

301

* @param remoteHost Hostname or IP address to connect to

302

* @param remotePort Port number to connect to

303

* @return TransportClient instance from connection pool

304

* @throws IOException if connection fails

305

* @throws InterruptedException if connection is interrupted

306

*/

307

public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException;

308

309

/**

310

* Create an unmanaged client connection (not pooled)

311

* @param remoteHost Hostname or IP address to connect to

312

* @param remotePort Port number to connect to

313

* @return TransportClient instance not managed by pool

314

* @throws IOException if connection fails

315

* @throws InterruptedException if connection is interrupted

316

*/

317

public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException, InterruptedException;

318

319

/**

320

* Close the factory and all pooled connections

321

*/

322

public void close();

323

}

324

```

325

326

**Usage Examples:**

327

328

```java

329

// Create factory with auth bootstrap

330

List<TransportClientBootstrap> bootstraps = Arrays.asList(

331

new SaslClientBootstrap(conf, appId, secretKeyHolder)

332

);

333

TransportClientFactory factory = new TransportClientFactory(context, bootstraps);

334

335

// Create pooled client (recommended for most use cases)

336

TransportClient client = factory.createClient("spark-worker-1", 7337);

337

338

// Create unmanaged client (for special cases)

339

TransportClient unmanagedClient = factory.createUnmanagedClient("spark-worker-2", 7337);

340

341

// Monitor connection pool

342

MetricSet metrics = factory.getAllMetrics();

343

344

// Cleanup

345

client.close();

346

unmanagedClient.close();

347

factory.close();

348

```

349

350

### Transport Server

351

352

Server for handling incoming connections and requests with pluggable RPC handlers and bootstrap configurations.

353

354

```java { .api }

355

/**

356

* Server for handling incoming connections and requests

357

*/

358

public class TransportServer {

359

/**

360

* Create a new TransportServer

361

* @param context Transport context with configuration and RPC handler

362

* @param hostToBind Host address to bind server to (null for all interfaces)

363

* @param portToBind Port to bind server to (0 for any available port)

364

* @param appRpcHandler RPC handler for processing application messages

365

* @param bootstraps List of server bootstrap configurations

366

*/

367

public TransportServer(TransportContext context, String hostToBind, int portToBind,

368

RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);

369

370

/**

371

* Get the port the server is listening on

372

* @return Port number

373

*/

374

public int getPort();

375

376

/**

377

* Shut down the server and clean up resources

378

*/

379

public void close();

380

}

381

```

382

383

**Usage Examples:**

384

385

```java

386

// Basic server

387

TransportServer server = new TransportServer(context, null, 0, rpcHandler, Collections.emptyList());

388

int port = server.getPort();

389

System.out.println("Server listening on port: " + port);

390

391

// Server with authentication

392

List<TransportServerBootstrap> bootstraps = Arrays.asList(

393

new SaslServerBootstrap(conf, secretKeyHolder)

394

);

395

TransportServer authServer = new TransportServer(context, "localhost", 8080, rpcHandler, bootstraps);

396

397

// Cleanup

398

server.close();

399

authServer.close();

400

```

401

402

### Bootstrap Interfaces

403

404

Interfaces for customizing client and server channel initialization, supporting authentication, encryption, and other channel setup requirements.

405

406

```java { .api }

407

/**

408

* Bootstrap interface for client-side channel initialization

409

*/

410

public interface TransportClientBootstrap {

411

/**

412

* Initialize a client channel with custom configuration

413

* @param client The transport client instance

414

* @param channel The Netty channel to bootstrap

415

* @throws RuntimeException if bootstrap fails

416

*/

417

void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

418

}

419

420

/**

421

* Bootstrap interface for server-side channel initialization

422

*/

423

public interface TransportServerBootstrap {

424

/**

425

* Initialize a server channel and return the RPC handler to use

426

* @param channel The Netty channel to bootstrap

427

* @param rpcHandler The default RPC handler

428

* @return RPC handler to use for this channel (may be wrapped or replaced)

429

*/

430

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

431

}

432

```

433

434

### Channel Handler

435

436

Low-level Netty channel handler for the transport protocol, typically used internally by the transport layer.

437

438

```java { .api }

439

/**

440

* Netty channel handler for transport protocol

441

*/

442

public class TransportChannelHandler extends ChannelInboundHandlerAdapter {

443

/**

444

* Create a new TransportChannelHandler

445

* @param client Transport client for this channel

446

* @param requestHandler Handler for processing requests

447

* @param closeIdleConnections Whether to close idle connections

448

* @param streamInterceptor Optional interceptor for stream frames

449

*/

450

public TransportChannelHandler(TransportClient client, TransportRequestHandler requestHandler,

451

boolean closeIdleConnections, TransportFrameDecoder.Interceptor streamInterceptor);

452

453

/**

454

* Get the transport client for this channel

455

* @return TransportClient instance

456

*/

457

public TransportClient getClient();

458

}

459

```