0
# Asynchronous Operations
1
2
Non-blocking client and server implementations for high-performance applications. The async framework enables handling many concurrent operations without blocking threads, making it ideal for high-throughput scenarios.
3
4
## Capabilities
5
6
### Async Client Framework
7
8
Base classes and interfaces for asynchronous client implementations.
9
10
```java { .api }
11
/**
12
* Abstract base class for asynchronous client implementations
13
*/
14
public abstract class TAsyncClient {
15
/** Create async client with protocol factory, manager, and transport */
16
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport);
17
18
/** Create async client with timeout */
19
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout);
20
21
/** Get the protocol factory used by this client */
22
public TProtocolFactory getProtocolFactory();
23
24
/** Get the current timeout value in milliseconds */
25
public long getTimeout();
26
27
/** Set timeout for operations in milliseconds */
28
public void setTimeout(long timeout);
29
30
/** Check if client has an error */
31
public boolean hasError();
32
33
/** Get the current error (if any) */
34
public Exception getError();
35
36
/** Check if last operation timed out */
37
public boolean hasTimeout();
38
39
/** Called when async operation completes successfully */
40
protected void onComplete();
41
42
/** Called when async operation encounters an error */
43
protected void onError(Exception exception);
44
}
45
```
46
47
### Async Client Manager
48
49
Manager for handling asynchronous client operations and selector threads.
50
51
```java { .api }
52
/**
53
* Manages asynchronous client connections and operations
54
*/
55
public class TAsyncClientManager {
56
/** Create client manager with default settings */
57
public TAsyncClientManager() throws IOException;
58
59
/** Create client manager with custom configuration */
60
public TAsyncClientManager(int selectThreadCount, int selectorThreadPoolSize, long timeoutCheckInterval) throws IOException;
61
62
/** Execute an asynchronous method call */
63
public void call(TAsyncMethodCall method) throws TException;
64
65
/** Stop the client manager and all selector threads */
66
public void stop();
67
68
/** Check if client manager is stopped */
69
public boolean isStopped();
70
}
71
```
72
73
### Async Method Calls
74
75
Classes representing asynchronous method invocations.
76
77
```java { .api }
78
/**
79
* Abstract base class representing an asynchronous method call
80
*/
81
public abstract class TAsyncMethodCall<T> {
82
/** Start the async method call */
83
public void start(Selector sel) throws IOException;
84
85
/** Check if the method call has finished */
86
public boolean isFinished();
87
88
/** Get the result of the method call (blocks until complete) */
89
public T getResult() throws Exception;
90
91
/** Check if method call has an error */
92
public boolean hasError();
93
94
/** Get the error (if any) */
95
public Exception getError();
96
97
/** Check if method call timed out */
98
public boolean hasTimeout();
99
100
/** Get the client that made this call */
101
public TAsyncClient getClient();
102
103
/** Get the transport for this call */
104
protected TNonblockingTransport getTransport();
105
106
/** Called when operation completes successfully */
107
protected abstract void onComplete();
108
109
/** Called when operation encounters an error */
110
protected abstract void onError(Exception exception);
111
112
/** Clean up resources and fire callback */
113
protected void cleanUpAndFireCallback(SelectionKey key);
114
}
115
```
116
117
### Async Callbacks
118
119
Callback interfaces for handling asynchronous operation results.
120
121
```java { .api }
122
/**
123
* Callback interface for asynchronous method calls
124
* @param <T> The type of the result
125
*/
126
public interface AsyncMethodCallback<T> {
127
/** Called when the asynchronous operation completes successfully */
128
public void onComplete(T response);
129
130
/** Called when the asynchronous operation encounters an error */
131
public void onError(Exception exception);
132
}
133
```
134
135
**Usage Examples:**
136
137
```java
138
import org.apache.thrift.async.AsyncMethodCallback;
139
import org.apache.thrift.async.TAsyncClientManager;
140
import org.apache.thrift.transport.TNonblockingSocket;
141
import org.apache.thrift.protocol.TBinaryProtocol;
142
143
// Create async client manager
144
TAsyncClientManager clientManager = new TAsyncClientManager();
145
146
// Create non-blocking transport
147
TNonblockingSocket transport = new TNonblockingSocket("localhost", 9090);
148
149
// Create async client
150
MyService.AsyncClient asyncClient = new MyService.AsyncClient(
151
new TBinaryProtocol.Factory(),
152
clientManager,
153
transport
154
);
155
156
// Define callback for async method
157
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
158
public void onComplete(String response) {
159
System.out.println("Received response: " + response);
160
}
161
162
public void onError(Exception exception) {
163
System.err.println("Error occurred: " + exception.getMessage());
164
}
165
};
166
167
// Make async method call
168
asyncClient.myAsyncMethod("parameter", callback);
169
170
// Continue with other work while call executes...
171
172
// Cleanup when done
173
clientManager.stop();
174
```
175
176
### Async Server Framework
177
178
Base classes for asynchronous server-side processing.
179
180
```java { .api }
181
/**
182
* Interface for asynchronous request processing
183
*/
184
public interface TAsyncProcessor {
185
/** Process an asynchronous request */
186
public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;
187
}
188
189
/**
190
* Base implementation for asynchronous processors
191
*/
192
public class TBaseAsyncProcessor implements TAsyncProcessor {
193
/** Create base async processor */
194
public TBaseAsyncProcessor();
195
196
/** Process async request using registered process functions */
197
public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;
198
199
/** Register an async process function for a method name */
200
protected void registerProcessor(String methodName, AsyncProcessFunction<?, ?, ?> fn);
201
202
/** Get registered process function by name */
203
protected AsyncProcessFunction<?, ?, ?> getProcessFunction(String methodName);
204
}
205
```
206
207
### Async Process Functions
208
209
Base classes for implementing asynchronous service methods.
210
211
```java { .api }
212
/**
213
* Abstract base class for asynchronous processing functions
214
* @param <I> The service interface type
215
* @param <T> The arguments type
216
* @param <R> The result type
217
*/
218
public abstract class AsyncProcessFunction<I, T extends TBase, R> {
219
/** Create async process function with method name */
220
public AsyncProcessFunction(String methodName);
221
222
/** Get the method name */
223
public String getMethodName();
224
225
/** Start processing the async request */
226
public abstract void start(I iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback resultHandler) throws TException;
227
228
/** Check if method is oneway (no response expected) */
229
public abstract boolean isOneway();
230
231
/** Create and read the arguments from protocol */
232
protected abstract T getEmptyArgsInstance();
233
234
/** Create and read the result from protocol */
235
protected abstract TBase getEmptyResultInstance();
236
}
237
```
238
239
**Usage Examples for Async Server:**
240
241
```java
242
import org.apache.thrift.async.AsyncMethodCallback;
243
import org.apache.thrift.async.AsyncProcessFunction;
244
import org.apache.thrift.protocol.TProtocol;
245
246
// Example async service implementation
247
public class MyAsyncServiceHandler implements MyService.AsyncIface {
248
249
public void myAsyncMethod(String param, AsyncMethodCallback<String> callback) {
250
// Simulate async processing (e.g., database call, external service)
251
CompletableFuture.supplyAsync(() -> {
252
try {
253
// Do async work...
254
Thread.sleep(100); // Simulate work
255
return "Processed: " + param;
256
} catch (Exception e) {
257
throw new RuntimeException(e);
258
}
259
}).whenComplete((result, exception) -> {
260
if (exception != null) {
261
callback.onError(exception);
262
} else {
263
callback.onComplete(result);
264
}
265
});
266
}
267
}
268
269
// Custom async process function
270
public class MyAsyncProcessFunction extends AsyncProcessFunction<MyService.AsyncIface, MyMethod_args, String> {
271
272
public MyAsyncProcessFunction() {
273
super("myAsyncMethod");
274
}
275
276
public void start(MyService.AsyncIface iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback<String> resultHandler) throws TException {
277
MyMethod_args args = new MyMethod_args();
278
args.read(iprot);
279
iprot.readMessageEnd();
280
281
// Create callback that writes response
282
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
283
public void onComplete(String response) {
284
try {
285
MyMethod_result result = new MyMethod_result();
286
result.success = response;
287
288
oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.REPLY, seqid));
289
result.write(oprot);
290
oprot.writeMessageEnd();
291
oprot.getTransport().flush();
292
293
resultHandler.onComplete(response);
294
} catch (Exception e) {
295
onError(e);
296
}
297
}
298
299
public void onError(Exception exception) {
300
try {
301
TApplicationException appEx = new TApplicationException(TApplicationException.INTERNAL_ERROR, exception.getMessage());
302
oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.EXCEPTION, seqid));
303
appEx.write(oprot);
304
oprot.writeMessageEnd();
305
oprot.getTransport().flush();
306
} catch (Exception e) {
307
// Log error
308
}
309
resultHandler.onError(exception);
310
}
311
};
312
313
// Call async method
314
iface.myAsyncMethod(args.param, callback);
315
}
316
317
public boolean isOneway() {
318
return false;
319
}
320
321
protected MyMethod_args getEmptyArgsInstance() {
322
return new MyMethod_args();
323
}
324
325
protected MyMethod_result getEmptyResultInstance() {
326
return new MyMethod_result();
327
}
328
}
329
```
330
331
### Async Client Example with Connection Pool
332
333
```java
334
import org.apache.thrift.async.TAsyncClientManager;
335
import org.apache.thrift.async.AsyncMethodCallback;
336
import org.apache.thrift.transport.TNonblockingSocket;
337
import org.apache.thrift.protocol.TBinaryProtocol;
338
import java.util.concurrent.CountDownLatch;
339
import java.util.concurrent.atomic.AtomicInteger;
340
341
public class AsyncClientPool {
342
private final TAsyncClientManager clientManager;
343
private final String host;
344
private final int port;
345
346
public AsyncClientPool(String host, int port) throws IOException {
347
this.clientManager = new TAsyncClientManager();
348
this.host = host;
349
this.port = port;
350
}
351
352
public void executeMultipleAsync(int numCalls) throws Exception {
353
CountDownLatch latch = new CountDownLatch(numCalls);
354
AtomicInteger successCount = new AtomicInteger(0);
355
AtomicInteger errorCount = new AtomicInteger(0);
356
357
for (int i = 0; i < numCalls; i++) {
358
// Create new transport for each call
359
TNonblockingSocket transport = new TNonblockingSocket(host, port);
360
361
// Create async client
362
MyService.AsyncClient client = new MyService.AsyncClient(
363
new TBinaryProtocol.Factory(),
364
clientManager,
365
transport
366
);
367
368
// Create callback
369
final int callId = i;
370
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
371
public void onComplete(String response) {
372
System.out.println("Call " + callId + " completed: " + response);
373
successCount.incrementAndGet();
374
latch.countDown();
375
}
376
377
public void onError(Exception exception) {
378
System.err.println("Call " + callId + " failed: " + exception.getMessage());
379
errorCount.incrementAndGet();
380
latch.countDown();
381
}
382
};
383
384
// Make async call
385
client.myAsyncMethod("Request " + i, callback);
386
}
387
388
// Wait for all calls to complete
389
latch.await();
390
391
System.out.println("Completed: " + successCount.get() + " successful, " + errorCount.get() + " errors");
392
}
393
394
public void shutdown() {
395
clientManager.stop();
396
}
397
}
398
```
399
400
### Async Server Integration
401
402
```java
403
import org.apache.thrift.server.TNonblockingServer;
404
import org.apache.thrift.transport.TNonblockingServerSocket;
405
import org.apache.thrift.protocol.TBinaryProtocol;
406
407
public class AsyncServerExample {
408
public static void main(String[] args) throws Exception {
409
// Create async processor
410
MyService.AsyncProcessor<MyAsyncServiceHandler> processor =
411
new MyService.AsyncProcessor<>(new MyAsyncServiceHandler());
412
413
// Create non-blocking server transport
414
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
415
416
// Create non-blocking server
417
TNonblockingServer server = new TNonblockingServer(
418
new TNonblockingServer.Args(serverTransport)
419
.processor(processor)
420
.protocolFactory(new TBinaryProtocol.Factory())
421
);
422
423
System.out.println("Starting async server on port 9090...");
424
server.serve();
425
}
426
}
427
```
428
429
### Error Handling in Async Operations
430
431
```java
432
import org.apache.thrift.async.AsyncMethodCallback;
433
import org.apache.thrift.TException;
434
435
// Robust async callback with error handling
436
public abstract class RobustAsyncCallback<T> implements AsyncMethodCallback<T> {
437
private final int maxRetries;
438
private int retryCount = 0;
439
440
public RobustAsyncCallback(int maxRetries) {
441
this.maxRetries = maxRetries;
442
}
443
444
public final void onError(Exception exception) {
445
if (shouldRetry(exception) && retryCount < maxRetries) {
446
retryCount++;
447
System.out.println("Retrying operation (attempt " + retryCount + "/" + maxRetries + ")");
448
retry();
449
} else {
450
handleFinalError(exception);
451
}
452
}
453
454
protected boolean shouldRetry(Exception exception) {
455
// Retry on timeout or connection errors, but not on application errors
456
return !(exception instanceof TApplicationException) && retryCount < maxRetries;
457
}
458
459
protected abstract void retry();
460
protected abstract void handleFinalError(Exception exception);
461
}
462
463
// Usage example
464
RobustAsyncCallback<String> robustCallback = new RobustAsyncCallback<String>(3) {
465
@Override
466
public void onComplete(String response) {
467
System.out.println("Success: " + response);
468
}
469
470
@Override
471
protected void retry() {
472
// Re-execute the async method call
473
client.myAsyncMethod("parameter", this);
474
}
475
476
@Override
477
protected void handleFinalError(Exception exception) {
478
System.err.println("Final failure after retries: " + exception.getMessage());
479
}
480
};
481
```