or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md · client-operations.md · configuration-management.md · index.md · message-protocol.md · security-authentication.md · server-operations.md · shuffle-database.md · transport-context.md

docs/server-operations.md

0

# Server Operations

1

2

The server operations API provides the foundation for handling client connections, processing RPC requests, and managing data streams in Apache Spark's networking layer. The `TransportServer` class serves as the main server component, while `RpcHandler` defines the interface for custom request processing logic.

3

4

## Capabilities

5

6

### TransportServer

7

8

Main server class for handling client connections and network communication.

9

10

```java { .api }

11

/**

12

* Create a transport server with specified configuration

13

* @param context - TransportContext for server configuration

14

* @param hostToBind - Host address to bind the server to

15

* @param portToBind - Port number to bind the server to (0 for system-assigned)

16

* @param appRpcHandler - RPC handler for processing application messages

17

* @param bootstraps - List of server bootstrap configurations

18

*/

19

public TransportServer(TransportContext context, String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);

20

```

21

22

### Server Information

23

24

Methods for retrieving server status and configuration information.

25

26

```java { .api }

27

/**

28

* Get the port number the server is bound to

29

* @return int port number (useful when server was created with port 0)

30

*/

31

public int getPort();

32

33

/**

34

* Get comprehensive metrics for the server

35

* @return MetricSet containing all server metrics including connection counts and performance data

36

*/

37

public MetricSet getAllMetrics();

38

39

/**

40

* Get counter for registered connections

41

* @return Counter tracking the number of active registered connections

42

*/

43

public Counter getRegisteredConnections();

44

```

45

46

### Resource Management

47

48

Proper server shutdown and resource cleanup.

49

50

```java { .api }

51

/**

52

* Close the server and release all associated resources

53

* This includes closing all client connections and shutting down the server socket

54

*/

55

public void close();

56

```

57

58

## RPC Handler

59

60

### RpcHandler (Abstract Class)

61

62

Abstract base class for handling RPC messages and defining server behavior.

63

64

```java { .api }

65

/**

66

* Process an RPC message from a client with callback response

67

* @param client - TransportClient representing the sender

68

* @param message - ByteBuffer containing the RPC message data

69

* @param callback - RpcResponseCallback for sending the response

70

*/

71

public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

72

73

/**

74

* Get the stream manager for handling streaming operations

75

* @return StreamManager instance for managing data streams

76

*/

77

public abstract StreamManager getStreamManager();

78

```

79

80

### RPC Handler Lifecycle Methods

81

82

Methods for handling client connection lifecycle events.

83

84

```java { .api }

85

/**

86

* Called when a client connection becomes active

87

* @param client - TransportClient that became active

88

*/

89

public void channelActive(TransportClient client);

90

91

/**

92

* Called when a client connection becomes inactive

93

* @param client - TransportClient that became inactive

94

*/

95

public void channelInactive(TransportClient client);

96

97

/**

98

* Called when an exception occurs on a client connection

99

* @param cause - Throwable representing the exception

100

* @param client - TransportClient where the exception occurred

101

*/

102

public void exceptionCaught(Throwable cause, TransportClient client);

103

```

104

105

### Stream Processing

106

107

Methods for handling streaming operations and upload streams.

108

109

```java { .api }

110

/**

111

* Handle incoming stream data with header and callback

112

* @param client - TransportClient sending the stream

113

* @param messageHeader - ByteBuffer containing stream metadata

114

* @param callback - RpcResponseCallback for stream response

115

* @return StreamCallbackWithID for handling the streaming data, or null if not supported

116

*/

117

public StreamCallbackWithID receiveStream(TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback);

118

119

/**

120

* Handle one-way messages (no response expected)

121

* @param client - TransportClient sending the message

122

* @param message - ByteBuffer containing the message data

123

*/

124

public void receive(TransportClient client, ByteBuffer message);

125

```

126

127

### Merged Block Support

128

129

Support for Spark's merged block functionality used in shuffle optimization.

130

131

```java { .api }

132

/**

133

* Get the handler for merged block metadata requests

134

* @return MergedBlockMetaReqHandler for processing merged block requests, or null if not supported

135

*/

136

public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler();

137

```

138

139

## Stream Management

140

141

### StreamManager (Abstract Class)

142

143

Abstract class for managing streams that can be read by TransportClients.

144

145

```java { .api }

146

/**

147

* Get a specific chunk from a stream

148

* @param streamId - Identifier of the stream

149

* @param chunkIndex - Index of the chunk within the stream

150

* @return ManagedBuffer containing the chunk data

151

*/

152

public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

153

154

/**

155

* Open a named stream for reading

156

* @param streamId - Identifier of the stream to open

157

* @return ManagedBuffer containing the stream data

158

*/

159

public abstract ManagedBuffer openStream(String streamId);

160

```

161

162

## Built-in Implementations

163

164

### NoOpRpcHandler

165

166

No-operation RPC handler for testing and basic server setups.

167

168

```java { .api }

169

public class NoOpRpcHandler extends RpcHandler {

170

public NoOpRpcHandler();

171

172

@Override

173

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

174

175

@Override

176

public StreamManager getStreamManager();

177

}

178

```

179

180

### OneForOneStreamManager

181

182

Simple stream manager implementation that provides one-to-one mapping between stream IDs and buffers.

183

184

```java { .api }

185

public class OneForOneStreamManager extends StreamManager {

186

/**

187

* Create a stream manager with a list of managed buffers

188

* @param buffers - List of ManagedBuffer instances to serve as streams

189

*/

190

public OneForOneStreamManager(List<ManagedBuffer> buffers);

191

192

@Override

193

public ManagedBuffer getChunk(long streamId, int chunkIndex);

194

195

@Override

196

public ManagedBuffer openStream(String streamId);

197

198

/**

199

* Register a new stream and return its ID

200

* @param buffer - ManagedBuffer to register as a stream

201

* @return long stream ID for the registered buffer

202

*/

203

public long registerStream(ManagedBuffer buffer);

204

}

205

```

206

207

## Server Bootstrap

208

209

### TransportServerBootstrap

210

211

Interface for customizing server initialization, commonly used for authentication and encryption setup.

212

213

```java { .api }

214

public interface TransportServerBootstrap {

215

/**

216

* Perform bootstrap operations on a new server channel

217

* @param channel - Netty Channel for the server connection

218

* @param rpcHandler - RpcHandler that will process requests on this channel

219

* @return RpcHandler (possibly wrapped or modified) to use for this channel

220

*/

221

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

222

}

223

```

224

225

## Exception Classes

226

227

### BlockPushNonFatalFailure

228

229

Exception for non-fatal failures in block push operations.

230

231

```java { .api }

232

public class BlockPushNonFatalFailure extends RuntimeException {

233

/**

234

* Create exception with error message

235

* @param message - Description of the non-fatal failure

236

*/

237

public BlockPushNonFatalFailure(String message);

238

239

/**

240

* Create exception with error message and cause

241

* @param message - Description of the non-fatal failure

242

* @param cause - Underlying cause of the failure

243

*/

244

public BlockPushNonFatalFailure(String message, Throwable cause);

245

}

246

```

247

248

## Usage Examples

249

250

### Basic Server Setup

251

252

```java

253

import org.apache.spark.network.TransportContext;

254

import org.apache.spark.network.server.TransportServer;

255

import org.apache.spark.network.server.RpcHandler;

256

import org.apache.spark.network.server.StreamManager;

257

import org.apache.spark.network.client.TransportClient;

258

import org.apache.spark.network.client.RpcResponseCallback;

259

260

// Create custom RPC handler

261

RpcHandler customHandler = new RpcHandler() {

262

@Override

263

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

264

// Process the RPC message

265

// Decode only the buffer's remaining bytes; message.array() can include unrelated
// bytes (or be unsupported for direct buffers) and uses the platform charset.
String request = StandardCharsets.UTF_8.decode(message).toString();

266

System.out.println("Received RPC: " + request);

267

268

// Send response

269

String response = "Processed: " + request;

270

ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());

271

callback.onSuccess(responseBuffer);

272

}

273

274

@Override

275

public StreamManager getStreamManager() {

276

// Return stream manager for handling streaming operations

277

return new OneForOneStreamManager(Arrays.asList());

278

}

279

};

280

281

// Create and start server

282

TransportContext context = new TransportContext(conf, customHandler);

283

TransportServer server = context.createServer(8080, Collections.emptyList());

284

285

System.out.println("Server started on port: " + server.getPort());

286

287

// Server automatically handles client connections

288

// Cleanup when done

289

server.close();

290

context.close();

291

```

292

293

### Server with Stream Management

294

295

```java

296

import org.apache.spark.network.server.OneForOneStreamManager;

297

import org.apache.spark.network.buffer.NioManagedBuffer;

298

299

// Create stream manager with some data

300

List<ManagedBuffer> streamBuffers = Arrays.asList(

301

new NioManagedBuffer(ByteBuffer.wrap("Stream data 1".getBytes())),

302

new NioManagedBuffer(ByteBuffer.wrap("Stream data 2".getBytes())),

303

new NioManagedBuffer(ByteBuffer.wrap("Stream data 3".getBytes()))

304

);

305

306

OneForOneStreamManager streamManager = new OneForOneStreamManager(streamBuffers);

307

308

// Custom RPC handler with streaming support

309

RpcHandler streamingHandler = new RpcHandler() {

310

@Override

311

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

312

// Handle RPC requests

313

String request = StandardCharsets.UTF_8.decode(message).toString();

314

if (request.startsWith("GET_STREAM:")) {

315

// Register a new stream and return the stream ID

316

String data = request.substring(11);

317

ManagedBuffer buffer = new NioManagedBuffer(ByteBuffer.wrap(data.getBytes()));

318

long streamId = streamManager.registerStream(buffer);

319

320

String response = "STREAM_ID:" + streamId;

321

callback.onSuccess(ByteBuffer.wrap(response.getBytes()));

322

} else {

323

callback.onSuccess(ByteBuffer.wrap("OK".getBytes()));

324

}

325

}

326

327

@Override

328

public StreamManager getStreamManager() {

329

return streamManager;

330

}

331

332

@Override

333

public void channelActive(TransportClient client) {

334

System.out.println("Client connected: " + client.getSocketAddress());

335

}

336

337

@Override

338

public void channelInactive(TransportClient client) {

339

System.out.println("Client disconnected: " + client.getSocketAddress());

340

}

341

};

342

343

// Create server with streaming support

344

TransportServer server = context.createServer(9999, Collections.emptyList());

345

```

346

347

### Server with Authentication Bootstrap

348

349

```java

350

import org.apache.spark.network.sasl.SaslServerBootstrap;

351

import org.apache.spark.network.sasl.SecretKeyHolder;

352

353

// Create secret key holder for authentication

354

SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {

355

@Override

356

public String getSaslUser(String appId) {

357

return "spark-user";

358

}

359

360

@Override

361

public String getSecretKey(String appId) {

362

return "my-secret-key";

363

}

364

};

365

366

// Create server with SASL authentication

367

List<TransportServerBootstrap> bootstraps = Arrays.asList(

368

new SaslServerBootstrap(conf, secretKeyHolder)

369

);

370

371

TransportServer authenticatedServer = context.createServer(8443, bootstraps);

372

System.out.println("Authenticated server started on port: " + authenticatedServer.getPort());

373

```

374

375

### Advanced RPC Handler with Error Handling

376

377

```java

378

RpcHandler robustHandler = new RpcHandler() {

379

@Override

380

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

381

try {

382

// Process message

383

String request = StandardCharsets.UTF_8.decode(message).toString();

384

System.out.println("Processing request from " + client.getSocketAddress() + ": " + request);

385

386

// Simulate processing

387

if (request.equals("ERROR")) {

388

throw new RuntimeException("Simulated processing error");

389

}

390

391

// Send successful response

392

String response = "Processed at " + System.currentTimeMillis();

393

callback.onSuccess(ByteBuffer.wrap(response.getBytes()));

394

395

} catch (Exception e) {

396

System.err.println("Error processing RPC: " + e.getMessage());

397

callback.onFailure(e);

398

}

399

}

400

401

@Override

402

public StreamManager getStreamManager() {

403

return new OneForOneStreamManager(Collections.emptyList());

404

}

405

406

@Override

407

public void exceptionCaught(Throwable cause, TransportClient client) {

408

System.err.println("Exception on client " + client.getSocketAddress() + ": " + cause.getMessage());

409

// Could implement custom error handling logic here

410

}

411

};

412

```

413

414

### Server Metrics Monitoring

415

416

```java

417

import com.codahale.metrics.MetricSet;

418

import com.codahale.metrics.Counter;

419

420

// Monitor server metrics

421

TransportServer server = context.createServer();

422

423

// Get metrics

424

MetricSet metrics = server.getAllMetrics();

425

Counter connections = server.getRegisteredConnections();

426

427

// Print metrics periodically

428

Timer timer = new Timer(true);

429

timer.scheduleAtFixedRate(new TimerTask() {

430

@Override

431

public void run() {

432

System.out.println("Active connections: " + connections.getCount());

433

// Access other metrics from MetricSet as needed

434

}

435

}, 0, 5000); // Every 5 seconds

436

437

// Stop monitoring when server shuts down

438

server.close();

439

timer.cancel();

440

```

441

442

## Types

443

444

### Related Interfaces and Classes

445

446

```java { .api }

447

public interface MergedBlockMetaReqHandler {

448

void receiveMergedBlockMetaReq(TransportClient client, MergedBlockMetaRequest request, RpcResponseCallback callback);

449

}

450

451

public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {

452

public TransportClient getClient();

453

}

454

455

public abstract class AbstractAuthRpcHandler extends RpcHandler {

456

// Base class for authentication-aware RPC handlers

457

}

458

```