Apache Thrift Java Library - A lightweight, language-independent software stack for point-to-point RPC implementation providing clean abstractions and implementations for data transport, data serialization, and application level processing
—
Non-blocking client and server implementations for high-performance applications. The async framework enables handling many concurrent operations without blocking threads, making it ideal for high-throughput scenarios.
Base classes and interfaces for asynchronous client implementations.
/**
* Abstract base class for asynchronous client implementations
*/
public abstract class TAsyncClient {
/** Create async client with protocol factory, manager, and transport */
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport);
/** Create async client with timeout */
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout);
/** Get the protocol factory used by this client */
public TProtocolFactory getProtocolFactory();
/** Get the current timeout value in milliseconds */
public long getTimeout();
/** Set timeout for operations in milliseconds */
public void setTimeout(long timeout);
/** Check if client has an error */
public boolean hasError();
/** Get the current error (if any) */
public Exception getError();
/** Check if last operation timed out */
public boolean hasTimeout();
/** Called when async operation completes successfully */
protected void onComplete();
/** Called when async operation encounters an error */
protected void onError(Exception exception);
}Manager for handling asynchronous client operations and selector threads.
/**
* Manages asynchronous client connections and operations
*/
public class TAsyncClientManager {
/** Create client manager with default settings */
public TAsyncClientManager() throws IOException;
/** Create client manager with custom configuration */
public TAsyncClientManager(int selectThreadCount, int selectorThreadPoolSize, long timeoutCheckInterval) throws IOException;
/** Execute an asynchronous method call */
public void call(TAsyncMethodCall method) throws TException;
/** Stop the client manager and all selector threads */
public void stop();
/** Check if client manager is stopped */
public boolean isStopped();
}Classes representing asynchronous method invocations.
/**
* Abstract base class representing an asynchronous method call
*/
public abstract class TAsyncMethodCall<T> {
/** Start the async method call */
public void start(Selector sel) throws IOException;
/** Check if the method call has finished */
public boolean isFinished();
/** Get the result of the method call (blocks until complete) */
public T getResult() throws Exception;
/** Check if method call has an error */
public boolean hasError();
/** Get the error (if any) */
public Exception getError();
/** Check if method call timed out */
public boolean hasTimeout();
/** Get the client that made this call */
public TAsyncClient getClient();
/** Get the transport for this call */
protected TNonblockingTransport getTransport();
/** Called when operation completes successfully */
protected abstract void onComplete();
/** Called when operation encounters an error */
protected abstract void onError(Exception exception);
/** Clean up resources and fire callback */
protected void cleanUpAndFireCallback(SelectionKey key);
}Callback interfaces for handling asynchronous operation results.
/**
* Callback interface for asynchronous method calls
* @param <T> The type of the result
*/
public interface AsyncMethodCallback<T> {
/** Called when the asynchronous operation completes successfully */
public void onComplete(T response);
/** Called when the asynchronous operation encounters an error */
public void onError(Exception exception);
}Usage Examples:
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.protocol.TBinaryProtocol;
// Create async client manager
TAsyncClientManager clientManager = new TAsyncClientManager();
// Create non-blocking transport
TNonblockingSocket transport = new TNonblockingSocket("localhost", 9090);
// Create async client
MyService.AsyncClient asyncClient = new MyService.AsyncClient(
new TBinaryProtocol.Factory(),
clientManager,
transport
);
// Define callback for async method
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
public void onComplete(String response) {
System.out.println("Received response: " + response);
}
public void onError(Exception exception) {
System.err.println("Error occurred: " + exception.getMessage());
}
};
// Make async method call
asyncClient.myAsyncMethod("parameter", callback);
// Continue with other work while call executes...
// Cleanup when done
clientManager.stop();Base classes for asynchronous server-side processing.
/**
* Interface for asynchronous request processing
*/
public interface TAsyncProcessor {
/** Process an asynchronous request */
public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;
}
/**
* Base implementation for asynchronous processors
*/
public class TBaseAsyncProcessor implements TAsyncProcessor {
/** Create base async processor */
public TBaseAsyncProcessor();
/** Process async request using registered process functions */
public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;
/** Register an async process function for a method name */
protected void registerProcessor(String methodName, AsyncProcessFunction<?, ?, ?> fn);
/** Get registered process function by name */
protected AsyncProcessFunction<?, ?, ?> getProcessFunction(String methodName);
}Base classes for implementing asynchronous service methods.
/**
* Abstract base class for asynchronous processing functions
* @param <I> The service interface type
* @param <T> The arguments type
* @param <R> The result type
*/
public abstract class AsyncProcessFunction<I, T extends TBase, R> {
/** Create async process function with method name */
public AsyncProcessFunction(String methodName);
/** Get the method name */
public String getMethodName();
/** Start processing the async request */
public abstract void start(I iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback resultHandler) throws TException;
/** Check if method is oneway (no response expected) */
public abstract boolean isOneway();
/** Create and read the arguments from protocol */
protected abstract T getEmptyArgsInstance();
/** Create and read the result from protocol */
protected abstract TBase getEmptyResultInstance();
}Usage Examples for Async Server:
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.AsyncProcessFunction;
import org.apache.thrift.protocol.TProtocol;
// Example async service implementation
public class MyAsyncServiceHandler implements MyService.AsyncIface {
public void myAsyncMethod(String param, AsyncMethodCallback<String> callback) {
// Simulate async processing (e.g., database call, external service)
CompletableFuture.supplyAsync(() -> {
try {
// Do async work...
Thread.sleep(100); // Simulate work
return "Processed: " + param;
} catch (Exception e) {
throw new RuntimeException(e);
}
}).whenComplete((result, exception) -> {
if (exception != null) {
callback.onError(exception);
} else {
callback.onComplete(result);
}
});
}
}
// Custom async process function
public class MyAsyncProcessFunction extends AsyncProcessFunction<MyService.AsyncIface, MyMethod_args, String> {
public MyAsyncProcessFunction() {
super("myAsyncMethod");
}
public void start(MyService.AsyncIface iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback<String> resultHandler) throws TException {
MyMethod_args args = new MyMethod_args();
args.read(iprot);
iprot.readMessageEnd();
// Create callback that writes response
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
public void onComplete(String response) {
try {
MyMethod_result result = new MyMethod_result();
result.success = response;
oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.REPLY, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
resultHandler.onComplete(response);
} catch (Exception e) {
onError(e);
}
}
public void onError(Exception exception) {
try {
TApplicationException appEx = new TApplicationException(TApplicationException.INTERNAL_ERROR, exception.getMessage());
oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.EXCEPTION, seqid));
appEx.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
} catch (Exception e) {
// Log error
}
resultHandler.onError(exception);
}
};
// Call async method
iface.myAsyncMethod(args.param, callback);
}
public boolean isOneway() {
return false;
}
protected MyMethod_args getEmptyArgsInstance() {
return new MyMethod_args();
}
protected MyMethod_result getEmptyResultInstance() {
return new MyMethod_result();
}
}import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.protocol.TBinaryProtocol;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class AsyncClientPool {
private final TAsyncClientManager clientManager;
private final String host;
private final int port;
public AsyncClientPool(String host, int port) throws IOException {
this.clientManager = new TAsyncClientManager();
this.host = host;
this.port = port;
}
public void executeMultipleAsync(int numCalls) throws Exception {
CountDownLatch latch = new CountDownLatch(numCalls);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger errorCount = new AtomicInteger(0);
for (int i = 0; i < numCalls; i++) {
// Create new transport for each call
TNonblockingSocket transport = new TNonblockingSocket(host, port);
// Create async client
MyService.AsyncClient client = new MyService.AsyncClient(
new TBinaryProtocol.Factory(),
clientManager,
transport
);
// Create callback
final int callId = i;
AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
public void onComplete(String response) {
System.out.println("Call " + callId + " completed: " + response);
successCount.incrementAndGet();
latch.countDown();
}
public void onError(Exception exception) {
System.err.println("Call " + callId + " failed: " + exception.getMessage());
errorCount.incrementAndGet();
latch.countDown();
}
};
// Make async call
client.myAsyncMethod("Request " + i, callback);
}
// Wait for all calls to complete
latch.await();
System.out.println("Completed: " + successCount.get() + " successful, " + errorCount.get() + " errors");
}
public void shutdown() {
clientManager.stop();
}
}import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.protocol.TBinaryProtocol;
public class AsyncServerExample {
public static void main(String[] args) throws Exception {
// Create async processor
MyService.AsyncProcessor<MyAsyncServiceHandler> processor =
new MyService.AsyncProcessor<>(new MyAsyncServiceHandler());
// Create non-blocking server transport
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
// Create non-blocking server
TNonblockingServer server = new TNonblockingServer(
new TNonblockingServer.Args(serverTransport)
.processor(processor)
.protocolFactory(new TBinaryProtocol.Factory())
);
System.out.println("Starting async server on port 9090...");
server.serve();
}
}import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.TException;
// Robust async callback with error handling
public abstract class RobustAsyncCallback<T> implements AsyncMethodCallback<T> {
private final int maxRetries;
private int retryCount = 0;
public RobustAsyncCallback(int maxRetries) {
this.maxRetries = maxRetries;
}
public final void onError(Exception exception) {
if (shouldRetry(exception) && retryCount < maxRetries) {
retryCount++;
System.out.println("Retrying operation (attempt " + retryCount + "/" + maxRetries + ")");
retry();
} else {
handleFinalError(exception);
}
}
protected boolean shouldRetry(Exception exception) {
// Retry on timeout or connection errors, but not on application errors
return !(exception instanceof TApplicationException) && retryCount < maxRetries;
}
protected abstract void retry();
protected abstract void handleFinalError(Exception exception);
}
// Usage example
RobustAsyncCallback<String> robustCallback = new RobustAsyncCallback<String>(3) {
@Override
public void onComplete(String response) {
System.out.println("Success: " + response);
}
@Override
protected void retry() {
// Re-execute the async method call
client.myAsyncMethod("parameter", this);
}
@Override
protected void handleFinalError(Exception exception) {
System.err.println("Final failure after retries: " + exception.getMessage());
}
};Install with Tessl CLI
npx tessl i tessl/maven-org-apache-thrift--libthrift