# Server Operations

Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams. The server API provides extensible handlers for custom application logic.

## Capabilities

### TransportServer

Main server class for accepting and handling inbound network connections.

```java { .api }
/**
12
* TransportServer handles inbound network connections and delegates
13
* request processing to configured RPC handlers and stream managers.
14
*/
15
public class TransportServer implements Closeable {
16
public TransportServer(
17
TransportContext context,
18
String hostToBind,
19
int portToBind,
20
RpcHandler appRpcHandler,
21
List<TransportServerBootstrap> bootstraps
22
);
23
24
/**
25
* Gets the port the server is bound to.
26
* Useful when binding to port 0 to get an available port.
27
*
28
* @return The actual port number the server is listening on
29
*/
30
public int getPort();
31
32
/** Closes the server and releases all resources */
33
public void close();
34
}
```

### RpcHandler

Abstract base class for handling RPC requests and managing application-specific logic.

```java { .api }
/**
43
* RpcHandler defines the interface for processing RPC requests on the server side.
44
* Applications must implement this class to provide custom request handling logic.
45
*/
46
public abstract class RpcHandler {
47
/**
48
* Processes an RPC request and provides a response via callback.
49
* This is the main method for handling client requests.
50
*
51
* @param client The client that sent the request
52
* @param message The request message as ByteBuffer
53
* @param callback Callback to send the response back to the client
54
*/
55
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
56
57
/**
58
* Gets the stream manager for handling chunk and stream requests.
59
*
60
* @return StreamManager instance for this handler
61
*/
62
public abstract StreamManager getStreamManager();
63
64
/**
65
* Handles one-way messages (no response expected).
66
* Default implementation does nothing.
67
*
68
* @param client The client that sent the message
69
* @param message The one-way message as ByteBuffer
70
*/
71
public void receive(TransportClient client, ByteBuffer message) {
72
// Default: no-op
73
}
74
75
/**
76
* Called when a client connection is terminated.
77
* Applications can override this for cleanup logic.
78
*
79
* @param client The client whose connection was terminated
80
*/
81
public void connectionTerminated(TransportClient client) {
82
// Default: no-op
83
}
84
85
/**
86
* Called when an exception occurs during request processing.
87
* Applications can override this for custom error handling.
88
*
89
* @param cause The exception that occurred
90
* @param client The client associated with the exception
91
*/
92
public void exceptionCaught(Throwable cause, TransportClient client) {
93
// Default: no-op
94
}
95
}
```

### StreamManager

Abstract base class for managing data streams and chunk access.

```java { .api }
/**
104
* StreamManager handles stream-based data access for chunk fetching and streaming operations.
105
* Applications implement this to provide access to their data.
106
*/
107
public abstract class StreamManager {
108
/**
109
* Gets a specific chunk from a stream.
110
* This is called when clients request chunks via fetchChunk().
111
*
112
* @param streamId The stream identifier
113
* @param chunkIndex The index of the requested chunk
114
* @return ManagedBuffer containing the chunk data
115
* @throws IllegalArgumentException if stream or chunk doesn't exist
116
*/
117
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
118
119
/**
120
* Opens a stream by string identifier.
121
* This is called when clients request streams via stream().
122
* Default implementation throws UnsupportedOperationException.
123
*
124
* @param streamId The string stream identifier
125
* @return ManagedBuffer for the entire stream
126
* @throws UnsupportedOperationException if not implemented
127
*/
128
public ManagedBuffer openStream(String streamId) {
129
throw new UnsupportedOperationException("Stream opening is not supported");
130
}
131
132
/**
133
* Registers a channel for a specific stream.
134
* Called when a stream is opened to track which channels are using it.
135
*
136
* @param channel The Netty channel
137
* @param streamId The stream identifier
138
*/
139
public void registerChannel(Channel channel, long streamId) {
140
// Default: no-op
141
}
142
143
/**
144
* Called when a connection/channel is terminated.
145
* Applications can override this for stream cleanup.
146
*
147
* @param channel The terminated channel
148
*/
149
public void connectionTerminated(Channel channel) {
150
// Default: no-op
151
}
152
153
/**
154
* Checks if a client is authorized to access a stream.
155
* Applications can override this for access control.
156
*
157
* @param client The client requesting access
158
* @param streamId The stream identifier
159
* @throws SecurityException if access is denied
160
*/
161
public void checkAuthorization(TransportClient client, long streamId) {
162
// Default: allow all access
163
}
164
}
```

### Built-in Implementations

#### NoOpRpcHandler

```java { .api }
/**
173
* No-operation RPC handler that provides basic functionality without custom logic.
174
* Useful for testing or when only stream operations are needed.
175
*/
176
public class NoOpRpcHandler extends RpcHandler {
177
public NoOpRpcHandler();
178
179
@Override
180
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
181
// Responds with empty message
182
}
183
184
@Override
185
public StreamManager getStreamManager() {
186
// Returns a basic OneForOneStreamManager
187
}
188
}
189
```
190
191
#### OneForOneStreamManager
192
193
```java { .api }
194
/**
195
* Stream manager that maintains a one-to-one mapping between streams and buffers.
196
* Useful for simple use cases where each stream corresponds to a single data source.
197
*/
198
public class OneForOneStreamManager extends StreamManager {
199
public OneForOneStreamManager();
200
201
/**
202
* Registers a single buffer as a stream.
203
*
204
* @param appId Application identifier
205
* @param buffer The buffer to serve as stream data
206
* @return The assigned stream ID
207
*/
208
public long registerStream(String appId, ManagedBuffer buffer);
209
210
/**
211
* Registers multiple buffers as a chunked stream.
212
*
213
* @param appId Application identifier
214
* @param buffers Iterator of buffers to serve as stream chunks
215
* @return The assigned stream ID
216
*/
217
public long registerStream(String appId, Iterator<ManagedBuffer> buffers);
218
219
@Override
220
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
221
// Returns the appropriate chunk for the stream
222
}
223
}
224
```
225
226
## Usage Examples
227
228
### Basic Server Setup
229
230
```java
231
import org.apache.spark.network.TransportContext;
232
import org.apache.spark.network.server.TransportServer;
233
import org.apache.spark.network.server.NoOpRpcHandler;
234
235
// Create a basic server with no-op handler
236
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
237
238
// Create server on specific port
239
TransportServer server = context.createServer("localhost", 8080, new ArrayList<>());
240
241
System.out.println("Server started on port: " + server.getPort());
242
243
// Server is now accepting connections
244
// Don't forget to close when done
245
Runtime.getRuntime().addShutdownHook(new Thread(server::close));
246
```
247
248
### Custom RPC Handler
249
250
```java
251
import org.apache.spark.network.server.RpcHandler;
252
import org.apache.spark.network.server.OneForOneStreamManager;
253
254
public class CustomRpcHandler extends RpcHandler {
255
private final StreamManager streamManager = new OneForOneStreamManager();
256
257
@Override
258
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
259
try {
260
// Parse the request
261
String request = new String(message.array());
262
System.out.println("Received RPC: " + request);
263
264
// Process the request
265
String response = processRequest(request);
266
267
// Send response back
268
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
269
callback.onSuccess(responseBuffer);
270
271
} catch (Exception e) {
272
System.err.println("Error processing RPC: " + e.getMessage());
273
callback.onFailure(e);
274
}
275
}
276
277
@Override
278
public void receive(TransportClient client, ByteBuffer message) {
279
// Handle one-way messages
280
String notification = new String(message.array());
281
System.out.println("Received notification: " + notification);
282
}
283
284
@Override
285
public StreamManager getStreamManager() {
286
return streamManager;
287
}
288
289
@Override
290
public void connectionTerminated(TransportClient client) {
291
System.out.println("Client disconnected: " + client.getSocketAddress());
292
}
293
294
@Override
295
public void exceptionCaught(Throwable cause, TransportClient client) {
296
System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());
297
}
298
299
private String processRequest(String request) {
300
// Custom request processing logic
301
if (request.startsWith("PING")) {
302
return "PONG";
303
} else if (request.startsWith("GET_TIME")) {
304
return String.valueOf(System.currentTimeMillis());
305
} else {
306
return "UNKNOWN_COMMAND";
307
}
308
}
309
}
310
```
311
312
### Custom Stream Manager
313
314
```java
315
import org.apache.spark.network.server.StreamManager;
316
import org.apache.spark.network.buffer.ManagedBuffer;
317
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
318
import java.io.File;
319
import java.util.concurrent.ConcurrentHashMap;
320
321
public class FileStreamManager extends StreamManager {
322
private final ConcurrentHashMap<Long, StreamInfo> streams = new ConcurrentHashMap<>();
323
private final String dataDirectory;
324
325
public FileStreamManager(String dataDirectory) {
326
this.dataDirectory = dataDirectory;
327
}
328
329
@Override
330
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
331
StreamInfo streamInfo = streams.get(streamId);
332
if (streamInfo == null) {
333
throw new IllegalArgumentException("Unknown stream: " + streamId);
334
}
335
336
if (chunkIndex >= streamInfo.getChunkCount()) {
337
throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
338
}
339
340
// Calculate chunk offset and size
341
long chunkSize = streamInfo.getChunkSize();
342
long offset = chunkIndex * chunkSize;
343
long remainingSize = streamInfo.getTotalSize() - offset;
344
long actualChunkSize = Math.min(chunkSize, remainingSize);
345
346
File file = new File(dataDirectory, streamInfo.getFileName());
347
return new FileSegmentManagedBuffer(transportConf, file, offset, actualChunkSize);
348
}
349
350
@Override
351
public ManagedBuffer openStream(String streamId) {
352
// Open entire file as stream
353
File file = new File(dataDirectory, streamId);
354
if (!file.exists()) {
355
throw new IllegalArgumentException("File not found: " + streamId);
356
}
357
358
return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
359
}
360
361
@Override
362
public void checkAuthorization(TransportClient client, long streamId) {
363
StreamInfo streamInfo = streams.get(streamId);
364
if (streamInfo == null) {
365
throw new SecurityException("Stream not found: " + streamId);
366
}
367
368
// Custom authorization logic
369
if (!isAuthorized(client, streamInfo)) {
370
throw new SecurityException("Access denied to stream: " + streamId);
371
}
372
}
373
374
public long registerFileStream(String fileName, long chunkSize) {
375
File file = new File(dataDirectory, fileName);
376
if (!file.exists()) {
377
throw new IllegalArgumentException("File not found: " + fileName);
378
}
379
380
long streamId = generateStreamId();
381
StreamInfo streamInfo = new StreamInfo(fileName, file.length(), chunkSize);
382
streams.put(streamId, streamInfo);
383
384
return streamId;
385
}
386
387
private boolean isAuthorized(TransportClient client, StreamInfo streamInfo) {
388
// Implement custom authorization logic
389
return true; // Allow all for this example
390
}
391
392
private long generateStreamId() {
393
return System.currentTimeMillis() + (long)(Math.random() * 1000);
394
}
395
396
private static class StreamInfo {
397
private final String fileName;
398
private final long totalSize;
399
private final long chunkSize;
400
401
public StreamInfo(String fileName, long totalSize, long chunkSize) {
402
this.fileName = fileName;
403
this.totalSize = totalSize;
404
this.chunkSize = chunkSize;
405
}
406
407
public String getFileName() { return fileName; }
408
public long getTotalSize() { return totalSize; }
409
public long getChunkSize() { return chunkSize; }
410
public int getChunkCount() { return (int) Math.ceil((double) totalSize / chunkSize); }
411
}
412
}
413
```
414
415
### Server with Custom Stream Manager
416
417
```java
418
public class FileServerExample {
419
public static void main(String[] args) throws Exception {
420
// Create custom handlers
421
FileStreamManager streamManager = new FileStreamManager("/data/files");
422
423
RpcHandler rpcHandler = new RpcHandler() {
424
@Override
425
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
426
try {
427
String request = new String(message.array());
428
429
if (request.startsWith("REGISTER_FILE:")) {
430
String fileName = request.substring("REGISTER_FILE:".length());
431
long streamId = streamManager.registerFileStream(fileName, 64 * 1024); // 64KB chunks
432
433
String response = "STREAM_ID:" + streamId;
434
callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
435
} else {
436
callback.onFailure(new IllegalArgumentException("Unknown command: " + request));
437
}
438
} catch (Exception e) {
439
callback.onFailure(e);
440
}
441
}
442
443
@Override
444
public StreamManager getStreamManager() {
445
return streamManager;
446
}
447
};
448
449
// Create and start server
450
TransportContext context = new TransportContext(conf, rpcHandler);
451
TransportServer server = context.createServer(8080, new ArrayList<>());
452
453
System.out.println("File server started on port: " + server.getPort());
454
455
// Keep server running
456
Thread.currentThread().join();
457
}
458
}
459
```
460
461
### Error Handling in Handlers
462
463
```java
464
@Override
465
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
466
try {
467
// Process request
468
String request = new String(message.array());
469
470
// Validate request
471
if (request.length() > MAX_REQUEST_SIZE) {
472
callback.onFailure(new IllegalArgumentException("Request too large"));
473
return;
474
}
475
476
// Process and respond
477
String response = processRequest(request);
478
callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
479
480
} catch (IllegalArgumentException e) {
481
// Client error - return error response
482
callback.onFailure(e);
483
} catch (Exception e) {
484
// Server error - log and return generic error
485
System.err.println("Internal server error: " + e.getMessage());
486
callback.onFailure(new RuntimeException("Internal server error"));
487
}
488
}
489
490
@Override
491
public void exceptionCaught(Throwable cause, TransportClient client) {
492
System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());
493
494
// Log for debugging
495
if (cause instanceof IOException) {
496
System.out.println("Network issue with client, connection may be lost");
497
} else {
498
System.err.println("Unexpected exception type: " + cause.getClass().getSimpleName());
499
}
500
}
501
```