or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-utilities.mdindex.mdmessage-protocol.mdsasl-authentication.mdserver-operations.mdtransport-setup.md

server-operations.mddocs/

0

# Server Operations

1

2

Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams. The server API provides extensible handlers for custom application logic.

3

4

## Capabilities

5

6

### TransportServer

7

8

Main server class for accepting and handling inbound network connections.

9

10

```java { .api }

11

/**

12

* TransportServer handles inbound network connections and delegates

13

* request processing to configured RPC handlers and stream managers.

14

*/

15

public class TransportServer implements Closeable {

16

public TransportServer(

17

TransportContext context,

18

String hostToBind,

19

int portToBind,

20

RpcHandler appRpcHandler,

21

List<TransportServerBootstrap> bootstraps

22

);

23

24

/**

25

* Gets the port the server is bound to.

26

* Useful when binding to port 0 to get an available port.

27

*

28

* @return The actual port number the server is listening on

29

*/

30

public int getPort();

31

32

/** Closes the server and releases all resources */

33

public void close();

34

}

35

```

36

37

### RpcHandler

38

39

Abstract base class for handling RPC requests and managing application-specific logic.

40

41

```java { .api }

42

/**

43

* RpcHandler defines the interface for processing RPC requests on the server side.

44

* Applications must implement this class to provide custom request handling logic.

45

*/

46

public abstract class RpcHandler {

47

/**

48

* Processes an RPC request and provides a response via callback.

49

* This is the main method for handling client requests.

50

*

51

* @param client The client that sent the request

52

* @param message The request message as ByteBuffer

53

* @param callback Callback to send the response back to the client

54

*/

55

public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

56

57

/**

58

* Gets the stream manager for handling chunk and stream requests.

59

*

60

* @return StreamManager instance for this handler

61

*/

62

public abstract StreamManager getStreamManager();

63

64

/**

65

* Handles one-way messages (no response expected).

66

* Default implementation does nothing.

67

*

68

* @param client The client that sent the message

69

* @param message The one-way message as ByteBuffer

70

*/

71

public void receive(TransportClient client, ByteBuffer message) {

72

// Default: no-op

73

}

74

75

/**

76

* Called when a client connection is terminated.

77

* Applications can override this for cleanup logic.

78

*

79

* @param client The client whose connection was terminated

80

*/

81

public void connectionTerminated(TransportClient client) {

82

// Default: no-op

83

}

84

85

/**

86

* Called when an exception occurs during request processing.

87

* Applications can override this for custom error handling.

88

*

89

* @param cause The exception that occurred

90

* @param client The client associated with the exception

91

*/

92

public void exceptionCaught(Throwable cause, TransportClient client) {

93

// Default: no-op

94

}

95

}

96

```

97

98

### StreamManager

99

100

Abstract base class for managing data streams and chunk access.

101

102

```java { .api }

103

/**

104

* StreamManager handles stream-based data access for chunk fetching and streaming operations.

105

* Applications implement this to provide access to their data.

106

*/

107

public abstract class StreamManager {

108

/**

109

* Gets a specific chunk from a stream.

110

* This is called when clients request chunks via fetchChunk().

111

*

112

* @param streamId The stream identifier

113

* @param chunkIndex The index of the requested chunk

114

* @return ManagedBuffer containing the chunk data

115

* @throws IllegalArgumentException if stream or chunk doesn't exist

116

*/

117

public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

118

119

/**

120

* Opens a stream by string identifier.

121

* This is called when clients request streams via stream().

122

* Default implementation throws UnsupportedOperationException.

123

*

124

* @param streamId The string stream identifier

125

* @return ManagedBuffer for the entire stream

126

* @throws UnsupportedOperationException if not implemented

127

*/

128

public ManagedBuffer openStream(String streamId) {

129

throw new UnsupportedOperationException("Stream opening is not supported");

130

}

131

132

/**

133

* Registers a channel for a specific stream.

134

* Called when a stream is opened to track which channels are using it.

135

*

136

* @param channel The Netty channel

137

* @param streamId The stream identifier

138

*/

139

public void registerChannel(Channel channel, long streamId) {

140

// Default: no-op

141

}

142

143

/**

144

* Called when a connection/channel is terminated.

145

* Applications can override this for stream cleanup.

146

*

147

* @param channel The terminated channel

148

*/

149

public void connectionTerminated(Channel channel) {

150

// Default: no-op

151

}

152

153

/**

154

* Checks if a client is authorized to access a stream.

155

* Applications can override this for access control.

156

*

157

* @param client The client requesting access

158

* @param streamId The stream identifier

159

* @throws SecurityException if access is denied

160

*/

161

public void checkAuthorization(TransportClient client, long streamId) {

162

// Default: allow all access

163

}

164

}

165

```

166

167

### Built-in Implementations

168

169

#### NoOpRpcHandler

170

171

```java { .api }

172

/**

173

* No-operation RPC handler that provides basic functionality without custom logic.

174

* Useful for testing or when only stream operations are needed.

175

*/

176

public class NoOpRpcHandler extends RpcHandler {

177

public NoOpRpcHandler();

178

179

@Override

180

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

181

// Responds with empty message

182

}

183

184

@Override

185

public StreamManager getStreamManager() {

186

// Returns a basic OneForOneStreamManager

187

}

188

}

189

```

190

191

#### OneForOneStreamManager

192

193

```java { .api }

194

/**

195

* Stream manager that maintains a one-to-one mapping between streams and buffers.

196

* Useful for simple use cases where each stream corresponds to a single data source.

197

*/

198

public class OneForOneStreamManager extends StreamManager {

199

public OneForOneStreamManager();

200

201

/**

202

* Registers a single buffer as a stream.

203

*

204

* @param appId Application identifier

205

* @param buffer The buffer to serve as stream data

206

* @return The assigned stream ID

207

*/

208

public long registerStream(String appId, ManagedBuffer buffer);

209

210

/**

211

* Registers multiple buffers as a chunked stream.

212

*

213

* @param appId Application identifier

214

* @param buffers Iterator of buffers to serve as stream chunks

215

* @return The assigned stream ID

216

*/

217

public long registerStream(String appId, Iterator<ManagedBuffer> buffers);

218

219

@Override

220

public ManagedBuffer getChunk(long streamId, int chunkIndex) {

221

// Returns the appropriate chunk for the stream

222

}

223

}

224

```

225

226

## Usage Examples

227

228

### Basic Server Setup

229

230

```java

231

import org.apache.spark.network.TransportContext;

232

import org.apache.spark.network.server.TransportServer;

233

import org.apache.spark.network.server.NoOpRpcHandler;

234

235

// Create a basic server with no-op handler

236

TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

237

238

// Create server on specific port

239

TransportServer server = context.createServer("localhost", 8080, new ArrayList<>());

240

241

System.out.println("Server started on port: " + server.getPort());

242

243

// Server is now accepting connections

244

// Don't forget to close when done

245

Runtime.getRuntime().addShutdownHook(new Thread(server::close));

246

```

247

248

### Custom RPC Handler

249

250

```java

251

import org.apache.spark.network.server.RpcHandler;

252

import org.apache.spark.network.server.OneForOneStreamManager;

253

254

public class CustomRpcHandler extends RpcHandler {

255

private final StreamManager streamManager = new OneForOneStreamManager();

256

257

@Override

258

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

259

try {

260

// Parse the request

261

String request = new String(message.array());

262

System.out.println("Received RPC: " + request);

263

264

// Process the request

265

String response = processRequest(request);

266

267

// Send response back

268

ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());

269

callback.onSuccess(responseBuffer);

270

271

} catch (Exception e) {

272

System.err.println("Error processing RPC: " + e.getMessage());

273

callback.onFailure(e);

274

}

275

}

276

277

@Override

278

public void receive(TransportClient client, ByteBuffer message) {

279

// Handle one-way messages

280

String notification = new String(message.array());

281

System.out.println("Received notification: " + notification);

282

}

283

284

@Override

285

public StreamManager getStreamManager() {

286

return streamManager;

287

}

288

289

@Override

290

public void connectionTerminated(TransportClient client) {

291

System.out.println("Client disconnected: " + client.getSocketAddress());

292

}

293

294

@Override

295

public void exceptionCaught(Throwable cause, TransportClient client) {

296

System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());

297

}

298

299

private String processRequest(String request) {

300

// Custom request processing logic

301

if (request.startsWith("PING")) {

302

return "PONG";

303

} else if (request.startsWith("GET_TIME")) {

304

return String.valueOf(System.currentTimeMillis());

305

} else {

306

return "UNKNOWN_COMMAND";

307

}

308

}

309

}

310

```

311

312

### Custom Stream Manager

313

314

```java

315

import org.apache.spark.network.server.StreamManager;

316

import org.apache.spark.network.buffer.ManagedBuffer;

317

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;

318

import java.io.File;

319

import java.util.concurrent.ConcurrentHashMap;

320

321

public class FileStreamManager extends StreamManager {

322

private final ConcurrentHashMap<Long, StreamInfo> streams = new ConcurrentHashMap<>();

323

private final String dataDirectory;

324

325

public FileStreamManager(String dataDirectory) {

326

this.dataDirectory = dataDirectory;

327

}

328

329

@Override

330

public ManagedBuffer getChunk(long streamId, int chunkIndex) {

331

StreamInfo streamInfo = streams.get(streamId);

332

if (streamInfo == null) {

333

throw new IllegalArgumentException("Unknown stream: " + streamId);

334

}

335

336

if (chunkIndex >= streamInfo.getChunkCount()) {

337

throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);

338

}

339

340

// Calculate chunk offset and size

341

long chunkSize = streamInfo.getChunkSize();

342

long offset = chunkIndex * chunkSize;

343

long remainingSize = streamInfo.getTotalSize() - offset;

344

long actualChunkSize = Math.min(chunkSize, remainingSize);

345

346

File file = new File(dataDirectory, streamInfo.getFileName());

347

return new FileSegmentManagedBuffer(transportConf, file, offset, actualChunkSize);

348

}

349

350

@Override

351

public ManagedBuffer openStream(String streamId) {

352

// Open entire file as stream

353

File file = new File(dataDirectory, streamId);

354

if (!file.exists()) {

355

throw new IllegalArgumentException("File not found: " + streamId);

356

}

357

358

return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());

359

}

360

361

@Override

362

public void checkAuthorization(TransportClient client, long streamId) {

363

StreamInfo streamInfo = streams.get(streamId);

364

if (streamInfo == null) {

365

throw new SecurityException("Stream not found: " + streamId);

366

}

367

368

// Custom authorization logic

369

if (!isAuthorized(client, streamInfo)) {

370

throw new SecurityException("Access denied to stream: " + streamId);

371

}

372

}

373

374

public long registerFileStream(String fileName, long chunkSize) {

375

File file = new File(dataDirectory, fileName);

376

if (!file.exists()) {

377

throw new IllegalArgumentException("File not found: " + fileName);

378

}

379

380

long streamId = generateStreamId();

381

StreamInfo streamInfo = new StreamInfo(fileName, file.length(), chunkSize);

382

streams.put(streamId, streamInfo);

383

384

return streamId;

385

}

386

387

private boolean isAuthorized(TransportClient client, StreamInfo streamInfo) {

388

// Implement custom authorization logic

389

return true; // Allow all for this example

390

}

391

392

private long generateStreamId() {

393

return System.currentTimeMillis() + (long)(Math.random() * 1000);

394

}

395

396

private static class StreamInfo {

397

private final String fileName;

398

private final long totalSize;

399

private final long chunkSize;

400

401

public StreamInfo(String fileName, long totalSize, long chunkSize) {

402

this.fileName = fileName;

403

this.totalSize = totalSize;

404

this.chunkSize = chunkSize;

405

}

406

407

public String getFileName() { return fileName; }

408

public long getTotalSize() { return totalSize; }

409

public long getChunkSize() { return chunkSize; }

410

public int getChunkCount() { return (int) Math.ceil((double) totalSize / chunkSize); }

411

}

412

}

413

```

414

415

### Server with Custom Stream Manager

416

417

```java

418

public class FileServerExample {

419

public static void main(String[] args) throws Exception {

420

// Create custom handlers

421

FileStreamManager streamManager = new FileStreamManager("/data/files");

422

423

RpcHandler rpcHandler = new RpcHandler() {

424

@Override

425

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

426

try {

427

String request = new String(message.array());

428

429

if (request.startsWith("REGISTER_FILE:")) {

430

String fileName = request.substring("REGISTER_FILE:".length());

431

long streamId = streamManager.registerFileStream(fileName, 64 * 1024); // 64KB chunks

432

433

String response = "STREAM_ID:" + streamId;

434

callback.onSuccess(ByteBuffer.wrap(response.getBytes()));

435

} else {

436

callback.onFailure(new IllegalArgumentException("Unknown command: " + request));

437

}

438

} catch (Exception e) {

439

callback.onFailure(e);

440

}

441

}

442

443

@Override

444

public StreamManager getStreamManager() {

445

return streamManager;

446

}

447

};

448

449

// Create and start server

450

TransportContext context = new TransportContext(conf, rpcHandler);

451

TransportServer server = context.createServer(8080, new ArrayList<>());

452

453

System.out.println("File server started on port: " + server.getPort());

454

455

// Keep server running

456

Thread.currentThread().join();

457

}

458

}

459

```

460

461

### Error Handling in Handlers

462

463

```java

464

@Override

465

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

466

try {

467

// Process request

468

String request = new String(message.array());

469

470

// Validate request

471

if (request.length() > MAX_REQUEST_SIZE) {

472

callback.onFailure(new IllegalArgumentException("Request too large"));

473

return;

474

}

475

476

// Process and respond

477

String response = processRequest(request);

478

callback.onSuccess(ByteBuffer.wrap(response.getBytes()));

479

480

} catch (IllegalArgumentException e) {

481

// Client error - return error response

482

callback.onFailure(e);

483

} catch (Exception e) {

484

// Server error - log and return generic error

485

System.err.println("Internal server error: " + e.getMessage());

486

callback.onFailure(new RuntimeException("Internal server error"));

487

}

488

}

489

490

@Override

491

public void exceptionCaught(Throwable cause, TransportClient client) {

492

System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());

493

494

// Log for debugging

495

if (cause instanceof IOException) {

496

System.out.println("Network issue with client, connection may be lost");

497

} else {

498

System.err.println("Unexpected exception type: " + cause.getClass().getSimpleName());

499

}

500

}

501

```