# Client Operations

The client operations API is the client side of Spark's network transport layer, which carries communication between distributed components. The `TransportClient` class is the main entry point, offering thread-safe methods for chunk fetching, RPC communication, and streaming data transfer.

## Capabilities

### TransportClient

Main client class for network operations. It provides thread-safe access to chunk fetching, RPC calls, and streaming.

```java { .api }
/**
 * Create a transport client for network communication
 * @param channel - Netty channel for network communication
 * @param handler - Response handler for managing responses and callbacks
 */
public TransportClient(Channel channel, TransportResponseHandler handler);
```

### Connection Management

Methods for managing client connections and retrieving connection information.

```java { .api }
/**
 * Get the underlying Netty channel
 * @return Channel instance used for network communication
 */
public Channel getChannel();

/**
 * Check if the client connection is active
 * @return true if the connection is active, false otherwise
 */
public boolean isActive();

/**
 * Get the remote socket address of the connected server
 * @return SocketAddress of the remote server
 */
public SocketAddress getSocketAddress();

/**
 * Get the client identifier
 * @return String representing the client ID, or null if not set
 */
public String getClientId();

/**
 * Set the client identifier
 * @param id - String identifier for this client
 */
public void setClientId(String id);
```

### Chunk Fetching

Asynchronous chunk fetching functionality for retrieving data blocks from streams.

```java { .api }
/**
 * Fetch a specific chunk from a stream asynchronously
 * @param streamId - Identifier of the stream containing the chunk
 * @param chunkIndex - Index of the chunk to fetch within the stream
 * @param callback - Callback to handle successful chunk reception or failures
 */
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
```

### Streaming Operations

Methods for handling streaming data transfer with support for bidirectional communication.

```java { .api }
/**
 * Request to receive data from a named stream
 * @param streamId - Identifier of the stream to receive data from
 * @param callback - Callback to handle streaming data events
 */
public void stream(String streamId, StreamCallback callback);

/**
 * Upload a stream of data to the server with metadata
 * @param meta - Metadata buffer describing the stream contents
 * @param data - Data buffer containing the stream data
 * @param callback - Callback to handle the upload response
 * @return long request ID for tracking the upload operation
 */
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
```

### RPC Communication

Remote procedure call functionality with both synchronous and asynchronous operation modes.

```java { .api }
/**
 * Send an RPC message asynchronously
 * @param message - ByteBuffer containing the RPC message data
 * @param callback - Callback to handle the RPC response or failure
 * @return long request ID for tracking the RPC call
 */
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

/**
 * Send an RPC message synchronously with timeout
 * @param message - ByteBuffer containing the RPC message data
 * @param timeoutMs - Timeout in milliseconds for the RPC call
 * @return ByteBuffer containing the response data
 * @throws RuntimeException if the RPC call fails or times out (no checked exception is declared)
 */
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

/**
 * Send a one-way message (no response expected)
 * @param message - ByteBuffer containing the message data
 */
public void send(ByteBuffer message);
```
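
The asynchronous `sendRpc` style can be bridged to a future when the caller prefers to compose results rather than nest callbacks. The following is a minimal sketch, not part of the Spark API: `RpcFutureSketch` and its nested `Callback` interface are hypothetical stand-ins that mirror the two methods of `RpcResponseCallback`, so the snippet compiles without Spark on the classpath. It copies the response on the assumption that the transport may reuse the buffer after the callback returns.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class RpcFutureSketch {
    /** Hypothetical stand-in with the same two methods as RpcResponseCallback. */
    public interface Callback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    /** Returns a callback that completes the given future with a copy of the response. */
    public static Callback completing(CompletableFuture<ByteBuffer> future) {
        return new Callback() {
            @Override
            public void onSuccess(ByteBuffer response) {
                // Copy the bytes (assumption: the original buffer may not outlive the callback)
                ByteBuffer copy = ByteBuffer.allocate(response.remaining());
                copy.put(response.duplicate());
                copy.flip();
                future.complete(copy);
            }

            @Override
            public void onFailure(Throwable e) {
                future.completeExceptionally(e);
            }
        };
    }

    /** Decodes the remaining bytes as UTF-8 without moving the buffer's position. */
    public static String decode(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }
}
```

With the real library, an anonymous `RpcResponseCallback` with the same two method bodies would be passed to `client.sendRpc(message, callback)`, and the caller would then wait on or chain the future.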

### Merged Block Operations

Specialized operations for requesting merged block metadata, used in Spark's shuffle optimization.

```java { .api }
/**
 * Request merged block metadata for shuffle operations
 * @param appId - Application identifier
 * @param shuffleId - Shuffle operation identifier
 * @param shuffleMergeId - Merge operation identifier
 * @param reduceId - Reducer task identifier
 * @param callback - Callback to handle the metadata response
 */
public void sendMergedBlockMetaReq(String appId, int shuffleId, int shuffleMergeId, int reduceId, MergedBlockMetaResponseCallback callback);
```

### Request Management

Methods for managing active requests and handling timeouts.

```java { .api }
/**
 * Remove a pending RPC request by its ID
 * @param requestId - ID of the request to remove
 */
public void removeRpcRequest(long requestId);

/**
 * Mark this client as timed out, triggering cleanup of pending requests
 */
public void timeOut();
```

### Resource Management

Proper cleanup and resource management for client connections.

```java { .api }
/**
 * Close the client connection and clean up all resources
 * This will cancel all pending requests and close the underlying channel
 */
public void close();
```

## Callback Interfaces

### RpcResponseCallback

Callback interface for handling RPC responses.

```java { .api }
public interface RpcResponseCallback extends BaseResponseCallback {
    /**
     * Called when an RPC call completes successfully
     * @param response - ByteBuffer containing the response data
     */
    void onSuccess(ByteBuffer response);

    /**
     * Called when an RPC call fails
     * @param e - Throwable representing the failure cause
     */
    void onFailure(Throwable e);
}
```

### ChunkReceivedCallback

Callback interface for handling chunk fetch operations.

```java { .api }
public interface ChunkReceivedCallback {
    /**
     * Called when a chunk is successfully received.
     * The buffer is released once this call returns, so retain() it or copy
     * its contents if the data is needed afterwards.
     * @param chunkIndex - Index of the received chunk
     * @param buffer - ManagedBuffer containing the chunk data
     */
    void onSuccess(int chunkIndex, ManagedBuffer buffer);

    /**
     * Called when chunk fetching fails
     * @param chunkIndex - Index of the chunk that failed to be received
     * @param e - Throwable representing the failure cause
     */
    void onFailure(int chunkIndex, Throwable e);
}
```
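
When several chunks are requested at once, the callbacks need somewhere to record which indices have arrived and which have failed. The sketch below shows one way to do that. `ChunkCollector` is a hypothetical helper, not a Spark class; its two methods mirror the shape of `ChunkReceivedCallback` but take a plain `ByteBuffer` so the snippet stays self-contained. It assumes each chunk index reports exactly once, either as a success or as a failure.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class ChunkCollector {
    private final Map<Integer, byte[]> chunks = new ConcurrentHashMap<>();
    private final Map<Integer, Throwable> failures = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    public ChunkCollector(int expectedChunks) {
        this.remaining = new CountDownLatch(expectedChunks);
    }

    /** Mirrors onSuccess(int, ManagedBuffer); real code would pass buffer.nioByteBuffer(). */
    public void onSuccess(int chunkIndex, ByteBuffer data) {
        // Copy the bytes so nothing refers to the transport's buffer after returning
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        if (chunks.putIfAbsent(chunkIndex, copy) == null) {
            remaining.countDown();
        }
    }

    /** Mirrors onFailure(int, Throwable). */
    public void onFailure(int chunkIndex, Throwable e) {
        if (failures.putIfAbsent(chunkIndex, e) == null) {
            remaining.countDown();
        }
    }

    /** True once every expected chunk has either arrived or failed. */
    public boolean isDone() {
        return remaining.getCount() == 0;
    }

    public byte[] chunk(int index) {
        return chunks.get(index);
    }

    public boolean failed(int index) {
        return failures.containsKey(index);
    }
}
```

A real `ChunkReceivedCallback` would delegate to these two methods from its own `onSuccess` and `onFailure`, and the caller would poll `isDone()` or expose the latch for a blocking wait.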

### StreamCallback

Callback interface for handling streaming data operations.

```java { .api }
public interface StreamCallback {
    /**
     * Called when data is received from the stream
     * @param streamId - Identifier of the stream
     * @param buf - ByteBuffer containing the received data
     * @throws IOException if data processing fails
     */
    void onData(String streamId, ByteBuffer buf) throws IOException;

    /**
     * Called when the stream is completed successfully
     * @param streamId - Identifier of the completed stream
     * @throws IOException if completion processing fails
     */
    void onComplete(String streamId) throws IOException;

    /**
     * Called when the stream encounters a failure
     * @param streamId - Identifier of the failed stream
     * @param cause - Throwable representing the failure cause
     * @throws IOException if failure processing fails
     */
    void onFailure(String streamId, Throwable cause) throws IOException;
}
```
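
The three `StreamCallback` methods form a small lifecycle: zero or more `onData` calls followed by either `onComplete` or `onFailure`. The sketch below accumulates the pieces into one byte array. `StreamAccumulator` is a hypothetical class that mirrors the callback's method shapes without implementing the Spark interface, so it compiles on its own; in real code it would declare `implements StreamCallback` and add `throws IOException` to each callback method.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class StreamAccumulator {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean complete;
    private Throwable failure;

    /** Appends the remaining bytes of buf to the accumulated output. */
    public synchronized void onData(String streamId, ByteBuffer buf) {
        byte[] piece = new byte[buf.remaining()];
        buf.get(piece);
        out.write(piece, 0, piece.length);
    }

    public synchronized void onComplete(String streamId) {
        complete = true;
    }

    public synchronized void onFailure(String streamId, Throwable cause) {
        failure = cause;
    }

    /** Returns all received bytes, or throws if the stream failed or has not finished. */
    public synchronized byte[] result() {
        if (failure != null) {
            throw new IllegalStateException("stream failed", failure);
        }
        if (!complete) {
            throw new IllegalStateException("stream not complete");
        }
        return out.toByteArray();
    }
}
```

Buffering everything in memory only suits small streams; for large transfers the same structure could write to a file channel instead.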

### StreamCallbackWithID

Extended stream callback interface that includes an identifier.

```java { .api }
public interface StreamCallbackWithID extends StreamCallback {
    /**
     * Get the identifier for this stream callback
     * @return String identifier for the callback
     */
    String getID();
}
```

### MergedBlockMetaResponseCallback

Callback interface for handling merged block metadata responses.

```java { .api }
public interface MergedBlockMetaResponseCallback extends BaseResponseCallback {
    /**
     * Called when merged block metadata is successfully received
     * @param mergedBlockMeta - MergedBlockMetaSuccess containing the metadata
     */
    void onSuccess(MergedBlockMetaSuccess mergedBlockMeta);

    /**
     * Called when merged block metadata request fails
     * @param e - Throwable representing the failure cause
     */
    void onFailure(Throwable e);
}
```

## Client Factory

### TransportClientFactory

Factory class for creating and managing transport clients with connection pooling and lifecycle management.

```java { .api }
public class TransportClientFactory implements Closeable {
    /**
     * Create a transport client connected to the specified host and port
     * @param remoteHost - Hostname or IP address of the remote server
     * @param remotePort - Port number of the remote server
     * @return TransportClient connected to the specified endpoint
     * @throws IOException if connection establishment fails
     */
    public TransportClient createClient(String remoteHost, int remotePort) throws IOException;

    /**
     * Create a transport client with a specific client ID
     * @param remoteHost - Hostname or IP address of the remote server
     * @param remotePort - Port number of the remote server
     * @param clientId - Identifier for the client
     * @return TransportClient connected to the specified endpoint
     * @throws IOException if connection establishment fails
     */
    public TransportClient createClient(String remoteHost, int remotePort, int clientId) throws IOException;

    /**
     * Close the factory and all associated client connections
     */
    public void close();
}
```

## Usage Examples

### Basic RPC Communication

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.RpcResponseCallback;

// Create client through factory (clientFactory is an existing TransportClientFactory)
TransportClient client = clientFactory.createClient("localhost", 9999);

// Send asynchronous RPC
byte[] payload = "Hello, Server!".getBytes(StandardCharsets.UTF_8);
client.sendRpc(ByteBuffer.wrap(payload), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Decode from the buffer itself; array() fails on direct or read-only buffers
        String responseStr = StandardCharsets.UTF_8.decode(response).toString();
        System.out.println("Server responded: " + responseStr);
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Send synchronous RPC with timeout, using a fresh buffer over the same payload
try {
    ByteBuffer syncResponse = client.sendRpcSync(ByteBuffer.wrap(payload), 30000); // 30 second timeout
    System.out.println("Sync response: " + StandardCharsets.UTF_8.decode(syncResponse).toString());
} catch (RuntimeException e) {
    // sendRpcSync declares no checked exceptions, so failures and timeouts arrive unchecked
    System.err.println("Sync RPC failed: " + e.getMessage());
}
```

### Chunk Fetching

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.buffer.ManagedBuffer;

// Fetch chunks from a stream
long streamId = 12345L;
client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        System.out.println("Received chunk " + chunkIndex + " with " + buffer.size() + " bytes");
        try {
            // Process chunk data
            ByteBuffer data = buffer.nioByteBuffer();
            // ... process data ...
        } catch (IOException e) {
            System.err.println("Failed to process chunk: " + e.getMessage());
        }
        // Important: do not release the buffer here. The transport layer releases it
        // when this callback returns; call buffer.retain() or copy the data to keep it longer.
    }

    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    }
});
```

### Streaming Data

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.StreamCallback;

// Receive streaming data
client.stream("data-stream-1", new StreamCallback() {
    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        System.out.println("Received " + buf.remaining() + " bytes from stream " + streamId);
        // Process streaming data
        byte[] data = new byte[buf.remaining()];
        buf.get(data);
        // ... process data ...
    }

    @Override
    public void onComplete(String streamId) throws IOException {
        System.out.println("Stream " + streamId + " completed successfully");
    }

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
    }
});
```

### Connection Management

```java
// Check connection status
if (client.isActive()) {
    System.out.println("Client connected to: " + client.getSocketAddress());
    System.out.println("Client ID: " + client.getClientId());

    // Set custom client ID
    client.setClientId("spark-client-" + System.currentTimeMillis());

    // Perform operations...
} else {
    System.out.println("Client connection is not active");
}

// Proper cleanup
client.close();
```

## Exception Handling

### ChunkFetchFailureException

Exception thrown when chunk fetching operations fail.

```java { .api }
public class ChunkFetchFailureException extends RuntimeException {
    /**
     * Create exception with error message and cause
     * @param errorMsg - Description of the error
     * @param cause - Underlying cause of the failure
     */
    public ChunkFetchFailureException(String errorMsg, Throwable cause);

    /**
     * Create exception with error message only
     * @param errorMsg - Description of the error
     */
    public ChunkFetchFailureException(String errorMsg);
}
```

## Bootstrap Integration

### TransportClientBootstrap

Interface for customizing client initialization, commonly used for authentication and encryption setup.

```java { .api }
public interface TransportClientBootstrap {
    /**
     * Perform bootstrap operations on a newly created client
     * @param client - TransportClient instance to bootstrap
     * @param channel - Underlying Netty channel
     * @throws RuntimeException if bootstrap operations fail
     */
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
```