0
# Asynchronous Operations
1
2
Apache Avro IPC provides comprehensive support for non-blocking RPC operations through callback-based and Future-based patterns, enabling high-performance concurrent applications.
3
4
## Capabilities
5
6
### Callback Interface
7
8
The fundamental interface for asynchronous operations, providing success and error handling methods.
9
10
```java { .api }
11
public interface Callback<T> {
12
// Handle successful result
13
void handleResult(T result);
14
15
// Handle error condition
16
void handleError(Throwable error);
17
}
18
```
19
20
#### Usage Examples
21
22
```java
23
// Simple callback implementation
24
Callback<String> callback = new Callback<String>() {
25
@Override
26
public void handleResult(String result) {
27
System.out.println("RPC completed successfully: " + result);
28
// Process result on callback thread
29
}
30
31
@Override
32
public void handleError(Throwable error) {
33
System.err.println("RPC failed: " + error.getMessage());
34
// Handle error condition
35
}
36
};
37
38
// Make asynchronous RPC call
39
requestor.request("getData", requestParams, callback);
40
41
// Lambda-based callback (Java 8+)
42
requestor.request("processData", data, new Callback<ProcessingResult>() {
43
@Override
44
public void handleResult(ProcessingResult result) {
45
result.getItems().forEach(item -> processItem(item));
46
}
47
48
@Override
49
public void handleError(Throwable error) {
50
logger.error("Processing failed", error);
51
scheduleRetry();
52
}
53
});
54
```
55
56
### Future-Based Operations
57
58
`CallFuture` provides a Future implementation that also acts as a Callback, enabling both blocking and non-blocking usage patterns.
59
60
```java { .api }
61
public class CallFuture<T> implements Future<T>, Callback<T> {
62
// Constructors
63
public CallFuture();
64
public CallFuture(Callback<T> chainedCallback);
65
66
// Result access methods
67
public T getResult() throws Exception;
68
public Throwable getError();
69
70
// Blocking wait methods
71
public void await() throws InterruptedException;
72
public void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
73
74
// Future interface methods
75
public boolean cancel(boolean mayInterruptIfRunning);
76
public boolean isCancelled();
77
public boolean isDone();
78
public T get() throws InterruptedException, ExecutionException;
79
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
80
81
// Callback interface methods
82
public void handleResult(T result);
83
public void handleError(Throwable error);
84
}
85
```
86
87
#### Usage Examples
88
89
```java
90
// Basic Future usage
91
CallFuture<String> future = new CallFuture<>();
92
requestor.request("getData", request, future);
93
94
try {
95
// Block until result is available
96
String result = future.get();
97
System.out.println("Result: " + result);
98
} catch (ExecutionException e) {
99
System.err.println("RPC failed: " + e.getCause().getMessage());
100
}
101
102
// Future with timeout
103
CallFuture<ProcessingResult> future = new CallFuture<>();
104
requestor.request("longRunningTask", params, future);
105
106
try {
107
ProcessingResult result = future.get(30, TimeUnit.SECONDS);
108
System.out.println("Task completed: " + result.getStatus());
109
} catch (TimeoutException e) {
110
System.err.println("Task timed out");
111
future.cancel(true);
112
}
113
114
// Chained callback with Future
115
CallFuture<String> future = new CallFuture<>(new Callback<String>() {
116
@Override
117
public void handleResult(String result) {
118
// This callback is invoked in addition to Future completion
119
notifyListeners(result);
120
}
121
122
@Override
123
public void handleError(Throwable error) {
124
logError(error);
125
}
126
});
127
128
requestor.request("getData", request, future);
129
130
// Can still use Future methods
131
if (future.isDone()) {
132
String result = future.getResult(); // Non-blocking if done
133
}
134
```
135
136
### Asynchronous Requestor Operations
137
138
All requestor implementations support asynchronous operations through the callback-based request method.
139
140
```java { .api }
141
// From Requestor base class
142
public abstract class Requestor {
143
// Asynchronous RPC call
144
public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception;
145
146
// Synchronous RPC call (for comparison)
147
public Object request(String messageName, Object request) throws Exception;
148
}
149
```
150
151
#### Usage Examples
152
153
```java
154
// Generic requestor async usage
155
GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
156
GenericData.Record request = createRequest();
157
158
requestor.request("processData", request, new Callback<Object>() {
159
@Override
160
public void handleResult(Object result) {
161
if (result instanceof GenericData.Record) {
162
GenericData.Record record = (GenericData.Record) result;
163
System.out.println("Status: " + record.get("status"));
164
}
165
}
166
167
@Override
168
public void handleError(Throwable error) {
169
System.err.println("Generic RPC failed: " + error.getMessage());
170
}
171
});
172
173
// Specific requestor async usage
174
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
175
176
requestor.request("getUserData", userId, new Callback<UserData>() {
177
@Override
178
public void handleResult(UserData userData) {
179
updateUserInterface(userData);
180
}
181
182
@Override
183
public void handleError(Throwable error) {
184
showErrorMessage("Failed to load user data: " + error.getMessage());
185
}
186
});
187
```
188
189
### Asynchronous Transport Operations
190
191
Transport implementations provide asynchronous communication at the transport layer.
192
193
```java { .api }
194
// From Transceiver base class
195
public abstract class Transceiver {
196
// Asynchronous transport operation
197
public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
198
199
// Synchronous transport operation (for comparison)
200
public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
201
}
202
```
203
204
#### Usage Examples
205
206
```java
207
// Direct transport-level async usage
208
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
209
List<ByteBuffer> requestBuffers = serializeRequest(request);
210
211
transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
212
@Override
213
public void handleResult(List<ByteBuffer> responseBuffers) {
214
Object response = deserializeResponse(responseBuffers);
215
handleResponse(response);
216
}
217
218
@Override
219
public void handleError(Throwable error) {
220
System.err.println("Transport error: " + error.getMessage());
221
}
222
});
223
```
224
225
## Advanced Asynchronous Patterns
226
227
### Concurrent RPC Calls
228
229
Execute multiple RPC calls concurrently and wait for all to complete:
230
231
```java
232
public class ConcurrentRPCExample {
233
public void fetchMultipleDataSources() {
234
List<CallFuture<String>> futures = new ArrayList<>();
235
236
// Start multiple async calls
237
for (String dataSource : dataSources) {
238
CallFuture<String> future = new CallFuture<>();
239
requestor.request("fetchData", dataSource, future);
240
futures.add(future);
241
}
242
243
// Wait for all to complete
244
List<String> results = new ArrayList<>();
245
for (CallFuture<String> future : futures) {
246
try {
247
results.add(future.get(10, TimeUnit.SECONDS));
248
} catch (Exception e) {
249
System.err.println("Failed to fetch data: " + e.getMessage());
250
}
251
}
252
253
processResults(results);
254
}
255
}
256
```
257
258
### Callback Chaining
259
260
Chain multiple asynchronous operations:
261
262
```java
263
public class CallbackChaining {
264
public void processUserWorkflow(long userId) {
265
// Step 1: Get user data
266
requestor.request("getUser", userId, new Callback<UserData>() {
267
@Override
268
public void handleResult(UserData user) {
269
// Step 2: Get user preferences
270
requestor.request("getPreferences", user.getId(), new Callback<Preferences>() {
271
@Override
272
public void handleResult(Preferences prefs) {
273
// Step 3: Customize content
274
requestor.request("customizeContent",
275
new CustomizationRequest(user, prefs),
276
new Callback<Content>() {
277
@Override
278
public void handleResult(Content content) {
279
displayContent(content);
280
}
281
282
@Override
283
public void handleError(Throwable error) {
284
showDefaultContent();
285
}
286
});
287
}
288
289
@Override
290
public void handleError(Throwable error) {
291
// Use default preferences
292
showDefaultContent();
293
}
294
});
295
}
296
297
@Override
298
public void handleError(Throwable error) {
299
showError("Failed to load user: " + error.getMessage());
300
}
301
});
302
}
303
}
304
```
305
306
### Custom Future Implementation
307
308
Create specialized Future implementations for complex scenarios:
309
310
```java
311
public class TimeoutCallFuture<T> extends CallFuture<T> {
312
private final ScheduledExecutorService scheduler;
313
private ScheduledFuture<?> timeoutTask;
314
315
public TimeoutCallFuture(long timeout, TimeUnit unit) {
316
super();
317
this.scheduler = Executors.newScheduledThreadPool(1);
318
this.timeoutTask = scheduler.schedule(() -> {
319
if (!isDone()) {
320
handleError(new TimeoutException("RPC call timed out"));
321
}
322
}, timeout, unit);
323
}
324
325
@Override
326
public void handleResult(T result) {
327
timeoutTask.cancel(false);
328
scheduler.shutdown();
329
super.handleResult(result);
330
}
331
332
@Override
333
public void handleError(Throwable error) {
334
timeoutTask.cancel(false);
335
scheduler.shutdown();
336
super.handleError(error);
337
}
338
}
339
340
// Usage
341
TimeoutCallFuture<String> future = new TimeoutCallFuture<>(5, TimeUnit.SECONDS);
342
requestor.request("slowOperation", request, future);
343
```
344
345
### Error Recovery Patterns
346
347
Implement retry logic with exponential backoff:
348
349
```java
350
public class RetryCallback<T> implements Callback<T> {
351
private final Requestor requestor;
352
private final String messageName;
353
private final Object request;
354
private final Callback<T> finalCallback;
355
private final int maxRetries;
356
private int currentTry = 0;
357
358
public RetryCallback(Requestor requestor, String messageName, Object request,
359
Callback<T> finalCallback, int maxRetries) {
360
this.requestor = requestor;
361
this.messageName = messageName;
362
this.request = request;
363
this.finalCallback = finalCallback;
364
this.maxRetries = maxRetries;
365
}
366
367
@Override
368
public void handleResult(T result) {
369
finalCallback.handleResult(result);
370
}
371
372
@Override
373
public void handleError(Throwable error) {
374
if (currentTry < maxRetries && isRetryable(error)) {
375
currentTry++;
376
long delay = Math.min(1000 * (1L << currentTry), 30000); // Exponential backoff
377
378
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
379
scheduler.schedule(() -> {
380
try {
381
requestor.request(messageName, request, this);
382
} catch (Exception e) {
383
finalCallback.handleError(e);
384
}
385
scheduler.shutdown();
386
}, delay, TimeUnit.MILLISECONDS);
387
} else {
388
finalCallback.handleError(error);
389
}
390
}
391
392
private boolean isRetryable(Throwable error) {
393
return error instanceof IOException ||
394
error instanceof SocketTimeoutException ||
395
error instanceof ConnectException;
396
}
397
}
398
399
// Usage
400
Callback<String> retryCallback = new RetryCallback<>(requestor, "getData", request,
401
new Callback<String>() {
402
@Override
403
public void handleResult(String result) {
404
System.out.println("Success: " + result);
405
}
406
407
@Override
408
public void handleError(Throwable error) {
409
System.err.println("Final failure: " + error.getMessage());
410
}
411
}, 3); // Max 3 retries
412
413
requestor.request("getData", request, retryCallback);
414
```
415
416
## Thread Safety and Concurrency
417
418
### Thread Safety Guarantees
419
420
- `CallFuture` instances are thread-safe for concurrent access
421
- Callback methods are invoked on I/O threads - avoid blocking operations
422
- Multiple concurrent async calls can be made with the same Requestor instance
423
- Transport implementations handle concurrent async operations safely
424
425
### Best Practices
426
427
```java
428
// Good: Non-blocking callback processing
429
requestor.request("getData", request, new Callback<String>() {
430
@Override
431
public void handleResult(String result) {
432
// Quick processing on callback thread
433
resultQueue.offer(result);
434
resultProcessor.signal(); // Wake up processing thread
435
}
436
437
@Override
438
public void handleError(Throwable error) {
439
errorLogger.logAsync(error); // Non-blocking logging
440
}
441
});
442
443
// Bad: Blocking operations in callback
444
requestor.request("getData", request, new Callback<String>() {
445
@Override
446
public void handleResult(String result) {
447
// This blocks the I/O thread!
448
database.saveResult(result); // Blocking database call
449
Thread.sleep(1000); // Very bad!
450
}
451
452
@Override
453
public void handleError(Throwable error) {
454
// Also bad - blocking I/O
455
System.out.println("Error: " + error.getMessage());
456
}
457
});
458
```
459
460
## Performance Considerations
461
462
### Async vs Sync Performance
463
464
- Asynchronous calls provide better throughput for concurrent operations
465
- Reduced thread usage compared to synchronous calls with thread pools
466
- Lower memory overhead per outstanding request
467
- Better resource utilization in high-concurrency scenarios
468
469
### Optimization Tips
470
471
```java
472
// Reuse CallFuture instances when possible
473
private final Queue<CallFuture<String>> futurePool = new ConcurrentLinkedQueue<>();
474
475
public CallFuture<String> getFuture() {
476
CallFuture<String> future = futurePool.poll();
477
return future != null ? future : new CallFuture<>();
478
}
479
480
public void returnFuture(CallFuture<String> future) {
481
if (future.isDone()) {
482
future.reset(); // Hypothetical reset method
483
futurePool.offer(future);
484
}
485
}
486
487
// Use appropriate timeout values
488
CallFuture<String> future = new CallFuture<>();
489
requestor.request("quickOperation", request, future);
490
String result = future.get(100, TimeUnit.MILLISECONDS); // Short timeout for quick ops
491
492
// Batch related operations
493
List<CallFuture<String>> batch = new ArrayList<>();
494
for (String item : items) {
495
CallFuture<String> future = new CallFuture<>();
496
requestor.request("processItem", item, future);
497
batch.add(future);
498
}
499
// Process results in batch
500
```