# Apache Avro IPC Netty

Apache Avro IPC Netty is a Netty-based transport for Apache Avro's inter-process communication (IPC) system. It provides asynchronous, non-blocking network communication between Avro applications, handles concurrent requests, manages connections automatically, and accepts custom channel initializers for features such as SSL/TLS encryption and compression.

## Package Information

- **Package Name**: avro-ipc-netty
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc-netty</artifactId>
    <version>1.12.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.netty.NettyTransportCodec.NettyDataPack;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.Callback;
import java.net.InetSocketAddress;
import java.util.function.Consumer;
import java.util.concurrent.ThreadFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelFutureListener;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.bootstrap.Bootstrap;
```

## Basic Usage

### Server Setup

```java
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import java.net.InetSocketAddress;

// Create a responder for your protocol.
// MyProtocol stands for an Avro-generated interface and MyProtocolImpl for your implementation.
MyProtocol impl = new MyProtocolImpl();
Responder responder = new SpecificResponder(MyProtocol.class, impl);

// Create and start the server (the constructor declares InterruptedException)
NettyServer server = new NettyServer(responder, new InetSocketAddress("localhost", 8080));
server.start();

// Server is now listening for connections
System.out.println("Server listening on port: " + server.getPort());

// Clean shutdown
server.close();
```

### Client Setup

```java
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import java.net.InetSocketAddress;

// Create a transceiver to connect to the server (the constructor declares IOException)
NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 8080));

// Create a client proxy for the protocol interface
MyProtocol client = SpecificRequestor.getClient(MyProtocol.class, transceiver);

// Make RPC calls (myMethod stands for a message defined in your protocol)
String result = client.myMethod("parameter");

// Clean shutdown
transceiver.close();
```

## Capabilities

### Server Management

NettyServer provides a complete Netty-based server implementation for hosting Avro RPC services.

```java { .api }
public class NettyServer implements Server {
    /**
     * Creates a new Netty-based Avro RPC server.
     * @param responder - The responder handling incoming RPC requests
     * @param addr - The socket address to bind the server to
     * @throws InterruptedException if the server binding is interrupted
     */
    public NettyServer(Responder responder, InetSocketAddress addr) throws InterruptedException;

    /**
     * Creates a new Netty-based Avro RPC server with channel customization.
     * @param responder - The responder handling incoming RPC requests
     * @param addr - The socket address to bind the server to
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @throws InterruptedException if the server binding is interrupted
     */
    public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer) throws InterruptedException;

    /**
     * Creates a new Netty-based Avro RPC server with channel and bootstrap customization.
     * @param responder - The responder handling incoming RPC requests
     * @param addr - The socket address to bind the server to
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @param bootStrapInitialzier - Custom server bootstrap configuration
     * @throws InterruptedException if the server binding is interrupted
     */
    public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer, final Consumer<ServerBootstrap> bootStrapInitialzier) throws InterruptedException;

    /**
     * Creates a new Netty-based Avro RPC server with full customization.
     * @param responder - The responder handling incoming RPC requests
     * @param addr - The socket address to bind the server to
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @param bootStrapInitialzier - Custom server bootstrap configuration
     * @param bossGroup - EventLoopGroup for accepting connections
     * @param workerGroup - EventLoopGroup for handling client connections
     * @param callerGroup - EventLoopGroup for processing RPC calls
     * @throws InterruptedException if the server binding is interrupted
     */
    public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer, final Consumer<ServerBootstrap> bootStrapInitialzier, EventLoopGroup bossGroup, EventLoopGroup workerGroup, EventLoopGroup callerGroup) throws InterruptedException;

    // Server lifecycle
    public void start();
    public void close();
    public void join() throws InterruptedException;

    // Server information
    public int getPort();
    public int getNumActiveConnections();
}
```

**Usage Examples:**

Basic server:
```java
NettyServer server = new NettyServer(responder, new InetSocketAddress(8080));
```

Server with SSL configuration:
```java
// sslContext is an io.netty.handler.ssl.SslContext configured elsewhere
Consumer<SocketChannel> sslInitializer = channel -> {
    SslHandler sslHandler = sslContext.newHandler(channel.alloc());
    channel.pipeline().addFirst("ssl", sslHandler);
};
NettyServer server = new NettyServer(responder, new InetSocketAddress(8080), sslInitializer);
```

Server with custom event loop groups:
```java
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(10);
// An explicit caller group avoids relying on how a null group is handled
EventLoopGroup callerGroup = new DefaultEventLoopGroup(16);
NettyServer server = new NettyServer(responder, addr, null, null, bossGroup, workerGroup, callerGroup);
```

### Client Communication

NettyTransceiver provides a Netty-based client implementation for connecting to Avro RPC services.

```java { .api }
public class NettyTransceiver extends Transceiver {
    // Constants
    public static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000;
    public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
    public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
    public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
    public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;

    /**
     * Creates a new Netty transceiver with default settings.
     * @param addr - The server address to connect to
     * @throws IOException if connection fails
     */
    public NettyTransceiver(InetSocketAddress addr) throws IOException;

    /**
     * Creates a new Netty transceiver with custom connection timeout.
     * @param addr - The server address to connect to
     * @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
     * @throws IOException if connection fails
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis) throws IOException;

    /**
     * Creates a new Netty transceiver with channel customization.
     * @param addr - The server address to connect to
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @throws IOException if connection fails
     */
    public NettyTransceiver(InetSocketAddress addr, final Consumer<SocketChannel> initializer) throws IOException;

    /**
     * Creates a new Netty transceiver with timeout and channel customization.
     * @param addr - The server address to connect to
     * @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @throws IOException if connection fails
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, final Consumer<SocketChannel> initializer) throws IOException;

    /**
     * Creates a new Netty transceiver with full customization.
     * @param addr - The server address to connect to
     * @param connectTimeoutMillis - Connection timeout in milliseconds (null for default)
     * @param initializer - Custom channel initializer (e.g., for SSL configuration)
     * @param bootStrapInitialzier - Custom bootstrap configuration
     * @throws IOException if connection fails
     */
    public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis, final Consumer<SocketChannel> initializer, final Consumer<Bootstrap> bootStrapInitialzier) throws IOException;

    // Connection management
    public void close();
    public void close(boolean awaitCompletion);
    public String getRemoteName() throws IOException;
    public boolean isConnected();

    // Communication
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
    public void writeBuffers(List<ByteBuffer> buffers) throws IOException;
    public List<ByteBuffer> readBuffers() throws IOException; // Not supported by this transceiver; always throws

    // Protocol management
    public Protocol getRemote();
    public void setRemote(Protocol protocol);

    // Thread safety (no-ops - Netty channels are thread-safe)
    public void lockChannel();
    public void unlockChannel();

    // Inner classes for advanced usage
    public static class WriteFutureListener implements ChannelFutureListener {
        public WriteFutureListener(Callback<List<ByteBuffer>> callback);
        public void operationComplete(ChannelFuture future) throws Exception;
    }

    public static class NettyTransceiverThreadFactory implements ThreadFactory {
        public NettyTransceiverThreadFactory(String prefix);
        public Thread newThread(Runnable r);
    }
}
```

**Usage Examples:**

Basic client connection:
```java
NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress("server.example.com", 8080));
```

Client with custom timeout:
```java
NettyTransceiver transceiver = new NettyTransceiver(
    new InetSocketAddress("server.example.com", 8080),
    30000 // 30 second timeout
);
```

Client with SSL configuration:
```java
// sslContext is an io.netty.handler.ssl.SslContext configured elsewhere
Consumer<SocketChannel> sslInitializer = channel -> {
    SslHandler sslHandler = sslContext.newHandler(channel.alloc(), "server.example.com", 8080);
    channel.pipeline().addFirst("ssl", sslHandler);
};
NettyTransceiver transceiver = new NettyTransceiver(
    new InetSocketAddress("server.example.com", 8080),
    sslInitializer
);
```

Asynchronous transceive:
```java
List<ByteBuffer> request = ...; // prepare the serialized request
Callback<List<ByteBuffer>> callback = new Callback<List<ByteBuffer>>() {
    @Override
    public void handleResult(List<ByteBuffer> result) {
        // Handle successful response
    }

    @Override
    public void handleError(Throwable error) {
        // Handle error
    }
};
transceiver.transceive(request, callback);
```

Custom thread factory for Netty event loops:
```java
NettyTransceiver.NettyTransceiverThreadFactory threadFactory =
    new NettyTransceiver.NettyTransceiverThreadFactory("avro-client-");
EventLoopGroup workerGroup = new NioEventLoopGroup(4, threadFactory);
```
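
To make the role of a prefix-based thread factory concrete, here is a minimal JDK-only sketch of the same idea. `PrefixedThreadFactory` is a hypothetical stand-in written for illustration, not the library class; the exact naming scheme and daemon setting used by `NettyTransceiverThreadFactory` may differ.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in: names threads "<prefix><n>" and marks them as daemon threads. */
final class PrefixedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    PrefixedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each thread gets a unique, recognizable name for logs and thread dumps
        Thread t = new Thread(r, prefix + counter.incrementAndGet());
        // Assumption: I/O threads should not keep the JVM alive on their own
        t.setDaemon(true);
        return t;
    }
}
```

Recognizable thread names make it easier to attribute log lines and thread dumps to the Avro client when several Netty-based components share a process.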

### Transport Protocol

NettyTransportCodec provides the data structures and encoding/decoding functionality for the Netty transport protocol.

```java { .api }
public class NettyTransportCodec {
    // Data structure for transport protocol
    public static class NettyDataPack {
        public NettyDataPack();
        public NettyDataPack(int serial, List<ByteBuffer> datas);

        public void setSerial(int serial);
        public int getSerial();
        public void setDatas(List<ByteBuffer> datas);
        public List<ByteBuffer> getDatas();
    }

    // Frame encoder for outgoing messages
    public static class NettyFrameEncoder extends MessageToMessageEncoder<NettyDataPack> {
        protected void encode(ChannelHandlerContext ctx, NettyDataPack dataPack, List<Object> out) throws Exception;
    }

    // Frame decoder for incoming messages
    public static class NettyFrameDecoder extends ByteToMessageDecoder {
        public NettyFrameDecoder();
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    }
}
```

**Usage Examples:**

The transport codec is typically used internally by NettyServer and NettyTransceiver, but can be used directly for custom implementations:

```java
// Create a data pack: a serial number plus the buffers that make up one message
List<ByteBuffer> data = Arrays.asList(ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8)));
NettyDataPack dataPack = new NettyDataPack(123, data);

// NettyServer and NettyTransceiver add the codec to the pipeline automatically.
// A custom pipeline adds it by hand (channel and customHandler are placeholders).
channel.pipeline()
    .addLast("frameDecoder", new NettyFrameDecoder())
    .addLast("frameEncoder", new NettyFrameEncoder())
    .addLast("handler", customHandler);
```
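
The framing that a codec of this kind performs can be sketched with plain `java.nio` buffers. The layout below (a 4-byte serial, a 4-byte buffer count, then each buffer as a 4-byte length followed by its bytes) is an assumption made for illustration; check the `NettyTransportCodec` source before relying on the exact wire format. `FrameLayoutSketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative length-prefixed framing; not the library's encoder or decoder. */
final class FrameLayoutSketch {
    /** Writes serial, buffer count, then each buffer as length + bytes. */
    static ByteBuffer encode(int serial, List<ByteBuffer> datas) {
        int size = 8; // 4 bytes serial + 4 bytes buffer count
        for (ByteBuffer b : datas) {
            size += 4 + b.remaining();
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        out.putInt(serial).putInt(datas.size());
        for (ByteBuffer b : datas) {
            out.putInt(b.remaining());
            out.put(b.duplicate()); // duplicate so the caller's position is untouched
        }
        out.flip();
        return out;
    }

    /** Reads the serial from the start of a frame without moving its position. */
    static int readSerial(ByteBuffer frame) {
        return frame.getInt(0);
    }

    /** Reads the buffers back out of a frame produced by encode. */
    static List<ByteBuffer> decode(ByteBuffer frame) {
        ByteBuffer in = frame.duplicate();
        in.getInt(); // skip the serial
        int count = in.getInt();
        List<ByteBuffer> datas = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[in.getInt()];
            in.get(bytes);
            datas.add(ByteBuffer.wrap(bytes));
        }
        return datas;
    }
}
```

A real decoder also has to cope with frames that arrive split across several reads, which is the job `ByteToMessageDecoder` takes on in a Netty pipeline.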

## Types

```java { .api }
// Simplified signatures for reference; see the Avro and Netty Javadoc for the full declarations

// From org.apache.avro.ipc package
public interface Server {
    void start();
    void close();
    int getPort();
    void join() throws InterruptedException;
}

public abstract class Transceiver implements Closeable {
    public abstract String getRemoteName() throws IOException;
    public abstract List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
    public abstract void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
    public abstract List<ByteBuffer> readBuffers() throws IOException;
    public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;
    public abstract Protocol getRemote();
    public abstract void setRemote(Protocol protocol);
    public abstract boolean isConnected();
    public abstract void lockChannel();
    public abstract void unlockChannel();
}

public abstract class Responder {
    public abstract List<ByteBuffer> respond(List<ByteBuffer> request, Transceiver connection) throws IOException;
}

public interface Callback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
}

// From Netty
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter;
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter;
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter;
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture>;
public interface ChannelFuture extends Future<Void>;

// Java standard types
public interface Consumer<T> {
    void accept(T t);
}

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
```

## Error Handling

The package throws standard Java IO exceptions and Avro-specific exceptions:

- **IOException**: Connection errors, communication failures
- **InterruptedException**: Thread interruption during blocking operations
- **AvroRuntimeException**: Protocol errors, invalid data
- **RuntimeException**: Missing callback information, internal errors

Common error handling patterns:

```java
// try-with-resources closes the transceiver even when an exception is thrown
try (NettyTransceiver transceiver = new NettyTransceiver(address)) {
    // Use transceiver...
} catch (IOException e) {
    // Handle connection or communication errors (log is your application's logger)
    log.error("Failed to connect to server", e);
}
```
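
Because connection attempts fail with `IOException`, one common way to handle a server that is briefly unavailable is to retry with a growing delay. The helper below is a minimal sketch under that assumption; `ConnectRetry` and `withRetry` are hypothetical names and not part of avro-ipc-netty.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action that may fail with IOException. */
final class ConnectRetry {
    static <T> T withRetry(Callable<T> connect, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.call();
            } catch (IOException e) {
                last = e; // remember the failure in case every attempt fails
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // double the wait before the next attempt
                }
            }
        }
        throw last; // all attempts failed with IOException
    }
}
```

With the library on the classpath, a call might look like `ConnectRetry.withRetry(() -> new NettyTransceiver(address), 3, 200)`. Only `IOException` is retried; any other exception propagates immediately.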

For asynchronous operations, handle errors in the callback:

```java
transceiver.transceive(request, new Callback<List<ByteBuffer>>() {
    @Override
    public void handleResult(List<ByteBuffer> result) {
        // Process successful response
    }

    @Override
    public void handleError(Throwable error) {
        if (error instanceof IOException) {
            // Handle communication error
        } else if (error instanceof AvroRuntimeException) {
            // Handle protocol error
        }
    }
});
```
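
When callers prefer futures over callbacks, the two-method callback shape can be bridged to a `CompletableFuture`. The sketch below declares a local `Callback` interface that mirrors the one listed under Types so that it compiles without Avro; `CallbackFutures` and `completing` are hypothetical names. Avro's own `org.apache.avro.ipc.CallFuture` appears to serve a similar purpose and may be a better fit in real code.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical adapter between a result/error callback and a CompletableFuture. */
final class CallbackFutures {
    /** Local mirror of the Callback shape, declared here so the sketch has no Avro dependency. */
    interface Callback<T> {
        void handleResult(T result);
        void handleError(Throwable error);
    }

    /** Returns a callback that completes the given future with the result or the error. */
    static <T> Callback<T> completing(CompletableFuture<T> future) {
        return new Callback<T>() {
            @Override
            public void handleResult(T result) {
                future.complete(result);
            }

            @Override
            public void handleError(Throwable error) {
                future.completeExceptionally(error);
            }
        };
    }
}
```

The caller can then chain `thenApply` or `exceptionally` on the future instead of branching inside `handleError`.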

## Architecture

The Apache Avro IPC Netty package follows a layered architecture:

1. **Transport Layer**: NettyTransportCodec handles message framing and encoding/decoding
2. **Communication Layer**: NettyServer and NettyTransceiver manage connections and message routing
3. **Protocol Layer**: Integrates with Avro's RPC system through Responder and Transceiver interfaces

Key design patterns:
- **Non-blocking I/O**: Uses Netty's event-driven architecture for high performance
- **Connection management**: Connections are established, reused, and cleaned up automatically
- **Pluggable SSL/TLS**: Support for encrypted connections through channel initializers
- **Thread safety**: Thread-safe operations without explicit locking
- **Graceful shutdown**: Proper resource cleanup and connection termination
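
The message routing done by the communication layer can be pictured as a table of in-flight requests keyed by the serial number that each `NettyDataPack` carries. The following is a minimal sketch of that idea using only the JDK; `PendingRequests` is a hypothetical class, and the library's internal bookkeeping may be organized differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical table that matches responses to requests by serial number. */
final class PendingRequests<T> {
    private final AtomicInteger serials = new AtomicInteger();
    private final Map<Integer, Consumer<T>> pending = new ConcurrentHashMap<>();

    /** Registers a response handler and returns the serial to send with the request. */
    int register(Consumer<T> onResponse) {
        int serial = serials.incrementAndGet();
        pending.put(serial, onResponse);
        return serial;
    }

    /** Routes a response to its handler; returns false when the serial is unknown. */
    boolean complete(int serial, T response) {
        Consumer<T> handler = pending.remove(serial);
        if (handler == null) {
            return false;
        }
        handler.accept(response);
        return true;
    }

    /** Number of requests still waiting for a response. */
    int inFlight() {
        return pending.size();
    }
}
```

Keying by serial lets responses arrive in any order over one connection, which is how several callers can share a channel without locking it for the duration of a call.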