Netty 3 based transport implementation for Elasticsearch providing TCP and HTTP transport layers
—
Core TCP transport implementation for internal Elasticsearch cluster communication. Provides connection management, message handling, and network optimization for node-to-node communication using Netty 3.
Main TCP transport implementation that handles all internal cluster communication between Elasticsearch nodes.
/**
* Netty 3-based TCP transport implementation for Elasticsearch cluster communication.
* Supports 4 types of connections per node: low/med/high/ping.
* - Low: for batch oriented APIs (like recovery) with high payload
* - Med: for typical search/single doc index operations
* - High: for cluster state operations
* - Ping: for sending ping requests to other nodes
*/
public class Netty3Transport extends TcpTransport<Channel> {
// NOTE: this is an API summary; the constructor and method bodies are omitted in this listing.
/**
* Creates a new Netty3Transport instance.
* @param settings Elasticsearch configuration settings
* @param threadPool Thread pool for async operations
* @param networkService Network utility service
* @param bigArrays Memory management for large arrays
* @param namedWriteableRegistry Registry for serializable objects
* @param circuitBreakerService Circuit breaker for memory protection
*/
public Netty3Transport(Settings settings, ThreadPool threadPool,
NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService);
/**
* Returns the number of currently open server channels.
* @return Number of open server channels
*/
public long serverOpen();
/**
* Configures the channel pipeline factory for client connections.
* @return ChannelPipelineFactory for client channels
*/
public ChannelPipelineFactory configureClientChannelPipelineFactory();
/**
* Configures the channel pipeline factory for server connections.
* @param name Profile name for the server
* @param settings Settings specific to this server profile
* @return ChannelPipelineFactory for server channels
*/
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings settings);
}
Comprehensive configuration settings for tuning TCP transport performance and behavior.
// NOTE: the trailing "..." in each declaration stands for arguments elided from this listing.
/**
* Number of worker threads for handling I/O operations.
* Default: 2 * number of available processors
*/
public static final Setting<Integer> WORKER_COUNT =
new Setting<>("transport.netty.worker_count", ...);
/**
* Maximum capacity for cumulation buffers to prevent memory issues.
* Default: -1 (unlimited)
*/
public static final Setting<ByteSizeValue> NETTY_MAX_CUMULATION_BUFFER_CAPACITY =
byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", ...);
/**
* Maximum number of components in composite buffers.
* Default: -1 (unlimited)
*/
public static final Setting<Integer> NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS =
intSetting("transport.netty.max_composite_buffer_components", ...);
/**
* Size for the receive buffer size predictor, used for buffer allocation.
* Default: 64kb
*/
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE =
byteSizeSetting("transport.netty.receive_predictor_size", ...);
/**
* Minimum size for the receive buffer size predictor.
* Default: 64b
*/
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN =
byteSizeSetting("transport.netty.receive_predictor_min", ...);
/**
* Maximum size for the receive buffer size predictor.
* Default: 64kb
*/
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX =
byteSizeSetting("transport.netty.receive_predictor_max", ...);
/**
* Number of boss threads for accepting connections.
* Default: 1
*/
public static final Setting<Integer> NETTY_BOSS_COUNT =
intSetting("transport.netty.boss_count", ...);
Usage Example:
import org.elasticsearch.transport.netty3.Netty3Transport;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.network.NetworkService;
// Configure transport with custom settings.
// The keys used here are the transport.netty.* setting names listed in the settings section.
Settings settings = Settings.builder()
.put("transport.netty.worker_count", 4)
.put("transport.netty.max_cumulation_buffer_capacity", "128mb")
.put("transport.netty.receive_predictor_size", "128kb")
.put("transport.netty.boss_count", 1)
.build();
// Create transport instance (typically done by Elasticsearch internally).
// NOTE: threadPool, networkService, bigArrays, namedWriteableRegistry and
// circuitBreakerService are not declared in this snippet; the caller must supply them.
Netty3Transport transport = new Netty3Transport(
settings,
threadPool,
networkService,
bigArrays,
namedWriteableRegistry,
circuitBreakerService
);
// Check server status: print the number of open server channels reported by the transport.
long openChannels = transport.serverOpen();
System.out.println("Open server channels: " + openChannels);
The transport configures different pipeline factories for client and server channels.
/**
* Builds the pipeline factory used for client channels.
* Each pipeline produced by the factory holds two handlers, added in this order:
* - "size": a new Netty3SizeHeaderFrameDecoder
* - "dispatcher": a new Netty3MessageChannelHandler created with this transport and its logger
* @return a ChannelPipelineFactory that assembles a new pipeline on every getPipeline() call
*/
public ChannelPipelineFactory configureClientChannelPipelineFactory() {
    // ChannelPipelineFactory has a single method (getPipeline), so a lambda can stand in for it.
    return () -> {
        final ChannelPipeline clientPipeline = Channels.pipeline();
        clientPipeline.addLast("size", new Netty3SizeHeaderFrameDecoder());
        clientPipeline.addLast("dispatcher", new Netty3MessageChannelHandler(this, logger));
        return clientPipeline;
    };
}
/**
* Builds the pipeline factory used for server channels of one profile.
* Each pipeline produced by the factory holds three handlers, added in this order:
* - "open_channels": the transport's openChannelsHandler (the same reference for every pipeline)
* - "size": a new Netty3SizeHeaderFrameDecoder
* - "dispatcher": a new Netty3MessageChannelHandler created with this transport, its logger and the profile name
* @param name profile name, passed through to the message channel handler
* @param settings profile settings; not referenced by this method body
* @return a ChannelPipelineFactory that assembles a new pipeline on every getPipeline() call
*/
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings settings) {
    // ChannelPipelineFactory has a single method (getPipeline), so a lambda can stand in for it.
    return () -> {
        final ChannelPipeline serverPipeline = Channels.pipeline();
        serverPipeline.addLast("open_channels", openChannelsHandler);
        serverPipeline.addLast("size", new Netty3SizeHeaderFrameDecoder());
        serverPipeline.addLast("dispatcher", new Netty3MessageChannelHandler(this, logger, name));
        return serverPipeline;
    };
}
The transport uses Netty 3's buffer management system with configurable sizing and optimization:
The transport supports multiple connection types for different traffic patterns: low, med, high, and ping (as described in the Netty3Transport class Javadoc).
The transport includes comprehensive error handling for network failures, connection timeouts, and message corruption, with automatic retry mechanisms and circuit breaker integration for system protection.
Install with Tessl CLI
npx tessl i tessl/maven-org-elasticsearch-plugin--transport-netty3-client