0
# Server Operations
1
2
The server operations API provides the foundation for handling client connections, processing RPC requests, and managing data streams in Apache Spark's networking layer. The `TransportServer` class serves as the main server component, while `RpcHandler` defines the interface for custom request processing logic.
3
4
## Capabilities
5
6
### TransportServer
7
8
Main server class for handling client connections and network communication.
9
10
```java { .api }
11
/**
12
* Create a transport server with specified configuration
13
* @param context - TransportContext for server configuration
14
* @param hostToBind - Host address to bind the server to
15
* @param portToBind - Port number to bind the server to (0 for system-assigned)
16
* @param appRpcHandler - RPC handler for processing application messages
17
* @param bootstraps - List of server bootstrap configurations
18
*/
19
public TransportServer(TransportContext context, String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);
20
```
21
22
### Server Information
23
24
Methods for retrieving server status and configuration information.
25
26
```java { .api }
27
/**
28
* Get the port number the server is bound to
29
* @return int port number (useful when server was created with port 0)
30
*/
31
public int getPort();
32
33
/**
34
* Get comprehensive metrics for the server
35
* @return MetricSet containing all server metrics including connection counts and performance data
36
*/
37
public MetricSet getAllMetrics();
38
39
/**
40
* Get counter for registered connections
41
* @return Counter tracking the number of active registered connections
42
*/
43
public Counter getRegisteredConnections();
44
```
45
46
### Resource Management
47
48
Proper server shutdown and resource cleanup.
49
50
```java { .api }
51
/**
52
* Close the server and release all associated resources
53
* This includes closing all client connections and shutting down the server socket
54
*/
55
public void close();
56
```
57
58
## RPC Handler
59
60
### RpcHandler (Abstract Class)
61
62
Abstract base class for handling RPC messages and defining server behavior.
63
64
```java { .api }
65
/**
66
* Process an RPC message from a client with callback response
67
* @param client - TransportClient representing the sender
68
* @param message - ByteBuffer containing the RPC message data
69
* @param callback - RpcResponseCallback for sending the response
70
*/
71
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
72
73
/**
74
* Get the stream manager for handling streaming operations
75
* @return StreamManager instance for managing data streams
76
*/
77
public abstract StreamManager getStreamManager();
78
```
79
80
### RPC Handler Lifecycle Methods
81
82
Methods for handling client connection lifecycle events.
83
84
```java { .api }
85
/**
86
* Called when a client connection becomes active
87
* @param client - TransportClient that became active
88
*/
89
public void channelActive(TransportClient client);
90
91
/**
92
* Called when a client connection becomes inactive
93
* @param client - TransportClient that became inactive
94
*/
95
public void channelInactive(TransportClient client);
96
97
/**
98
* Called when an exception occurs on a client connection
99
* @param cause - Throwable representing the exception
100
* @param client - TransportClient where the exception occurred
101
*/
102
public void exceptionCaught(Throwable cause, TransportClient client);
103
```
104
105
### Stream Processing
106
107
Methods for handling streaming operations and upload streams.
108
109
```java { .api }
110
/**
111
* Handle incoming stream data with header and callback
112
* @param client - TransportClient sending the stream
113
* @param messageHeader - ByteBuffer containing stream metadata
114
* @param callback - RpcResponseCallback for stream response
115
* @return StreamCallbackWithID for handling the streaming data, or null if not supported
116
*/
117
public StreamCallbackWithID receiveStream(TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback);
118
119
/**
120
* Handle one-way messages (no response expected)
121
* @param client - TransportClient sending the message
122
* @param message - ByteBuffer containing the message data
123
*/
124
public void receive(TransportClient client, ByteBuffer message);
125
```
126
127
### Merged Block Support
128
129
Support for Spark's merged block functionality used in shuffle optimization.
130
131
```java { .api }
132
/**
133
* Get the handler for merged block metadata requests
134
* @return MergedBlockMetaReqHandler for processing merged block requests, or null if not supported
135
*/
136
public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler();
137
```
138
139
## Stream Management
140
141
### StreamManager (Abstract Class)
142
143
Abstract class for managing streams that can be read by TransportClients.
144
145
```java { .api }
146
/**
147
* Get a specific chunk from a stream
148
* @param streamId - Identifier of the stream
149
* @param chunkIndex - Index of the chunk within the stream
150
* @return ManagedBuffer containing the chunk data
151
*/
152
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
153
154
/**
155
* Open a named stream for reading
156
* @param streamId - Identifier of the stream to open
157
* @return ManagedBuffer containing the stream data
158
*/
159
public abstract ManagedBuffer openStream(String streamId);
160
```
161
162
## Built-in Implementations
163
164
### NoOpRpcHandler
165
166
No-operation RPC handler for testing and basic server setups.
167
168
```java { .api }
169
public class NoOpRpcHandler extends RpcHandler {
170
public NoOpRpcHandler();
171
172
@Override
173
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
174
175
@Override
176
public StreamManager getStreamManager();
177
}
178
```
179
180
### OneForOneStreamManager
181
182
Simple stream manager implementation that provides one-to-one mapping between stream IDs and buffers.
183
184
```java { .api }
185
public class OneForOneStreamManager extends StreamManager {
186
/**
187
* Create a stream manager with a list of managed buffers
188
* @param buffers - List of ManagedBuffer instances to serve as streams
189
*/
190
public OneForOneStreamManager(List<ManagedBuffer> buffers);
191
192
@Override
193
public ManagedBuffer getChunk(long streamId, int chunkIndex);
194
195
@Override
196
public ManagedBuffer openStream(String streamId);
197
198
/**
199
* Register a new stream and return its ID
200
* @param buffer - ManagedBuffer to register as a stream
201
* @return long stream ID for the registered buffer
202
*/
203
public long registerStream(ManagedBuffer buffer);
204
}
205
```
206
207
## Server Bootstrap
208
209
### TransportServerBootstrap
210
211
Interface for customizing server initialization, commonly used for authentication and encryption setup.
212
213
```java { .api }
214
public interface TransportServerBootstrap {
215
/**
216
* Perform bootstrap operations on a new server channel
217
* @param channel - Netty Channel for the server connection
218
* @param rpcHandler - RpcHandler that will process requests on this channel
219
* @return RpcHandler (possibly wrapped or modified) to use for this channel
220
*/
221
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
222
}
223
```
224
225
## Exception Classes
226
227
### BlockPushNonFatalFailure
228
229
Exception for non-fatal failures in block push operations.
230
231
```java { .api }
232
public class BlockPushNonFatalFailure extends RuntimeException {
233
/**
234
* Create exception with error message
235
* @param message - Description of the non-fatal failure
236
*/
237
public BlockPushNonFatalFailure(String message);
238
239
/**
240
* Create exception with error message and cause
241
* @param message - Description of the non-fatal failure
242
* @param cause - Underlying cause of the failure
243
*/
244
public BlockPushNonFatalFailure(String message, Throwable cause);
245
}
246
```
247
248
## Usage Examples
249
250
### Basic Server Setup
251
252
```java
253
import org.apache.spark.network.TransportContext;
254
import org.apache.spark.network.server.TransportServer;
255
import org.apache.spark.network.server.RpcHandler;
256
import org.apache.spark.network.server.StreamManager;
257
import org.apache.spark.network.client.TransportClient;
258
import org.apache.spark.network.client.RpcResponseCallback;
259
260
// Create custom RPC handler
261
RpcHandler customHandler = new RpcHandler() {
262
@Override
263
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
264
// Process the RPC message
265
String request = new String(message.array());
266
System.out.println("Received RPC: " + request);
267
268
// Send response
269
String response = "Processed: " + request;
270
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
271
callback.onSuccess(responseBuffer);
272
}
273
274
@Override
275
public StreamManager getStreamManager() {
276
// Return stream manager for handling streaming operations
277
return new OneForOneStreamManager(Arrays.asList());
278
}
279
};
280
281
// Create and start server
282
TransportContext context = new TransportContext(conf, customHandler);
283
TransportServer server = context.createServer(8080, Collections.emptyList());
284
285
System.out.println("Server started on port: " + server.getPort());
286
287
// Server automatically handles client connections
288
// Cleanup when done
289
server.close();
290
context.close();
291
```
292
293
### Server with Stream Management
294
295
```java
296
import org.apache.spark.network.server.OneForOneStreamManager;
297
import org.apache.spark.network.buffer.NioManagedBuffer;
298
299
// Create stream manager with some data
300
List<ManagedBuffer> streamBuffers = Arrays.asList(
301
new NioManagedBuffer(ByteBuffer.wrap("Stream data 1".getBytes())),
302
new NioManagedBuffer(ByteBuffer.wrap("Stream data 2".getBytes())),
303
new NioManagedBuffer(ByteBuffer.wrap("Stream data 3".getBytes()))
304
);
305
306
OneForOneStreamManager streamManager = new OneForOneStreamManager(streamBuffers);
307
308
// Custom RPC handler with streaming support
309
RpcHandler streamingHandler = new RpcHandler() {
310
@Override
311
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
312
// Handle RPC requests
313
String request = new String(message.array());
314
if (request.startsWith("GET_STREAM:")) {
315
// Register a new stream and return the stream ID
316
String data = request.substring(11);
317
ManagedBuffer buffer = new NioManagedBuffer(ByteBuffer.wrap(data.getBytes()));
318
long streamId = streamManager.registerStream(buffer);
319
320
String response = "STREAM_ID:" + streamId;
321
callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
322
} else {
323
callback.onSuccess(ByteBuffer.wrap("OK".getBytes()));
324
}
325
}
326
327
@Override
328
public StreamManager getStreamManager() {
329
return streamManager;
330
}
331
332
@Override
333
public void channelActive(TransportClient client) {
334
System.out.println("Client connected: " + client.getSocketAddress());
335
}
336
337
@Override
338
public void channelInactive(TransportClient client) {
339
System.out.println("Client disconnected: " + client.getSocketAddress());
340
}
341
};
342
343
// Create server with streaming support
344
TransportServer server = context.createServer(9999, Collections.emptyList());
345
```
346
347
### Server with Authentication Bootstrap
348
349
```java
350
import org.apache.spark.network.sasl.SaslServerBootstrap;
351
import org.apache.spark.network.sasl.SecretKeyHolder;
352
353
// Create secret key holder for authentication
354
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
355
@Override
356
public String getSaslUser(String appId) {
357
return "spark-user";
358
}
359
360
@Override
361
public String getSecretKey(String appId) {
362
return "my-secret-key";
363
}
364
};
365
366
// Create server with SASL authentication
367
List<TransportServerBootstrap> bootstraps = Arrays.asList(
368
new SaslServerBootstrap(conf, secretKeyHolder)
369
);
370
371
TransportServer authenticatedServer = context.createServer(8443, bootstraps);
372
System.out.println("Authenticated server started on port: " + authenticatedServer.getPort());
373
```
374
375
### Advanced RPC Handler with Error Handling
376
377
```java
378
RpcHandler robustHandler = new RpcHandler() {
379
@Override
380
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
381
try {
382
// Process message
383
String request = new String(message.array());
384
System.out.println("Processing request from " + client.getSocketAddress() + ": " + request);
385
386
// Simulate processing
387
if (request.equals("ERROR")) {
388
throw new RuntimeException("Simulated processing error");
389
}
390
391
// Send successful response
392
String response = "Processed at " + System.currentTimeMillis();
393
callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
394
395
} catch (Exception e) {
396
System.err.println("Error processing RPC: " + e.getMessage());
397
callback.onFailure(e);
398
}
399
}
400
401
@Override
402
public StreamManager getStreamManager() {
403
return new OneForOneStreamManager(Collections.emptyList());
404
}
405
406
@Override
407
public void exceptionCaught(Throwable cause, TransportClient client) {
408
System.err.println("Exception on client " + client.getSocketAddress() + ": " + cause.getMessage());
409
// Could implement custom error handling logic here
410
}
411
};
412
```
413
414
### Server Metrics Monitoring
415
416
```java
417
import com.codahale.metrics.MetricSet;
418
import com.codahale.metrics.Counter;
419
420
// Monitor server metrics
421
TransportServer server = context.createServer();
422
423
// Get metrics
424
MetricSet metrics = server.getAllMetrics();
425
Counter connections = server.getRegisteredConnections();
426
427
// Print metrics periodically
428
Timer timer = new Timer(true);
429
timer.scheduleAtFixedRate(new TimerTask() {
430
@Override
431
public void run() {
432
System.out.println("Active connections: " + connections.getCount());
433
// Access other metrics from MetricSet as needed
434
}
435
}, 0, 5000); // Every 5 seconds
436
437
// Stop monitoring when server shuts down
438
server.close();
439
timer.cancel();
440
```
441
442
## Types
443
444
### Related Interfaces and Classes
445
446
```java { .api }
447
public interface MergedBlockMetaReqHandler {
448
void receiveMergedBlockMetaReq(TransportClient client, MergedBlockMetaRequest request, RpcResponseCallback callback);
449
}
450
451
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
452
public TransportClient getClient();
453
}
454
455
public abstract class AbstractAuthRpcHandler extends RpcHandler {
456
// Base class for authentication-aware RPC handlers
457
}
458
```