Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations
—
Apache Avro IPC provides multiple transport implementations to support different networking requirements, from simple HTTP communication to secure socket connections and in-process communication.
HTTP-based transport using standard HTTP POST requests with binary Avro data. Suitable for web environments and when working through firewalls and proxies.
public class HttpTransceiver extends Transceiver {
public static final String CONTENT_TYPE = "avro/binary";
// Constructors
public HttpTransceiver(URL url) throws IOException;
public HttpTransceiver(URL url, Proxy proxy) throws IOException;
// Configuration
public void setTimeout(int timeout);
// Inherited from Transceiver
public String getRemoteName() throws IOException;
public List<ByteBuffer> readBuffers() throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public void close() throws IOException;
// Static utility methods
public static int getLength(List<ByteBuffer> buffers);
public static List<ByteBuffer> readBuffers(InputStream in) throws IOException;
public static void writeBuffers(List<ByteBuffer> buffers, OutputStream out) throws IOException;
}public class ResponderServlet extends HttpServlet {
// Constructor
public ResponderServlet(Responder responder);
// HTTP handling
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException;
}// HTTP client setup
URL serverUrl = new URL("http://localhost:8080/rpc");
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
transceiver.setTimeout(30000); // 30 second timeout
// With proxy
Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy.company.com", 8080));
HttpTransceiver transceiver = new HttpTransceiver(serverUrl, proxy);
// HTTP server setup (servlet deployment)
MyServiceImpl implementation = new MyServiceImpl();
SpecificResponder responder = new SpecificResponder(MyService.class, implementation);
ResponderServlet servlet = new ResponderServlet(responder);
// Deploy to servlet container (example with Jetty)
ServletContextHandler context = new ServletContextHandler();
context.addServlet(new ServletHolder(servlet), "/rpc");

Socket-based transport implementations providing persistent connections with optional SASL authentication and encryption.
public class SaslSocketTransceiver extends Transceiver {
// Constructors
public SaslSocketTransceiver(SocketAddress address) throws IOException; // Anonymous SASL
public SaslSocketTransceiver(SocketAddress address, SaslClient saslClient) throws IOException;
public SaslSocketTransceiver(SocketChannel channel, SaslServer saslServer) throws IOException;
// Inherited from Transceiver
public String getRemoteName() throws IOException;
public List<ByteBuffer> readBuffers() throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public boolean isConnected();
public void close() throws IOException;
}@Deprecated
public class SocketTransceiver extends Transceiver {
// Constructors
public SocketTransceiver(SocketAddress address) throws IOException;
public SocketTransceiver(SocketChannel channel) throws IOException;
// Inherited from Transceiver
public String getRemoteName() throws IOException;
public List<ByteBuffer> readBuffers() throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public boolean isConnected();
public void close() throws IOException;
}public class SaslSocketServer extends SocketServer {
// Constructors
public SaslSocketServer(Responder responder, SocketAddress addr) throws IOException; // Anonymous SASL
public SaslSocketServer(Responder responder, SocketAddress addr, String mechanism,
String protocol, String serverName, Map<String,?> props, CallbackHandler cbh) throws IOException;
// Inherited from SocketServer and Server interface
public int getPort();
public void start();
public void close();
public void join() throws InterruptedException;
public void run(); // From Thread
}@Deprecated
public class SocketServer extends Thread implements Server {
// Constructor
public SocketServer(Responder responder, SocketAddress addr) throws IOException;
// Server interface
public int getPort();
public void start();
public void close();
public void join() throws InterruptedException;
// Thread methods
public void run();
// Protected methods for customization
protected Transceiver getTransceiver(SocketChannel channel) throws IOException;
}// SASL Socket client (Anonymous authentication)
InetSocketAddress serverAddress = new InetSocketAddress("localhost", 65001);
SaslSocketTransceiver transceiver = new SaslSocketTransceiver(serverAddress);
// SASL Socket client with custom authentication
Map<String, String> props = new HashMap<>();
props.put(Sasl.QOP, "auth-conf"); // Authentication and confidentiality
SaslClient saslClient = Sasl.createSaslClient(
new String[]{"DIGEST-MD5"}, "client", "avro", "server.example.com", props,
new MyCallbackHandler());
SaslSocketTransceiver transceiver = new SaslSocketTransceiver(serverAddress, saslClient);
// SASL Socket server
SpecificResponder responder = new SpecificResponder(MyService.class, implementation);
SaslSocketServer server = new SaslSocketServer(responder,
new InetSocketAddress(65001));
server.start();
// SASL Socket server with custom authentication
SaslSocketServer server = new SaslSocketServer(responder,
new InetSocketAddress(65001), "DIGEST-MD5", "avro", "server.example.com",
props, new MyServerCallbackHandler());
server.start();

UDP datagram-based transport for connectionless communication. Note that this uses a non-standard wire protocol.
public class DatagramTransceiver extends Transceiver {
// Constructors
public DatagramTransceiver(SocketAddress remote) throws IOException;
public DatagramTransceiver(DatagramChannel channel) throws IOException;
// Inherited from Transceiver
public String getRemoteName() throws IOException;
public List<ByteBuffer> readBuffers() throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public void close() throws IOException;
}public class DatagramServer extends Thread implements Server {
// Constructor
public DatagramServer(Responder responder, SocketAddress addr) throws IOException;
// Server interface
public int getPort();
public void start();
public void close();
public void join() throws InterruptedException;
// Thread methods
public void run();
}// Datagram client
DatagramTransceiver transceiver = new DatagramTransceiver(
new InetSocketAddress("localhost", 65002));
// Datagram server
GenericResponder responder = new MyGenericResponder(protocol);
DatagramServer server = new DatagramServer(responder,
new InetSocketAddress(65002));
server.start();

In-process transport for same-JVM communication, eliminating network overhead.
public class LocalTransceiver extends Transceiver {
// Constructor
public LocalTransceiver(Responder responder);
// Inherited from Transceiver
public String getRemoteName();
public List<ByteBuffer> readBuffers() throws IOException;
public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
public void close() throws IOException;
}// Local in-process communication
MyServiceImpl implementation = new MyServiceImpl();
SpecificResponder responder = new SpecificResponder(MyService.class, implementation);
LocalTransceiver transceiver = new LocalTransceiver(responder);
// Use with any requestor
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
MyService client = SpecificRequestor.getClient(MyService.class, transceiver);public class Ipc {
// Factory methods
public static Transceiver createTransceiver(URI uri) throws IOException;
public static Server createServer(Responder responder, URI uri) throws IOException;
}// Create transceiver from URI
// Ipc selects the transport from the URI scheme: "http" yields an HttpTransceiver,
// "avro" yields the SASL socket transport. Other schemes are rejected with an IOException.
Transceiver httpTransceiver = Ipc.createTransceiver(URI.create("http://localhost:8080/rpc"));
Transceiver socketTransceiver = Ipc.createTransceiver(URI.create("avro://localhost:65001"));
// Create server from URI (same scheme rules as createTransceiver)
Server server = Ipc.createServer(responder, URI.create("avro://localhost:65001"));
server.start();

HTTP transport supports connection pooling through the underlying HTTP client:
// System properties for HTTP client configuration
System.setProperty("http.maxConnections", "10");
System.setProperty("http.keepAlive", "true");

For HTTPS transport:
// Configure SSL context
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
URL httpsUrl = new URL("https://secure.example.com:8443/rpc");
HttpTransceiver transceiver = new HttpTransceiver(httpsUrl);

For socket transports, configure socket options:
// Example custom socket server
public class CustomSocketServer extends SaslSocketServer {
    /**
     * Creates a server that uses anonymous SASL.
     *
     * SaslSocketServer has no no-argument constructor, so a subclass must
     * declare a constructor that delegates to one of the superclass constructors.
     *
     * @param responder responder that handles incoming requests
     * @param addr address to bind the server to
     * @throws IOException if the superclass constructor fails
     */
    public CustomSocketServer(Responder responder, SocketAddress addr) throws IOException {
        super(responder, addr);
    }

    /**
     * Applies socket options to the channel, then delegates to the inherited
     * implementation to build the transceiver.
     *
     * @param channel channel to build a transceiver for
     * @return the transceiver produced by the superclass for this channel
     * @throws IOException if configuring the socket or creating the transceiver fails
     */
    @Override
    protected Transceiver getTransceiver(SocketChannel channel) throws IOException {
        // Configure socket options
        channel.socket().setTcpNoDelay(true);
        channel.socket().setReceiveBufferSize(65536);
        channel.socket().setSendBufferSize(65536);
        return super.getTransceiver(channel);
    }
}

Transport-level errors are typically IOException or its subclasses:
try {
Object result = requestor.request("method", request);
} catch (ConnectException e) {
// Server not available
System.err.println("Cannot connect to server: " + e.getMessage());
} catch (SocketTimeoutException e) {
// Request timeout
System.err.println("Request timed out: " + e.getMessage());
} catch (IOException e) {
// General network error
System.err.println("Network error: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc