Avro inter-process communication (IPC) components providing an RPC framework with multiple transport mechanisms and protocol implementations.
—
Apache Avro IPC provides comprehensive support for non-blocking RPC operations through callback-based and Future-based patterns, enabling high-performance concurrent applications.
`Callback` is the fundamental interface for asynchronous operations; it provides one method for handling a successful result and one for handling an error.
/**
 * Receives the outcome of an asynchronous operation: either a result or an error.
 *
 * @param <T> type of the value passed to {@link #handleResult(Object)}
 */
public interface Callback<T> {
    /**
     * Handles a successful result.
     *
     * @param result the value produced by the operation
     */
    void handleResult(T result);

    /**
     * Handles an error condition.
     *
     * @param error the failure being reported
     */
    void handleError(Throwable error);
}
// Simple callback implementation
Callback<String> callback = new Callback<String>() {
@Override
public void handleResult(String result) {
System.out.println("RPC completed successfully: " + result);
// Process result on callback thread
}
@Override
public void handleError(Throwable error) {
System.err.println("RPC failed: " + error.getMessage());
// Handle error condition
}
};
// Make asynchronous RPC call
requestor.request("getData", requestParams, callback);
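// Anonymous-class callback whose result handler uses a lambda (Java 8+).
// Callback declares two methods, so the callback itself cannot be written as a lambda.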
requestor.request("processData", data, new Callback<ProcessingResult>() {
@Override
public void handleResult(ProcessingResult result) {
result.getItems().forEach(item -> processItem(item));
}
@Override
public void handleError(Throwable error) {
logger.error("Processing failed", error);
scheduleRetry();
}
});

CallFuture provides a Future implementation that also acts as a Callback, enabling both blocking and non-blocking usage patterns.
/**
 * API summary of CallFuture (signatures only; method bodies are omitted).
 * The class implements both Future and Callback, so one instance can be passed
 * to an asynchronous request as its callback and then waited on or queried.
 *
 * @param <T> type of the call result
 */
public class CallFuture<T> implements Future<T>, Callback<T> {
    // Constructors
    public CallFuture();
    // The chained callback is invoked in addition to completing this future.
    public CallFuture(Callback<T> chainedCallback);

    // Result access methods
    // NOTE(review): presumably rethrows the recorded error if the call failed - confirm against the Avro Javadoc.
    public T getResult() throws Exception;
    // NOTE(review): presumably null when no error has been recorded - confirm.
    public Throwable getError();

    // Blocking wait methods
    public void await() throws InterruptedException;
    public void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;

    // Future interface methods
    // NOTE(review): whether an in-flight RPC can actually be cancelled is not shown here - confirm before relying on cancel().
    public boolean cancel(boolean mayInterruptIfRunning);
    public boolean isCancelled();
    public boolean isDone();
    public T get() throws InterruptedException, ExecutionException;
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    // Callback interface methods
    public void handleResult(T result);
    public void handleError(Throwable error);
}
// Basic Future usage
CallFuture<String> future = new CallFuture<>();
requestor.request("getData", request, future);
try {
// Block until result is available
String result = future.get();
System.out.println("Result: " + result);
} catch (ExecutionException e) {
System.err.println("RPC failed: " + e.getCause().getMessage());
}
// Future with timeout
CallFuture<ProcessingResult> future = new CallFuture<>();
requestor.request("longRunningTask", params, future);
try {
ProcessingResult result = future.get(30, TimeUnit.SECONDS);
System.out.println("Task completed: " + result.getStatus());
} catch (TimeoutException e) {
System.err.println("Task timed out");
future.cancel(true);
}
// Chained callback with Future
CallFuture<String> future = new CallFuture<>(new Callback<String>() {
@Override
public void handleResult(String result) {
// This callback is invoked in addition to Future completion
notifyListeners(result);
}
@Override
public void handleError(Throwable error) {
logError(error);
}
});
requestor.request("getData", request, future);
// Can still use Future methods
if (future.isDone()) {
String result = future.getResult(); // Non-blocking if done
}

All requestor implementations support asynchronous operations through the callback-based request method.
// From Requestor base class
/**
 * API summary of the Requestor base class (signatures only; method bodies are omitted).
 */
public abstract class Requestor {
    /**
     * Asynchronous RPC call: the outcome is handed to {@code callback} rather than returned.
     * Declares a checked exception, so callers must catch or propagate it.
     */
    public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception;

    /**
     * Synchronous RPC call (for comparison): the response is the return value.
     */
    public Object request(String messageName, Object request) throws Exception;
}
// Generic requestor async usage
GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
GenericData.Record request = createRequest();
requestor.request("processData", request, new Callback<Object>() {
@Override
public void handleResult(Object result) {
if (result instanceof GenericData.Record) {
GenericData.Record record = (GenericData.Record) result;
System.out.println("Status: " + record.get("status"));
}
}
@Override
public void handleError(Throwable error) {
System.err.println("Generic RPC failed: " + error.getMessage());
}
});
// Specific requestor async usage
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
requestor.request("getUserData", userId, new Callback<UserData>() {
@Override
public void handleResult(UserData userData) {
updateUserInterface(userData);
}
@Override
public void handleError(Throwable error) {
showErrorMessage("Failed to load user data: " + error.getMessage());
}
});

Transport implementations provide asynchronous communication at the transport layer.
// From Transceiver base class
/**
 * API summary of the Transceiver base class (signatures only; method bodies are omitted).
 */
public abstract class Transceiver {
    /**
     * Asynchronous transport operation: the response buffers are handed to {@code callback}.
     */
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;

    /**
     * Synchronous transport operation (for comparison): the response buffers are returned.
     */
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
}
// Direct transport-level async usage
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
List<ByteBuffer> requestBuffers = serializeRequest(request);
transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
@Override
public void handleResult(List<ByteBuffer> responseBuffers) {
Object response = deserializeResponse(responseBuffers);
handleResponse(response);
}
@Override
public void handleError(Throwable error) {
System.err.println("Transport error: " + error.getMessage());
}
});

Execute multiple RPC calls concurrently and wait for all to complete:
public class ConcurrentRPCExample {
/**
 * Starts one "fetchData" call per data source, waits up to 10 seconds for each
 * call, and passes every result that arrived to {@code processResults}.
 *
 * <p>A call that cannot be started, fails, or times out is reported on
 * {@code System.err} and contributes no result. If the waiting thread is
 * interrupted, the interrupt flag is restored, waiting stops, and the results
 * collected so far are processed.
 */
public void fetchMultipleDataSources() {
    List<CallFuture<String>> futures = new ArrayList<>();
    // Start every call before waiting on any of them so they run concurrently.
    for (String dataSource : dataSources) {
        CallFuture<String> future = new CallFuture<>();
        try {
            // request(...) declares a checked exception; it must be handled here
            // because this method does not declare one.
            requestor.request("fetchData", dataSource, future);
            futures.add(future);
        } catch (Exception e) {
            System.err.println("Failed to fetch data: " + e.getMessage());
        }
    }
    // Wait for all to complete
    List<String> results = new ArrayList<>();
    for (CallFuture<String> future : futures) {
        try {
            results.add(future.get(10, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for data");
            break;
        } catch (Exception e) {
            System.err.println("Failed to fetch data: " + e.getMessage());
        }
    }
    processResults(results);
}
}

Chain multiple asynchronous operations:
public class CallbackChaining {
/**
 * Runs the three-step workflow: load the user, load that user's preferences,
 * then request customized content and display it.
 *
 * <p>Each step is started from the previous step's result handler. A failure in
 * step 1 is reported through {@code showError}; a failure in step 2 or 3 falls
 * back to {@code showDefaultContent}. The same fallback applies when a request
 * cannot even be started, because {@code request(...)} declares a checked
 * exception that a callback method cannot propagate.
 *
 * @param userId identifier of the user to load
 */
public void processUserWorkflow(long userId) {
    // Step 1: Get user data
    try {
        requestor.request("getUser", userId, new Callback<UserData>() {
            @Override
            public void handleResult(UserData user) {
                loadPreferences(user);
            }

            @Override
            public void handleError(Throwable error) {
                showError("Failed to load user: " + error.getMessage());
            }
        });
    } catch (Exception e) {
        showError("Failed to load user: " + e.getMessage());
    }
}

/** Step 2: fetches the preferences of {@code user}, then moves on to step 3. */
private void loadPreferences(UserData user) {
    try {
        requestor.request("getPreferences", user.getId(), new Callback<Preferences>() {
            @Override
            public void handleResult(Preferences prefs) {
                customizeContent(user, prefs);
            }

            @Override
            public void handleError(Throwable error) {
                // Preferences unavailable: fall back to default content
                showDefaultContent();
            }
        });
    } catch (Exception e) {
        showDefaultContent();
    }
}

/** Step 3: requests content customized for {@code user} and {@code prefs} and displays it. */
private void customizeContent(UserData user, Preferences prefs) {
    try {
        requestor.request("customizeContent",
            new CustomizationRequest(user, prefs),
            new Callback<Content>() {
                @Override
                public void handleResult(Content content) {
                    displayContent(content);
                }

                @Override
                public void handleError(Throwable error) {
                    showDefaultContent();
                }
            });
    } catch (Exception e) {
        showDefaultContent();
    }
}
}

Create specialized Future implementations for complex scenarios:
/**
 * A CallFuture that fails itself with a {@link TimeoutException} if neither a
 * result nor an error has been delivered within the configured time.
 *
 * <p>Exactly one completion is forwarded to the superclass: whichever of the
 * result, the error, or the timeout arrives first wins, and later ones are
 * ignored. Each instance owns a one-thread scheduler that is shut down as soon
 * as the future completes.
 *
 * @param <T> type of the call result
 */
public class TimeoutCallFuture<T> extends CallFuture<T> {
    private final ScheduledExecutorService scheduler;
    // Guards against a result and the timeout both completing the future.
    private final java.util.concurrent.atomic.AtomicBoolean completed =
        new java.util.concurrent.atomic.AtomicBoolean();
    // Volatile and null-checked: the timer task may fire before the constructor
    // has stored the handle returned by schedule().
    private volatile ScheduledFuture<?> timeoutTask;

    /**
     * Creates the future and starts its timeout clock immediately.
     *
     * @param timeout how long to wait before failing the call
     * @param unit    unit of {@code timeout}
     */
    public TimeoutCallFuture(long timeout, TimeUnit unit) {
        super();
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.timeoutTask = scheduler.schedule(
            () -> handleError(new TimeoutException("RPC call timed out")),
            timeout, unit);
    }

    @Override
    public void handleResult(T result) {
        if (completed.compareAndSet(false, true)) {
            stopTimer();
            super.handleResult(result);
        }
    }

    @Override
    public void handleError(Throwable error) {
        if (completed.compareAndSet(false, true)) {
            stopTimer();
            super.handleError(error);
        }
    }

    /** Cancels the pending timeout (if its handle is already known) and releases the timer thread. */
    private void stopTimer() {
        ScheduledFuture<?> task = timeoutTask;
        if (task != null) {
            task.cancel(false);
        }
        scheduler.shutdown();
    }
}
// Usage
TimeoutCallFuture<String> future = new TimeoutCallFuture<>(5, TimeUnit.SECONDS);
requestor.request("slowOperation", request, future);

Implement retry logic with exponential backoff:
/**
 * Callback decorator that re-issues a failed request with exponential backoff
 * before reporting the failure to the wrapped callback.
 *
 * <p>Only I/O failures are retried. The delay before retry {@code n} (counting
 * from 1) is {@code 1000 * 2^n} milliseconds, capped at 30000.
 *
 * @param <T> type of the call result
 */
public class RetryCallback<T> implements Callback<T> {
    private static final long BASE_DELAY_MILLIS = 1000;
    private static final long MAX_DELAY_MILLIS = 30000;
    // 1000 << 5 = 32000 already exceeds the cap, so larger shifts are never needed.
    private static final int MAX_SHIFT = 5;

    private final Requestor requestor;
    private final String messageName;
    private final Object request;
    private final Callback<T> finalCallback;
    private final int maxRetries;
    private int currentTry = 0;

    /**
     * @param requestor     requestor used to re-issue the call
     * @param messageName   name of the message to send on each retry
     * @param request       request payload to send on each retry
     * @param finalCallback receives the result, or the error once retries are exhausted
     * @param maxRetries    maximum number of retries after the initial attempt
     */
    public RetryCallback(Requestor requestor, String messageName, Object request,
                         Callback<T> finalCallback, int maxRetries) {
        this.requestor = requestor;
        this.messageName = messageName;
        this.request = request;
        this.finalCallback = finalCallback;
        this.maxRetries = maxRetries;
    }

    @Override
    public void handleResult(T result) {
        finalCallback.handleResult(result);
    }

    @Override
    public void handleError(Throwable error) {
        if (currentTry >= maxRetries || !isRetryable(error)) {
            finalCallback.handleError(error);
            return;
        }
        currentTry++;
        long delay = backoffDelayMillis(currentTry);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> {
            try {
                requestor.request(messageName, request, this);
            } catch (Exception e) {
                finalCallback.handleError(e);
            } finally {
                // Always release the one-shot timer thread, whatever the retry did.
                scheduler.shutdown();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the capped exponential delay for the given retry number.
     * The shift is clamped so that a large retry count cannot overflow or wrap
     * into a negative or tiny delay.
     */
    private static long backoffDelayMillis(int attempt) {
        int shift = Math.max(0, Math.min(attempt, MAX_SHIFT));
        return Math.min(BASE_DELAY_MILLIS << shift, MAX_DELAY_MILLIS);
    }

    private boolean isRetryable(Throwable error) {
        // SocketTimeoutException and ConnectException are IOException subclasses,
        // so this single check covers them as well.
        return error instanceof IOException;
    }
}
// Usage
Callback<String> retryCallback = new RetryCallback<>(requestor, "getData", request,
new Callback<String>() {
@Override
public void handleResult(String result) {
System.out.println("Success: " + result);
}
@Override
public void handleError(Throwable error) {
System.err.println("Final failure: " + error.getMessage());
}
}, 3); // Max 3 retries
requestor.request("getData", request, retryCallback);

CallFuture instances are thread-safe for concurrent access.

// Good: Non-blocking callback processing
requestor.request("getData", request, new Callback<String>() {
@Override
public void handleResult(String result) {
// Quick processing on callback thread
resultQueue.offer(result);
resultProcessor.signal(); // Wake up processing thread
}
@Override
public void handleError(Throwable error) {
errorLogger.logAsync(error); // Non-blocking logging
}
});
// Bad: Blocking operations in callback
requestor.request("getData", request, new Callback<String>() {
@Override
public void handleResult(String result) {
// This blocks the I/O thread!
database.saveResult(result); // Blocking database call
Thread.sleep(1000); // Very bad!
}
@Override
public void handleError(Throwable error) {
// Also bad - blocking I/O
System.out.println("Error: " + error.getMessage());
}
});

// Reuse CallFuture instances when possible
private final Queue<CallFuture<String>> futurePool = new ConcurrentLinkedQueue<>();
public CallFuture<String> getFuture() {
CallFuture<String> future = futurePool.poll();
return future != null ? future : new CallFuture<>();
}
public void returnFuture(CallFuture<String> future) {
if (future.isDone()) {
future.reset(); // Hypothetical reset method
futurePool.offer(future);
}
}
// Use appropriate timeout values
CallFuture<String> future = new CallFuture<>();
requestor.request("quickOperation", request, future);
String result = future.get(100, TimeUnit.MILLISECONDS); // Short timeout for quick ops
// Batch related operations
List<CallFuture<String>> batch = new ArrayList<>();
for (String item : items) {
CallFuture<String> future = new CallFuture<>();
requestor.request("processItem", item, future);
batch.add(future);
}
// Process results in batch

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc