Netty-based implementation for Apache Avro's inter-process communication (IPC) system providing high-performance, asynchronous network communication.
npx @tessl/cli install tessl/maven-org-apache-avro--avro-ipc-netty@1.12.0Apache Avro IPC Netty provides a Netty-based implementation for Apache Avro's inter-process communication (IPC) system. It enables high-performance, asynchronous network communication between Avro applications using the Netty framework, including advanced features like SSL/TLS encryption, compression, concurrent request handling, and automatic connection management.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc-netty</artifactId>
<version>1.12.0</version>
</dependency>import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.netty.NettyTransportCodec.NettyDataPack;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.Callback;
import java.net.InetSocketAddress;
import java.util.function.Consumer;
import java.util.concurrent.ThreadFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelFutureListener;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.bootstrap.Bootstrap;import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import java.net.InetSocketAddress;
// Create a responder for your protocol
MyProtocol impl = new MyProtocolImpl();
Responder responder = new SpecificResponder(MyProtocol.class, impl);
// Create and start the server
NettyServer server = new NettyServer(responder, new InetSocketAddress("localhost", 8080));
server.start();
// Server is now listening for connections
System.out.println("Server listening on port: " + server.getPort());
// Clean shutdown
server.close();import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import java.net.InetSocketAddress;
// Create transceiver to connect to server
NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 8080));
// Create client proxy
MyProtocol client = SpecificRequestor.getClient(MyProtocol.class, transceiver);
// Make RPC calls
String result = client.myMethod("parameter");
// Clean shutdown
transceiver.close();NettyServer provides a complete Netty-based server implementation for hosting Avro RPC services.
public class NettyServer implements Server {
/**
* Creates a new Netty-based Avro RPC server.
* @param responder - The responder handling incoming RPC requests
* @param addr - The socket address to bind the server to
* @throws InterruptedException if the server binding is interrupted
*/
public NettyServer(Responder responder, InetSocketAddress addr) throws InterruptedException;
/**
* Creates a new Netty-based Avro RPC server with channel customization.
* @param responder - The responder handling incoming RPC requests
* @param addr - The socket address to bind the server to
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @throws InterruptedException if the server binding is interrupted
*/
public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer) throws InterruptedException;
/**
* Creates a new Netty-based Avro RPC server with channel and bootstrap customization.
* @param responder - The responder handling incoming RPC requests
* @param addr - The socket address to bind the server to
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @param bootStrapInitialzier - Custom server bootstrap configuration
* @throws InterruptedException if the server binding is interrupted
*/
public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer, final Consumer<ServerBootstrap> bootStrapInitialzier) throws InterruptedException;
/**
* Creates a new Netty-based Avro RPC server with full customization.
* @param responder - The responder handling incoming RPC requests
* @param addr - The socket address to bind the server to
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @param bootStrapInitialzier - Custom server bootstrap configuration
* @param bossGroup - EventLoopGroup for accepting connections
* @param workerGroup - EventLoopGroup for handling client connections
* @param callerGroup - EventLoopGroup for processing RPC calls
* @throws InterruptedException if the server binding is interrupted
*/
public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer, final Consumer<ServerBootstrap> bootStrapInitialzier, EventLoopGroup bossGroup, EventLoopGroup workerGroup, EventLoopGroup callerGroup) throws InterruptedException;
// Server lifecycle
public void start();
public void close();
public void join() throws InterruptedException;
// Server information
public int getPort();
public int getNumActiveConnections();
}Usage Examples:
Basic server:
NettyServer server = new NettyServer(responder, new InetSocketAddress(8080));Server with SSL configuration:
Consumer<SocketChannel> sslInitializer = channel -> {
SslHandler sslHandler = sslContext.newHandler(channel.alloc());
channel.pipeline().addFirst("ssl", sslHandler);
};
NettyServer server = new NettyServer(responder, new InetSocketAddress(8080), sslInitializer);Server with custom event loop groups:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(10);
NettyServer server = new NettyServer(responder, addr, null, null, bossGroup, workerGroup, null);NettyTransceiver provides a Netty-based client implementation for connecting to Avro RPC services.
public class NettyTransceiver extends Transceiver {
// Constants
public static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000;
public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
/**
* Creates a new Netty transceiver with default settings.
* @param addr - The server address to connect to
* @throws IOException if connection fails
*/
public NettyTransceiver(InetSocketAddress addr) throws IOException;
/**
* Creates a new Netty transceiver with custom connection timeout.
* @param addr - The server address to connect to
* @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
* @throws IOException if connection fails
*/
public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis) throws IOException;
/**
* Creates a new Netty transceiver with channel customization.
* @param addr - The server address to connect to
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @throws IOException if connection fails
*/
public NettyTransceiver(InetSocketAddress addr, final Consumer<SocketChannel> initializer) throws IOException;
/**
* Creates a new Netty transceiver with timeout and channel customization.
* @param addr - The server address to connect to
* @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @throws IOException if connection fails
*/
public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, final Consumer<SocketChannel> initializer) throws IOException;
/**
* Creates a new Netty transceiver with full customization.
* @param addr - The server address to connect to
* @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
* @param initializer - Custom channel initializer (e.g., for SSL configuration)
* @param bootStrapInitialzier - Custom bootstrap configuration
* @throws IOException if connection fails
*/
public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, final Consumer<SocketChannel> initializer, final Consumer<Bootstrap> bootStrapInitialzier) throws IOException;
// Connection management
public void close();
public void close(boolean awaitCompletion);
public String getRemoteName() throws IOException;
public boolean isConnected();
// Communication
public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public List<ByteBuffer> readBuffers() throws IOException; // Throws UnsupportedOperationException
// Protocol management
public Protocol getRemote();
public void setRemote(Protocol protocol);
// Thread safety (no-ops - Netty channels are thread-safe)
public void lockChannel();
public void unlockChannel();
// Inner classes for advanced usage
public static class WriteFutureListener implements ChannelFutureListener {
public WriteFutureListener(Callback<List<ByteBuffer>> callback);
public void operationComplete(ChannelFuture future) throws Exception;
}
public static class NettyTransceiverThreadFactory implements ThreadFactory {
public NettyTransceiverThreadFactory(String prefix);
public Thread newThread(Runnable r);
}
}Usage Examples:
Basic client connection:
NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress("server.example.com", 8080));Client with custom timeout:
NettyTransceiver transceiver = new NettyTransceiver(
new InetSocketAddress("server.example.com", 8080),
30000 // 30 second timeout
);Client with SSL configuration:
Consumer<SocketChannel> sslInitializer = channel -> {
SslHandler sslHandler = sslContext.newHandler(channel.alloc(), "server.example.com", 8080);
channel.pipeline().addFirst("ssl", sslHandler);
};
NettyTransceiver transceiver = new NettyTransceiver(
new InetSocketAddress("server.example.com", 8080),
sslInitializer
);Asynchronous transceive:
List<ByteBuffer> request = // ... prepare request
Callback<List<ByteBuffer>> callback = new Callback<List<ByteBuffer>>() {
@Override
public void handleResult(List<ByteBuffer> result) {
// Handle successful response
}
@Override
public void handleError(Throwable error) {
// Handle error
}
};
transceiver.transceive(request, callback);Custom thread factory for Netty event loops:
NettyTransceiver.NettyTransceiverThreadFactory threadFactory =
new NettyTransceiver.NettyTransceiverThreadFactory("avro-client-");
EventLoopGroup workerGroup = new NioEventLoopGroup(4, threadFactory);NettyTransportCodec provides the data structures and encoding/decoding functionality for the Netty transport protocol.
public class NettyTransportCodec {
// Data structure for transport protocol
public static class NettyDataPack {
public NettyDataPack();
public NettyDataPack(int serial, List<ByteBuffer> datas);
public void setSerial(int serial);
public int getSerial();
public void setDatas(List<ByteBuffer> datas);
public List<ByteBuffer> getDatas();
}
// Frame encoder for outgoing messages
public static class NettyFrameEncoder extends MessageToMessageEncoder<NettyDataPack> {
protected void encode(ChannelHandlerContext ctx, NettyDataPack dataPack, List<Object> out) throws Exception;
}
// Frame decoder for incoming messages
public static class NettyFrameDecoder extends ByteToMessageDecoder {
public NettyFrameDecoder();
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
}Usage Examples:
The transport codec is typically used internally by NettyServer and NettyTransceiver, but can be used directly for custom implementations:
// Create a data pack
List<ByteBuffer> data = Arrays.asList(ByteBuffer.wrap("Hello".getBytes()));
NettyDataPack dataPack = new NettyDataPack(123, data);
// The codec classes are typically added to the Netty pipeline automatically
channel.pipeline()
.addLast("frameDecoder", new NettyFrameDecoder())
.addLast("frameEncoder", new NettyFrameEncoder())
.addLast("handler", customHandler);// From org.apache.avro.ipc package
public interface Server {
void start();
void close();
int getPort();
void join() throws InterruptedException;
}
public abstract class Transceiver implements Closeable {
public abstract String getRemoteName() throws IOException;
public abstract List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
public abstract void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
public abstract List<ByteBuffer> readBuffers() throws IOException;
public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public abstract Protocol getRemote();
public abstract void setRemote(Protocol protocol);
public abstract boolean isConnected();
public abstract void lockChannel();
public abstract void unlockChannel();
}
public abstract class Responder {
public abstract List<ByteBuffer> respond(List<ByteBuffer> request, Transceiver connection) throws IOException;
}
public interface Callback<T> {
void handleResult(T result);
void handleError(Throwable error);
}
// From Netty
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter;
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter;
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter;
public interface ChannelFutureListener extends EventListener;
public interface ChannelFuture extends Future<Void>;
// Java standard types
public interface Consumer<T> {
void accept(T t);
}
public interface ThreadFactory {
Thread newThread(Runnable r);
}The package throws standard Java IO exceptions and Avro-specific exceptions:
Common error handling patterns:
try {
NettyTransceiver transceiver = new NettyTransceiver(address);
// Use transceiver...
} catch (IOException e) {
// Handle connection or communication errors
log.error("Failed to connect to server", e);
} finally {
if (transceiver != null) {
transceiver.close();
}
}For asynchronous operations, handle errors in the callback:
transceiver.transceive(request, new Callback<List<ByteBuffer>>() {
@Override
public void handleResult(List<ByteBuffer> result) {
// Process successful response
}
@Override
public void handleError(Throwable error) {
if (error instanceof IOException) {
// Handle communication error
} else if (error instanceof AvroRuntimeException) {
// Handle protocol error
}
}
});The Apache Avro IPC Netty package follows a layered architecture:
Key design patterns: