CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-ipc

Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations

Pending
Overview
Eval results
Files

docs/async.md

Asynchronous Operations

Apache Avro IPC provides comprehensive support for non-blocking RPC operations through callback-based and Future-based patterns, enabling high-performance concurrent applications.

Capabilities

Callback Interface

The fundamental interface for asynchronous operations, providing success and error handling methods.

/**
 * Receiver for the outcome of an asynchronous operation.
 *
 * @param <T> type of the value delivered on success
 */
public interface Callback<T> {
    /**
     * Handle successful result.
     *
     * @param result the value produced by the operation
     */
    void handleResult(T result);
    
    /**
     * Handle error condition.
     *
     * @param error the failure being reported
     */
    void handleError(Throwable error);
}

Usage Examples

// Callback that reports the outcome of the RPC on the console
Callback<String> callback = new Callback<String>() {
    @Override
    public void handleError(Throwable failure) {
        String reason = failure.getMessage();
        System.err.println("RPC failed: " + reason);
        // Error handling goes here
    }

    @Override
    public void handleResult(String payload) {
        System.out.println("RPC completed successfully: " + payload);
        // Result processing goes here; it runs on the callback thread
    }
};

// Issue the call asynchronously; the outcome is delivered to the callback above
requestor.request("getData", requestParams, callback);

// Inline anonymous-class callback; Callback declares two methods, so it cannot itself
// be written as a lambda. Only the forEach body below uses a lambda (Java 8+).
requestor.request("processData", data, new Callback<ProcessingResult>() {
    @Override
    public void handleResult(ProcessingResult result) {
        // Hand every returned item to processItem
        result.getItems().forEach(item -> processItem(item));
    }
    
    @Override
    public void handleError(Throwable error) {
        // Log the failure, then arrange another attempt
        logger.error("Processing failed", error);
        scheduleRetry();
    }
});

Future-Based Operations

CallFuture provides a Future implementation that also acts as a Callback, enabling both blocking and non-blocking usage patterns.

/**
 * Future implementation that also acts as a Callback, so the same object can be
 * handed to an asynchronous request and then waited on or polled.
 * (API summary: method bodies are omitted.)
 *
 * @param <T> type of the RPC result
 */
public class CallFuture<T> implements Future<T>, Callback<T> {
    // Constructors
    public CallFuture();
    // chainedCallback is invoked in addition to the completion of this Future
    public CallFuture(Callback<T> chainedCallback);
    
    // Result access methods
    public T getResult() throws Exception;
    public Throwable getError();
    
    // Blocking wait methods
    public void await() throws InterruptedException;
    public void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
    
    // Future interface methods
    public boolean cancel(boolean mayInterruptIfRunning);
    public boolean isCancelled();
    public boolean isDone();
    public T get() throws InterruptedException, ExecutionException;
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
    // Callback interface methods
    public void handleResult(T result);
    public void handleError(Throwable error);
}

Usage Examples

// Basic Future usage: hand the future to the requestor, then wait on it
CallFuture<String> pending = new CallFuture<>();
requestor.request("getData", request, pending);

try {
    // get() blocks until the result is available
    System.out.println("Result: " + pending.get());
} catch (ExecutionException failure) {
    // The underlying RPC error is carried as the cause
    Throwable cause = failure.getCause();
    System.err.println("RPC failed: " + cause.getMessage());
}

// Future with timeout
CallFuture<ProcessingResult> future = new CallFuture<>();
requestor.request("longRunningTask", params, future);

try {
    // Wait at most 30 seconds for the result
    ProcessingResult result = future.get(30, TimeUnit.SECONDS);
    System.out.println("Task completed: " + result.getStatus());
} catch (TimeoutException e) {
    System.err.println("Task timed out");
    // NOTE(review): confirm that cancel() really aborts the in-flight RPC in the Avro
    // version in use — it may be a no-op that returns false; verify before relying on it
    future.cancel(true);
}

// Chained callback with Future
CallFuture<String> future = new CallFuture<>(new Callback<String>() {
    @Override
    public void handleResult(String result) {
        // This callback is invoked in addition to Future completion
        notifyListeners(result);
    }
    
    @Override
    public void handleError(Throwable error) {
        logError(error);
    }
});

requestor.request("getData", request, future);

// Can still use Future methods
if (future.isDone()) {
    // getResult() is declared "throws Exception", so the enclosing code must handle or declare it
    String result = future.getResult(); // Non-blocking if done
}

Asynchronous Requestor Operations

All requestor implementations support asynchronous operations through the callback-based request method.

// From Requestor base class
/**
 * Client-side entry point for issuing RPC calls.
 * (Excerpt showing the two request variants; method bodies are omitted.)
 */
public abstract class Requestor {
    /**
     * Asynchronous RPC call: the outcome is delivered to {@code callback}.
     *
     * @param messageName name of the message to invoke
     * @param request     request payload
     * @param callback    receiver of the result or error
     * @throws Exception declared by the signature; callers must handle or propagate it
     */
    public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception;
    
    /**
     * Synchronous RPC call (for comparison): the response is the return value.
     *
     * @param messageName name of the message to invoke
     * @param request     request payload
     * @return the response object
     * @throws Exception declared by the signature; callers must handle or propagate it
     */
    public Object request(String messageName, Object request) throws Exception;
}

Usage Examples

// Asynchronous call through a GenericRequestor
GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
GenericData.Record request = createRequest();

requestor.request("processData", request, new Callback<Object>() {
    @Override
    public void handleError(Throwable failure) {
        System.err.println("Generic RPC failed: " + failure.getMessage());
    }

    @Override
    public void handleResult(Object response) {
        // Anything that is not a record is ignored
        if (!(response instanceof GenericData.Record)) {
            return;
        }
        Object status = ((GenericData.Record) response).get("status");
        System.out.println("Status: " + status);
    }
});

// Asynchronous call through a SpecificRequestor bound to the MyService interface
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);

requestor.request("getUserData", userId, new Callback<UserData>() {
    @Override
    public void handleError(Throwable failure) {
        String message = "Failed to load user data: " + failure.getMessage();
        showErrorMessage(message);
    }

    @Override
    public void handleResult(UserData loaded) {
        // Hand the loaded data to the UI update routine
        updateUserInterface(loaded);
    }
});

Asynchronous Transport Operations

Transport implementations provide asynchronous communication at the transport layer.

// From Transceiver base class
/**
 * Transport abstraction that exchanges request and response buffers.
 * (Excerpt showing the two transceive variants; method bodies are omitted.)
 */
public abstract class Transceiver {
    /**
     * Asynchronous transport operation: the response buffers are delivered to {@code callback}.
     *
     * @param request  buffers making up the request
     * @param callback receiver of the response buffers or of the error
     * @throws IOException declared by the signature
     */
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
    
    /**
     * Synchronous transport operation (for comparison): the response buffers are the return value.
     *
     * @param request buffers making up the request
     * @return buffers making up the response
     * @throws IOException declared by the signature
     */
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
}

Usage Examples

// Using the asynchronous API directly at the transport layer
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
List<ByteBuffer> requestBuffers = serializeRequest(request);

transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
    @Override
    public void handleError(Throwable failure) {
        String reason = failure.getMessage();
        System.err.println("Transport error: " + reason);
    }

    @Override
    public void handleResult(List<ByteBuffer> rawResponse) {
        // Decode the raw buffers, then pass the decoded object on
        handleResponse(deserializeResponse(rawResponse));
    }
});

Advanced Asynchronous Patterns

Concurrent RPC Calls

Execute multiple RPC calls concurrently and wait for all to complete:

/**
 * Fans out one asynchronous "fetchData" call per data source and gathers the
 * results that arrive in time.
 */
public class ConcurrentRPCExample {
    /**
     * Starts a "fetchData" RPC for every entry in dataSources, waits up to
     * 10 seconds for each started call, and passes the successful results to
     * processResults. Calls that cannot be started, fail, or time out are
     * reported on stderr and skipped.
     */
    public void fetchMultipleDataSources() {
        List<CallFuture<String>> futures = new ArrayList<>();

        // Start multiple async calls
        for (String dataSource : dataSources) {
            CallFuture<String> future = new CallFuture<>();
            try {
                requestor.request("fetchData", dataSource, future);
                futures.add(future);
            } catch (Exception e) {
                // request() declares a checked exception; one call that cannot be
                // started must not prevent the remaining calls from being issued
                System.err.println("Failed to fetch data: " + e.getMessage());
            }
        }

        // Wait for all to complete
        List<String> results = new ArrayList<>();
        for (CallFuture<String> future : futures) {
            try {
                results.add(future.get(10, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting: every further get()
                // would fail immediately while the thread is interrupted
                Thread.currentThread().interrupt();
                System.err.println("Failed to fetch data: " + e.getMessage());
                break;
            } catch (ExecutionException | TimeoutException e) {
                System.err.println("Failed to fetch data: " + e.getMessage());
            }
        }

        // Best effort: process whatever was collected
        processResults(results);
    }
}

Callback Chaining

Chain multiple asynchronous operations:

/**
 * Chains three asynchronous RPCs: load the user, load the user's preferences,
 * then request customized content. Each step starts the next one from its
 * success handler.
 */
public class CallbackChaining {
    /**
     * Runs the workflow for one user.
     *
     * @param userId identifier passed to the "getUser" call
     */
    public void processUserWorkflow(long userId) {
        // Step 1: Get user data
        try {
            requestor.request("getUser", userId, new Callback<UserData>() {
                @Override
                public void handleResult(UserData user) {
                    loadPreferences(user);
                }

                @Override
                public void handleError(Throwable error) {
                    showError("Failed to load user: " + error.getMessage());
                }
            });
        } catch (Exception e) {
            // request() declares a checked exception; treat a call that cannot
            // be started like a failed call
            showError("Failed to load user: " + e.getMessage());
        }
    }

    // Step 2: fetch the preferences of the loaded user, then continue with step 3.
    private void loadPreferences(UserData user) {
        try {
            requestor.request("getPreferences", user.getId(), new Callback<Preferences>() {
                @Override
                public void handleResult(Preferences prefs) {
                    customizeContent(user, prefs);
                }

                @Override
                public void handleError(Throwable error) {
                    // Preferences unavailable: fall back to the default content
                    showDefaultContent();
                }
            });
        } catch (Exception e) {
            showDefaultContent();
        }
    }

    // Step 3: request content customized for the user and preferences, then display it.
    private void customizeContent(UserData user, Preferences prefs) {
        try {
            requestor.request("customizeContent",
                new CustomizationRequest(user, prefs),
                new Callback<Content>() {
                    @Override
                    public void handleResult(Content content) {
                        displayContent(content);
                    }

                    @Override
                    public void handleError(Throwable error) {
                        showDefaultContent();
                    }
                });
        } catch (Exception e) {
            showDefaultContent();
        }
    }
}

Custom Future Implementation

Create specialized Future implementations for complex scenarios:

/**
 * CallFuture that completes itself with a TimeoutException when no result or
 * error has been delivered within the given time.
 *
 * <p>Only the first outcome reported to this object (RPC result, RPC error or
 * timeout) is forwarded to CallFuture; later reports are ignored.
 *
 * @param <T> type of the RPC result
 */
public class TimeoutCallFuture<T> extends CallFuture<T> {
    // Shared by all instances so that a call does not pay for its own thread pool.
    private static final ScheduledExecutorService TIMEOUT_SCHEDULER = createTimeoutScheduler();

    // Written by the constructing thread, read by whichever thread reports the outcome.
    private volatile ScheduledFuture<?> timeoutTask;
    // Only accessed inside markCompleted(); true once the first outcome was accepted.
    private boolean completed;

    /**
     * Creates the future and starts its timeout clock immediately.
     *
     * @param timeout how long to wait for an outcome before failing with a TimeoutException
     * @param unit    unit of {@code timeout}
     */
    public TimeoutCallFuture(long timeout, TimeUnit unit) {
        super();
        this.timeoutTask = TIMEOUT_SCHEDULER.schedule(
            () -> handleError(new TimeoutException("RPC call timed out")),
            timeout, unit);
    }

    @Override
    public void handleResult(T result) {
        if (markCompleted()) {
            cancelTimeout();
            super.handleResult(result);
        }
    }

    @Override
    public void handleError(Throwable error) {
        if (markCompleted()) {
            cancelTimeout();
            super.handleError(error);
        }
    }

    // Returns true for exactly one caller, so a result and a timeout can never both be delivered.
    private synchronized boolean markCompleted() {
        if (completed) {
            return false;
        }
        completed = true;
        return true;
    }

    // Cancels the pending timeout, if any.
    private void cancelTimeout() {
        // Null only when a very short timeout fired before the constructor stored
        // the handle; the task is then already running and needs no cancelling.
        ScheduledFuture<?> task = timeoutTask;
        if (task != null) {
            task.cancel(false);
        }
    }

    // Builds the single daemon timer thread used for all timeouts.
    private static ScheduledExecutorService createTimeoutScheduler() {
        java.util.concurrent.ScheduledThreadPoolExecutor executor =
            new java.util.concurrent.ScheduledThreadPoolExecutor(1, runnable -> {
                Thread thread = new Thread(runnable, "rpc-timeout");
                thread.setDaemon(true); // must not keep the JVM alive
                return thread;
            });
        // Drop cancelled timeouts right away instead of retaining them until they expire.
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }
}

// Usage: the 5-second timeout is scheduled when the future is constructed, before the request is issued
TimeoutCallFuture<String> future = new TimeoutCallFuture<>(5, TimeUnit.SECONDS);
requestor.request("slowOperation", request, future);

Error Recovery Patterns

Implement retry logic with exponential backoff:

/**
 * Callback wrapper that re-issues a failed request with exponential backoff
 * before reporting the failure to the wrapped callback.
 *
 * <p>Results are forwarded unchanged. An error triggers a retry when it is
 * retryable and fewer than {@code maxRetries} retries have been made;
 * otherwise it is forwarded to the wrapped callback.
 *
 * @param <T> type of the RPC result
 */
public class RetryCallback<T> implements Callback<T> {
    private static final long BASE_DELAY_MS = 1000;
    private static final long MAX_DELAY_MS = 30000;
    // BASE_DELAY_MS << 5 already exceeds MAX_DELAY_MS; capping the shift keeps the
    // delay from overflowing when maxRetries is large.
    private static final int MAX_BACKOFF_SHIFT = 5;

    private final Requestor requestor;
    private final String messageName;
    private final Object request;
    private final Callback<T> finalCallback;
    private final int maxRetries;
    private int currentTry = 0;

    /**
     * @param requestor     requestor used to re-issue the call
     * @param messageName   name of the message to invoke on each retry
     * @param request       request payload sent on each retry
     * @param finalCallback receiver of the final result or the final error
     * @param maxRetries    maximum number of retries after the initial attempt
     */
    public RetryCallback(Requestor requestor, String messageName, Object request,
                        Callback<T> finalCallback, int maxRetries) {
        this.requestor = requestor;
        this.messageName = messageName;
        this.request = request;
        this.finalCallback = finalCallback;
        this.maxRetries = maxRetries;
    }

    @Override
    public void handleResult(T result) {
        finalCallback.handleResult(result);
    }

    @Override
    public void handleError(Throwable error) {
        if (currentTry >= maxRetries || !isRetryable(error)) {
            finalCallback.handleError(error);
            return;
        }

        currentTry++;
        // Exponential backoff: 2s, 4s, 8s, 16s, then capped at 30s
        long delay = Math.min(BASE_DELAY_MS << Math.min(currentTry, MAX_BACKOFF_SHIFT), MAX_DELAY_MS);

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            scheduler.schedule(() -> {
                try {
                    requestor.request(messageName, request, this);
                } catch (Exception e) {
                    finalCallback.handleError(e);
                }
            }, delay, TimeUnit.MILLISECONDS);
        } finally {
            // Already-scheduled delayed tasks still run after shutdown() by default, so
            // the retry executes and the thread is released even if the task throws.
            scheduler.shutdown();
        }
    }

    // SocketTimeoutException and ConnectException are IOException subclasses,
    // so this single check covers them as well.
    private boolean isRetryable(Throwable error) {
        return error instanceof IOException;
    }
}

// Usage: wrap the reporting callback so that failures are retried first
Callback<String> reporter = new Callback<String>() {
    @Override
    public void handleError(Throwable failure) {
        System.err.println("Final failure: " + failure.getMessage());
    }

    @Override
    public void handleResult(String value) {
        System.out.println("Success: " + value);
    }
};
int maxRetries = 3; // Max 3 retries
Callback<String> retryCallback =
    new RetryCallback<>(requestor, "getData", request, reporter, maxRetries);

requestor.request("getData", request, retryCallback);

Thread Safety and Concurrency

Thread Safety Guarantees

  • CallFuture instances are thread-safe for concurrent access
  • Callback methods are invoked on I/O threads - avoid blocking operations
  • Multiple concurrent async calls can be made with the same Requestor instance
  • Transport implementations handle concurrent async operations safely

Best Practices

// Good: Non-blocking callback processing
requestor.request("getData", request, new Callback<String>() {
    @Override
    public void handleResult(String result) {
        // Quick processing on callback thread: enqueue the value and return;
        // the actual work happens on the processing thread
        resultQueue.offer(result);
        resultProcessor.signal(); // Wake up processing thread
    }
    
    @Override
    public void handleError(Throwable error) {
        errorLogger.logAsync(error); // Non-blocking logging
    }
});

// Bad: Blocking operations in callback (anti-pattern shown for illustration — do not copy)
requestor.request("getData", request, new Callback<String>() {
    @Override
    public void handleResult(String result) {
        // This blocks the I/O thread!
        database.saveResult(result); // Blocking database call
        // NOTE: Thread.sleep also throws the checked InterruptedException, which
        // handleResult does not declare, so this line would not compile as written
        Thread.sleep(1000); // Very bad!
    }
    
    @Override
    public void handleError(Throwable error) {
        // Also bad - blocking I/O
        System.out.println("Error: " + error.getMessage());
    }
});

Performance Considerations

Async vs Sync Performance

  • Asynchronous calls provide better throughput for concurrent operations
  • Reduced thread usage compared to synchronous calls with thread pools
  • Lower memory overhead per outstanding request
  • Better resource utilization in high-concurrency scenarios

Optimization Tips

// Reuse CallFuture instances when possible
// NOTE(review): this pooling sketch depends on reset(), which is not among the
// CallFuture methods listed in this document. Without such a method a completed
// future cannot be returned to the pool — confirm before adopting this pattern.
private final Queue<CallFuture<String>> futurePool = new ConcurrentLinkedQueue<>();

// Takes a pooled future if one is available, otherwise creates a new one
public CallFuture<String> getFuture() {
    CallFuture<String> future = futurePool.poll();
    return future != null ? future : new CallFuture<>();
}

// Puts a completed future back into the pool; futures that are not done are not pooled
public void returnFuture(CallFuture<String> future) {
    if (future.isDone()) {
        future.reset(); // Hypothetical reset method
        futurePool.offer(future);
    }
}

// Use appropriate timeout values
CallFuture<String> future = new CallFuture<>();
requestor.request("quickOperation", request, future);
// get(timeout, unit) is declared to throw InterruptedException, ExecutionException
// and TimeoutException; the enclosing code must handle or declare them
String result = future.get(100, TimeUnit.MILLISECONDS); // Short timeout for quick ops

// Batch related operations: start one call per item and keep every future
List<CallFuture<String>> batch = new ArrayList<>();
for (String current : items) {
    CallFuture<String> pending = new CallFuture<>();
    // Issue the call first; the future joins the batch only once the call was started
    requestor.request("processItem", current, pending);
    batch.add(pending);
}
// The collected futures can now be processed together

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-ipc

docs

async.md

core-rpc.md

index.md

plugins.md

protocols.md

stats.md

transports.md

tile.json