0
# Client Operations
1
2
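This section covers the client-side networking API for establishing connections, sending RPC requests, fetching data chunks, and streaming data from remote servers. RPCs can be sent asynchronously (callback-based) or synchronously (blocking with a timeout); chunk fetches and streams are asynchronous and report their results through callbacks.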
3
4
## Capabilities
5
6
### TransportClient
7
8
Main client class for network operations including RPC calls, chunk fetching, and streaming.
9
10
```java { .api }
11
/**
12
* TransportClient provides the client-side API for network communication.
13
* It supports RPC requests, chunk fetching, and streaming operations.
14
* All operations are thread-safe and can be called concurrently.
15
*/
16
public class TransportClient implements Closeable {
17
public TransportClient(Channel channel, TransportResponseHandler handler);
18
19
/** Gets the underlying Netty channel */
20
public Channel getChannel();
21
22
/** Checks if the client connection is active */
23
public boolean isActive();
24
25
/** Gets the remote socket address */
26
public SocketAddress getSocketAddress();
27
28
/** Gets the client identifier */
29
public String getClientId();
30
31
/** Sets the client identifier */
32
public void setClientId(String id);
33
34
/**
35
* Fetches a specific chunk from a stream asynchronously.
36
*
37
* @param streamId The stream identifier
38
* @param chunkIndex The index of the chunk to fetch
39
* @param callback Callback to handle the chunk data or failure
40
*/
41
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
42
43
/**
44
* Opens a stream for continuous data transfer.
45
*
46
* @param streamId The stream identifier
47
* @param callback Callback to handle stream data, completion, or failure
48
*/
49
public void stream(String streamId, StreamCallback callback);
50
51
/**
52
* Sends an RPC request asynchronously.
53
*
54
* @param message The request message as ByteBuffer
55
* @param callback Callback to handle the response or failure
56
* @return Request ID that can be used to cancel the request
57
*/
58
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
59
60
/**
61
* Sends an RPC request synchronously with timeout.
62
*
63
* @param message The request message as ByteBuffer
64
* @param timeoutMs Timeout in milliseconds
65
* @return The response message as ByteBuffer
66
* @throws RuntimeException if the request fails or times out
67
*/
68
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
69
70
/**
71
* Sends a one-way message (no response expected).
72
*
73
* @param message The message as ByteBuffer
74
*/
75
public void send(ByteBuffer message);
76
77
/**
78
* Removes a pending RPC request so that its callback is no longer tracked; a request that has already been sent is not recalled from the server.
79
*
80
* @param requestId The request ID returned by sendRpc
81
*/
82
public void removeRpcRequest(long requestId);
83
84
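/** Marks this client as timed out so that isActive() returns false and the connection is not reused; this call does not by itself fail pending requests */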
85
public void timeOut();
86
87
/** Closes the client connection and releases resources */
88
public void close();
89
90
public String toString();
91
}
92
```
93
94
### TransportClientFactory
95
96
Factory for creating and managing TransportClient instances.
97
98
```java { .api }
99
/**
100
* Factory for creating TransportClient instances with connection pooling and management.
101
* Manages connection lifecycle and provides both managed and unmanaged clients.
102
*/
103
public class TransportClientFactory implements Closeable {
104
/**
105
* Creates a managed client connection to the specified host and port.
106
* Managed clients are pooled and reused for efficiency.
107
*
108
* @param remoteHost The remote host to connect to
109
* @param remotePort The remote port to connect to
110
* @return A TransportClient instance
111
* @throws IOException if connection fails
112
*/
113
public TransportClient createClient(String remoteHost, int remotePort) throws IOException;
114
115
/**
116
* Creates an unmanaged client connection to the specified host and port.
117
* Unmanaged clients are not pooled and must be explicitly closed.
118
*
119
* @param remoteHost The remote host to connect to
120
* @param remotePort The remote port to connect to
121
* @return A TransportClient instance
122
* @throws IOException if connection fails
123
*/
124
public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException;
125
126
/** Closes the factory and all managed client connections */
127
public void close();
128
}
129
```
130
131
### Callback Interfaces
132
133
Callback interfaces for handling asynchronous operations.
134
135
```java { .api }
136
/**
137
* Callback interface for chunk fetch operations.
138
* Implementations handle successful chunk receipt or fetch failures.
139
*/
140
public interface ChunkReceivedCallback {
141
/**
142
* Called when a chunk is successfully received.
143
*
144
* @param chunkIndex The index of the received chunk
145
* @param buffer The chunk data as a ManagedBuffer
146
*/
147
void onSuccess(int chunkIndex, ManagedBuffer buffer);
148
149
/**
150
* Called when chunk fetching fails.
151
*
152
* @param chunkIndex The index of the chunk that failed
153
* @param e The exception that caused the failure
154
*/
155
void onFailure(int chunkIndex, Throwable e);
156
}
157
158
/**
159
* Callback interface for RPC response handling.
160
* Implementations handle successful responses or RPC failures.
161
*/
162
public interface RpcResponseCallback {
163
/**
164
* Called when an RPC response is successfully received.
165
*
166
* @param response The response message as ByteBuffer
167
*/
168
void onSuccess(ByteBuffer response);
169
170
/**
171
* Called when an RPC request fails.
172
*
173
* @param e The exception that caused the failure
174
*/
175
void onFailure(Throwable e);
176
}
177
178
/**
179
* Callback interface for stream operations.
180
* Implementations handle stream data, completion, and failures.
181
*/
182
public interface StreamCallback {
183
/**
184
* Called when stream data is received.
185
*
186
* @param streamId The stream identifier
187
* @param buf The stream data as ByteBuffer
188
*/
189
void onData(String streamId, ByteBuffer buf);
190
191
/**
192
* Called when the stream completes successfully.
193
*
194
* @param streamId The stream identifier
195
*/
196
void onComplete(String streamId);
197
198
/**
199
* Called when the stream fails.
200
*
201
* @param streamId The stream identifier
202
* @param cause The exception that caused the failure
203
*/
204
void onFailure(String streamId, Throwable cause);
205
}
206
```
207
208
## Usage Examples
209
210
### Basic RPC Operations
211
212
```java
213
import org.apache.spark.network.client.TransportClient;
214
import org.apache.spark.network.client.RpcResponseCallback;
215
import java.nio.ByteBuffer;
216
217
// Asynchronous RPC call
218
TransportClient client = clientFactory.createClient("server-host", 8080);
219
220
ByteBuffer request = ByteBuffer.wrap("Hello Server".getBytes(java.nio.charset.StandardCharsets.UTF_8));
221
222
client.sendRpc(request, new RpcResponseCallback() {
223
@Override
224
public void onSuccess(ByteBuffer response) {
225
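String responseStr = java.nio.charset.StandardCharsets.UTF_8.decode(response).toString(); // avoid response.array(): the buffer may be direct or read-only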
226
System.out.println("Received response: " + responseStr);
227
}
228
229
@Override
230
public void onFailure(Throwable e) {
231
System.err.println("RPC failed: " + e.getMessage());
232
}
233
});
234
235
// Synchronous RPC call with timeout
236
try {
237
ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout
238
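String responseStr = java.nio.charset.StandardCharsets.UTF_8.decode(response).toString(); // avoid response.array(): the buffer may be direct or read-only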
239
System.out.println("Sync response: " + responseStr);
240
} catch (Exception e) {
241
System.err.println("Sync RPC failed: " + e.getMessage());
242
}
243
```
244
245
### Chunk Fetching
246
247
```java
248
import org.apache.spark.network.client.ChunkReceivedCallback;
249
import org.apache.spark.network.buffer.ManagedBuffer;
250
251
// Fetch specific chunks from a stream
252
long streamId = 12345L;
253
254
for (int chunkIndex = 0; chunkIndex < 10; chunkIndex++) {
255
final int currentChunk = chunkIndex;
256
257
client.fetchChunk(streamId, chunkIndex, new ChunkReceivedCallback() {
258
@Override
259
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
260
System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
261
262
// Process the chunk data
263
try {
264
ByteBuffer data = buffer.nioByteBuffer();
265
// Process data...
266
} catch (Exception e) {
267
System.err.println("Failed to process chunk: " + e.getMessage());
268
} finally {
269
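// Do not call buffer.release() here: the transport releases the buffer as soon as onSuccess returns. To use the data after this callback, copy it or call buffer.retain() and release it later.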
270
}
271
}
272
273
@Override
274
public void onFailure(int chunkIndex, Throwable e) {
275
System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
276
}
277
});
278
}
279
```
280
281
### Stream Operations
282
283
```java
284
import org.apache.spark.network.client.StreamCallback;
285
286
// Open a stream for continuous data transfer
287
String streamId = "data-stream-001";
288
289
client.stream(streamId, new StreamCallback() {
290
@Override
291
public void onData(String streamId, ByteBuffer buf) {
292
System.out.println("Received " + buf.remaining() + " bytes from stream " + streamId);
293
294
// Process stream data
295
byte[] data = new byte[buf.remaining()];
296
buf.get(data);
297
// Process data...
298
}
299
300
@Override
301
public void onComplete(String streamId) {
302
System.out.println("Stream " + streamId + " completed successfully");
303
}
304
305
@Override
306
public void onFailure(String streamId, Throwable cause) {
307
System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
308
}
309
});
310
```
311
312
### Connection Management
313
314
```java
315
// Create managed clients (recommended for most use cases)
316
TransportClient managedClient = clientFactory.createClient("server1", 8080);
317
318
// Client is automatically managed by the factory
319
// No need to close it explicitly; closing the factory closes all managed clients
320
321
// Create unmanaged clients (for special cases)
322
TransportClient unmanagedClient = clientFactory.createUnmanagedClient("server2", 9090);
323
324
// Important: Must explicitly close unmanaged clients
325
try {
326
// Use client...
327
} finally {
328
unmanagedClient.close();
329
}
330
331
// Check client status
332
if (client.isActive()) {
333
System.out.println("Client connected to: " + client.getSocketAddress());
334
System.out.println("Client ID: " + client.getClientId());
335
} else {
336
System.out.println("Client connection is not active");
337
}
338
```
339
340
### Error Handling and Request Cancellation
341
342
```java
343
// Send RPC with ability to cancel
344
long requestId = client.sendRpc(request, new RpcResponseCallback() {
345
@Override
346
public void onSuccess(ByteBuffer response) {
347
// Handle success
348
}
349
350
@Override
351
public void onFailure(Throwable e) {
352
if (e instanceof TimeoutException) {
353
System.err.println("Request timed out");
354
} else {
355
System.err.println("Request failed: " + e.getMessage());
356
}
357
}
358
});
359
360
// Cancel the request if needed
361
if (shouldCancel) {
362
client.removeRpcRequest(requestId);
363
}
364
365
// Mark the client as timed out so it is reported inactive and not reused (this does not by itself fail pending requests)
366
client.timeOut();
367
```
368
369
### One-Way Messages
370
371
```java
372
// Send fire-and-forget messages
373
ByteBuffer notification = ByteBuffer.wrap("Server notification".getBytes(java.nio.charset.StandardCharsets.UTF_8));
374
375
client.send(notification); // No response expected or callback needed
376
377
// Useful for notifications, heartbeats, or status updates
378
```