Netty 4-based transport implementation plugin for Elasticsearch, providing a high-performance networking layer for HTTP and node-to-node communication.
The resource management system provides efficient allocation and sharing of Netty resources including EventLoopGroups, ByteBuf allocators, and connection pools. This system optimizes memory usage and performance across both transport and HTTP layers.
Manages shared Netty EventLoopGroup instances that can be used by both transport and HTTP layers for efficient resource utilization.
/**
* Creates and manages shared EventLoopGroup instances for transport and HTTP
* Provides resource sharing when configured, or dedicated groups when needed
*/
public final class SharedGroupFactory {
/**
* Creates factory with specified settings
* @param settings Configuration settings for group management
*/
public SharedGroupFactory(Settings settings);
/**
* Gets configuration settings used by this factory
* @return Settings object used for initialization
*/
public Settings getSettings();
/**
* Gets the configured worker count for transport operations
* @return Number of worker threads for transport layer
*/
public int getTransportWorkerCount();
/**
* Gets EventLoopGroup for transport layer operations
* May be shared with HTTP layer depending on configuration
* @return SharedGroup for transport operations
*/
public SharedGroup getTransportGroup();
/**
* Gets EventLoopGroup for HTTP layer operations
* May be shared with transport layer or dedicated depending on configuration
* @return SharedGroup for HTTP operations
*/
public SharedGroup getHttpGroup();
}
Resource Sharing Behavior:
// Factory determines sharing based on HTTP worker count setting
Settings settings = Settings.builder()
    .put("http.netty.worker_count", 0)      // 0 = shared, >0 = dedicated
    .put("transport.netty.worker_count", 4)
    .build();
SharedGroupFactory factory = new SharedGroupFactory(settings);
// With http.netty.worker_count = 0, both return same group
SharedGroup transportGroup = factory.getTransportGroup();
SharedGroup httpGroup = factory.getHttpGroup(); // Same as transport
// With http.netty.worker_count > 0, returns different groups
Settings dedicatedSettings = Settings.builder()
    .put("http.netty.worker_count", 2) // Dedicated HTTP workers
    .build();
SharedGroupFactory dedicatedFactory = new SharedGroupFactory(dedicatedSettings);
SharedGroup dedicatedTransport = dedicatedFactory.getTransportGroup();
SharedGroup dedicatedHttp = dedicatedFactory.getHttpGroup(); // Different group
Wrapper interface for Netty EventLoopGroup providing resource management and lifecycle control.
/**
* Wrapper for Netty EventLoopGroup with reference counting and lifecycle management
* Allows safe sharing of EventLoopGroup instances across multiple components
*/
public interface SharedGroup extends Releasable {
/**
* Gets the underlying Netty EventLoopGroup
* @return EventLoopGroup for Netty channel operations
*/
EventLoopGroup getEventLoopGroup();
/**
* Increments reference count for this shared group
* Must be paired with corresponding release() call
*/
void acquire();
/**
* Decrements reference count and releases resources when count reaches zero
* EventLoopGroup is shutdown when no more references exist
*/
void release();
/**
* Gets current reference count
* @return Number of active references to this group
*/
int getRefCount();
}
Usage Example:
SharedGroupFactory factory = new SharedGroupFactory(settings);
SharedGroup transportGroup = factory.getTransportGroup();
// Acquire a reference before using the group in a component
transportGroup.acquire();
try {
    // Use the EventLoopGroup for Netty operations
    EventLoopGroup eventLoop = transportGroup.getEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap()
        .group(eventLoop)
        .channel(NioSocketChannel.class);
} finally {
    // Always release when done so the group can shut down once unreferenced
    transportGroup.release();
}
Provides optimized ByteBuf allocation strategies based on JVM configuration and system resources.
/**
* Manages Netty ByteBuf allocation with optimized strategies
* Configures allocator based on JVM heap size, GC settings, and system properties
*/
public class NettyAllocator {
/**
* Gets configured ByteBuf allocator instance
* Returns either pooled or unpooled allocator based on system configuration
* @return ByteBufAllocator for creating buffers
*/
public static ByteBufAllocator getAllocator();
/**
* Gets suggested maximum allocation size for single buffer
* Based on heap size and allocation strategy
* @return Maximum recommended buffer size in bytes
*/
public static long suggestedMaxAllocationSize();
/**
* Gets human-readable description of allocator configuration
* Includes allocator type, max allocation size, and configuration factors
* @return Description string for logging and diagnostics
*/
public static String getAllocatorDescription();
/**
* Logs allocator description to logger if not already logged
* Thread-safe single logging of allocator configuration at startup
*/
public static void logAllocatorDescriptionIfNeeded();
/**
* Gets optimal Netty channel type based on allocator configuration
* Returns different channel types for direct/heap buffer strategies
* @return Channel class for client connections
*/
public static Class<? extends Channel> getChannelType();
/**
* Gets optimal Netty server channel type based on allocator configuration
* Returns different server channel types for direct/heap buffer strategies
* @return ServerChannel class for server listening
*/
public static Class<? extends ServerChannel> getServerChannelType();
}
Allocator Selection Logic:
// NettyAllocator automatically selects strategy based on:
// 1. System property override (typically set at JVM startup, e.g. -Des.use_unpooled_allocator=true)
System.setProperty("es.use_unpooled_allocator", "true"); // Forces unpooled allocator
// 2. Heap size and GC type analysis
long heapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC());
// 3. Automatic selection
ByteBufAllocator allocator = NettyAllocator.getAllocator();
long maxAllocation = NettyAllocator.suggestedMaxAllocationSize();
// Example configurations:
// Small heap (<1GB): UnpooledByteBufAllocator, 1MB max allocation
// Large heap + G1GC: PooledByteBufAllocator, 8MB max allocation
// Large heap + other GC: PooledByteBufAllocator, 1MB max allocation
Utility classes for efficient ByteBuf handling and size management.
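The Allocator Selection Logic above can be modeled as a small, dependency-free heuristic. This is an illustrative sketch only: the class name `AllocatorChoice` and the exact thresholds are assumptions derived from the example configurations, not the actual NettyAllocator implementation.

```java
// Illustrative sketch of the allocator selection heuristic described above.
// Thresholds and names are assumptions; the real NettyAllocator applies its
// own internal rules based on heap size, GC type, and system properties.
final class AllocatorChoice {
    public final String allocatorType;     // "pooled" or "unpooled"
    public final long maxAllocationBytes;  // suggested per-buffer ceiling

    AllocatorChoice(String allocatorType, long maxAllocationBytes) {
        this.allocatorType = allocatorType;
        this.maxAllocationBytes = maxAllocationBytes;
    }

    public static final long MB = 1024L * 1024;
    public static final long GB = 1024L * MB;

    /** Pick a strategy from heap size, GC type, and the unpooled override. */
    public static AllocatorChoice choose(long heapBytes, boolean g1gc, boolean forceUnpooled) {
        if (forceUnpooled || heapBytes < GB) {
            // Small heap (or explicit override): unpooled, 1MB max allocation
            return new AllocatorChoice("unpooled", 1 * MB);
        }
        if (g1gc) {
            // Large heap with G1GC: pooled, 8MB max allocation
            return new AllocatorChoice("pooled", 8 * MB);
        }
        // Large heap with another collector: pooled, 1MB max allocation
        return new AllocatorChoice("pooled", 1 * MB);
    }
}
```

The point of the heuristic is that pooled allocation pays off only when the heap is large enough to absorb the pool's footprint, and larger per-buffer ceilings are safe only under a collector (G1) that tolerates them.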
/**
* Netty pipeline handler for ByteBuf size management and optimization
* Handles buffer sizing predictions and memory usage optimization
*/
public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {
// Automatically handles ByteBuf sizing and optimization
// Adjusts buffer allocation strategies based on usage patterns
}
The resource management system implements several optimization strategies:
Adaptive Buffer Sizing: Netty's AdaptiveRecvByteBufAllocator adjusts buffer sizes based on actual data patterns
Pooled Allocation: Uses PooledByteBufAllocator for large heap JVMs to reduce GC pressure
Composite Buffers: Efficiently handles large responses by composing multiple smaller buffers
Reference Counting: Proper ByteBuf lifecycle management prevents memory leaks
Chunk Size Tuning: Optimizes chunk and page sizes based on heap size and GC characteristics
Direct Memory Management: Uses direct buffers for network I/O to avoid copying
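The reference-counting discipline listed above (and exposed by SharedGroup's acquire/release/getRefCount contract) can be illustrated with a minimal, dependency-free sketch. The class below is hypothetical: it models only the counting behavior, standing in a Runnable for the shutdown that Netty would perform on the real EventLoopGroup.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of SharedGroup-style reference counting: each consumer
// acquires before use and releases after; the underlying resource is
// shut down exactly once, when the count drops back to zero.
final class RefCountedResource {
    private final AtomicInteger refCount = new AtomicInteger(1); // creator holds one reference
    private final Runnable shutdown;                             // runs when count reaches 0

    RefCountedResource(Runnable shutdown) {
        this.shutdown = shutdown;
    }

    public void acquire() {
        if (refCount.getAndIncrement() <= 0) {
            throw new IllegalStateException("resource already shut down");
        }
    }

    public void release() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) {
            shutdown.run();  // last reference gone: free the resource
        } else if (remaining < 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
    }

    public int getRefCount() {
        return refCount.get();
    }
}
```

Because the count is maintained with a single atomic, two components sharing one group can release in any order and the shutdown still fires exactly once.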
// Configure shared EventLoopGroup between transport and HTTP
Settings sharedConfig = Settings.builder()
    .put("transport.netty.worker_count", 8) // Transport workers
    .put("http.netty.worker_count", 0)      // Share with transport (default)
    .put("transport.netty.boss_count", 1)   // Connection acceptance
    .build();
SharedGroupFactory sharedFactory = new SharedGroupFactory(sharedConfig);
// Both transport and HTTP use the same EventLoopGroup

// Configure dedicated EventLoopGroups for transport and HTTP
Settings dedicatedConfig = Settings.builder()
    .put("transport.netty.worker_count", 6) // Transport workers
    .put("http.netty.worker_count", 4)      // Dedicated HTTP workers
    .put("transport.netty.boss_count", 1)   // Transport boss threads
    .build();
SharedGroupFactory dedicatedFactory = new SharedGroupFactory(dedicatedConfig);
// Transport and HTTP use separate EventLoopGroups

// System properties affecting buffer allocation
System.setProperty("es.use_unpooled_allocator", "false"); // Use pooled (default)
System.setProperty("es.unsafe.use_netty_default_allocator", "false"); // Use ES tuning
System.setProperty("es.unsafe.use_netty_default_chunk_and_page_size", "false"); // Use ES sizing
// Get optimized allocator
ByteBufAllocator allocator = NettyAllocator.getAllocator();
String description = NettyAllocator.getAllocatorDescription();
long maxSize = NettyAllocator.suggestedMaxAllocationSize();
// Create buffers with optimal settings
ByteBuf buffer = allocator.buffer(); // Default size
ByteBuf directBuffer = allocator.directBuffer(1024); // Specific size
CompositeByteBuf composite = allocator.compositeBuffer(); // Composite buffer
The resource management system integrates with Elasticsearch through:
Settings System: Configuration via elasticsearch.yml and cluster settings
Node Lifecycle: Proper startup and shutdown of shared resources
Circuit Breaker Integration: Respects memory circuit breakers for allocation limits
Monitoring Integration: Provides metrics on buffer usage and EventLoopGroup utilization
JVM Integration: Adapts allocation strategies based on JVM configuration
Plugin Coordination: Ensures proper resource sharing between plugin components
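The builder-based examples earlier map directly onto static node configuration. Assuming the same setting keys shown in those examples, a dedicated-workers setup in elasticsearch.yml might look like the following sketch:

```yml
# Hypothetical elasticsearch.yml fragment using the setting keys shown above
transport.netty.worker_count: 6   # transport workers
http.netty.worker_count: 4        # dedicated HTTP workers (0 = share with transport)
transport.netty.boss_count: 1     # transport boss threads
```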
The resource management system ensures efficient utilization of system resources while maintaining the performance characteristics required for high-throughput Elasticsearch deployments.
Install with Tessl CLI
npx tessl i tessl/maven-org-elasticsearch-plugin--transport-netty4-client