Netty 3 based transport implementation for Elasticsearch providing TCP and HTTP transport layers
—
Complete HTTP server implementation for Elasticsearch's REST API. Handles HTTP requests, response processing, request pipelining, and CORS support for web-based clients using Netty 3.
Main HTTP server transport implementation that handles all REST API communication for Elasticsearch.
/**
* Netty 3-based HTTP server transport for Elasticsearch REST API.
* Supports HTTP request pipelining, CORS, compression, and various TCP optimizations.
*/
public class Netty3HttpServerTransport extends AbstractLifecycleComponent
implements HttpServerTransport {
/**
* Constructor for creating a new HTTP server transport instance
* @param settings Elasticsearch configuration settings
* @param networkService Network utility service
* @param bigArrays Memory management for large arrays
* @param threadPool Thread pool for async operations
* @param xContentRegistry Registry for content parsers (JSON, YAML, etc.)
* @param dispatcher HTTP request dispatcher for routing requests
*/
public Netty3HttpServerTransport(Settings settings, NetworkService networkService,
BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
HttpServerTransport.Dispatcher dispatcher);
/**
* Returns the current configuration settings
* @return Settings object with current configuration
*/
public Settings settings();
/**
* Returns the bound transport address information
* @return BoundTransportAddress containing publish and bound addresses
*/
public BoundTransportAddress boundAddress();
/**
* Returns HTTP server information and statistics
* @return HttpInfo object with server details
*/
public HttpInfo info();
/**
* Returns current HTTP server statistics
* @return HttpStats object with request/response metrics
*/
public HttpStats stats();
/**
* Returns the current CORS configuration
* @return Netty3CorsConfig object with CORS policy settings
*/
public Netty3CorsConfig getCorsConfig();
/**
* Configures the server channel pipeline factory
* @return ChannelPipelineFactory for HTTP server channels
*/
public ChannelPipelineFactory configureServerChannelPipelineFactory();
/**
* Resolves the publish port for the HTTP transport
* @param settings Configuration settings
* @param boundAddresses List of bound addresses
* @param publishAddress Address to publish
* @return Resolved port number for publishing
*/
public static int resolvePublishPort(Settings settings,
List<InetSocketTransportAddress> boundAddresses,
InetAddress publishAddress);
}

Comprehensive settings for HTTP server performance, TCP optimization, and buffer management.
/**
* Maximum capacity for HTTP cumulation buffers to prevent memory issues.
* Key: {@code http.netty.max_cumulation_buffer_capacity}
* Default: {@code new ByteSizeValue(-1)}, i.e. -1 (unlimited)
*/
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY =
byteSizeSetting("http.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1), ...);
/**
* Maximum number of components in HTTP composite buffers.
* Key: {@code http.netty.max_composite_buffer_components}
* Default: -1 (unlimited)
*/
public static final Setting<Integer> SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS =
intSetting("http.netty.max_composite_buffer_components", -1, ...);
/**
* Number of worker threads for handling HTTP I/O operations.
* Key: {@code http.netty.worker_count} (the key contains a {@code netty} segment
* that the constant name omits).
* Default: 2 * number of available processors (the default expression is elided
* in this listing).
*/
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT =
intSetting("http.netty.worker_count", ...);
/**
* TCP no delay setting for HTTP connections.
* Key: {@code http.tcp_no_delay} (written with an underscore, unlike the
* {@code http.tcp.*} keys used by the other TCP settings in this listing).
* Default: true (Nagle's algorithm disabled)
* NOTE(review): the default is supplied via
* {@code NetworkService.TcpSettings.TCP_NO_DELAY}, presumably as a fallback
* setting; confirm.
*/
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
boolSetting("http.tcp_no_delay", NetworkService.TcpSettings.TCP_NO_DELAY, ...);
/**
* TCP keep alive setting for HTTP connections.
* Key: {@code http.tcp.keep_alive}
* Default: true
* NOTE(review): the default is supplied via
* {@code NetworkService.TcpSettings.TCP_KEEP_ALIVE}, presumably as a fallback
* setting; confirm.
*/
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
boolSetting("http.tcp.keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE, ...);
/**
* Use blocking server implementation (OIO instead of NIO).
* Key: {@code http.tcp.blocking_server}
* Default: false (use NIO)
* NOTE(review): the default is supplied via
* {@code NetworkService.TcpSettings.TCP_BLOCKING_SERVER}, presumably as a
* fallback setting; confirm.
*/
public static final Setting<Boolean> SETTING_HTTP_TCP_BLOCKING_SERVER =
boolSetting("http.tcp.blocking_server", NetworkService.TcpSettings.TCP_BLOCKING_SERVER, ...);
/**
* TCP socket reuse address setting.
* Key: {@code http.tcp.reuse_address}
* Default: true
* NOTE(review): the default is supplied via
* {@code NetworkService.TcpSettings.TCP_REUSE_ADDRESS}, presumably as a
* fallback setting; confirm.
*/
public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
boolSetting("http.tcp.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS, ...);
/**
* TCP send buffer size for HTTP connections.
* Key: {@code http.tcp.send_buffer_size}
* Default: 0 (use system default)
* NOTE(review): no literal default is visible here; the value is supplied via
* {@code NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE}. Verify that the
* stated default of 0 matches that setting.
*/
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =
byteSizeSetting("http.tcp.send_buffer_size", NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE, ...);
/**
* TCP receive buffer size for HTTP connections
* Default: 0 (use system default)
*/
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE =
byteSizeSetting("http.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE, ...);

Usage Example:
import org.elasticsearch.http.netty3.Netty3HttpServerTransport;
import org.elasticsearch.common.settings.Settings;
// Configure HTTP transport with custom settings
Settings settings = Settings.builder()
.put("http.netty.worker_count", 8)
.put("http.tcp_no_delay", true)
.put("http.tcp.keep_alive", true)
.put("http.tcp.send_buffer_size", "64kb")
.put("http.tcp.receive_buffer_size", "64kb")
.put("http.netty.max_cumulation_buffer_capacity", "256mb")
.build();
// Create HTTP transport (typically done by Elasticsearch internally)
Netty3HttpServerTransport httpTransport = new Netty3HttpServerTransport(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher
);
// Access transport information
HttpInfo info = httpTransport.info();
HttpStats stats = httpTransport.stats();
BoundTransportAddress address = httpTransport.boundAddress();

The transport provides classes for handling HTTP requests and responses with full Netty 3 integration.
/**
* HTTP request wrapper that extends Elasticsearch's RestRequest
*/
public class Netty3HttpRequest extends RestRequest {
/**
* Constructor for wrapping Netty HTTP requests
* @param xContentRegistry Registry for content parsing
* @param request Netty HTTP request object
* @param channel Netty channel for the request
*/
public Netty3HttpRequest(NamedXContentRegistry xContentRegistry,
HttpRequest request, Channel channel);
/**
* Returns the underlying Netty HTTP request
* @return HttpRequest object from Netty
*/
public HttpRequest request();
/**
* Returns the HTTP method for this request
* @return RestRequest.Method (GET, POST, PUT, DELETE, etc.)
*/
public RestRequest.Method method();
/**
* Returns the request URI
* @return String containing the full request URI
*/
public String uri();
/**
* Checks if the request has content body
* @return true if request contains content data
*/
public boolean hasContent();
/**
* Returns the request content as bytes
* @return BytesReference containing request body
*/
public BytesReference content();
/**
* Returns the remote client address
* @return SocketAddress of the client
*/
public SocketAddress getRemoteAddress();
/**
* Returns the local server address
* @return SocketAddress of the server
*/
public SocketAddress getLocalAddress();
/**
* Returns the underlying Netty channel
* @return Channel object for this request
*/
public Channel getChannel();
}/**
* HTTP response channel for sending responses back to clients
*/
public class Netty3HttpChannel implements RestChannel {
/**
* Constructor for HTTP response channel
* @param transport Parent HTTP server transport
* @param request Original HTTP request
* @param messageEvent Netty message event
* @param detailedErrorsEnabled Whether to include detailed error info
* @param threadContext Thread context for request handling
*/
public Netty3HttpChannel(Netty3HttpServerTransport transport,
Netty3HttpRequest request,
OrderedUpstreamMessageEvent messageEvent,
boolean detailedErrorsEnabled,
ThreadContext threadContext);
/**
* Creates a new bytes output stream for response content
* @return BytesStreamOutput for writing response data
*/
public BytesStreamOutput newBytesOutput();
/**
* Sends the HTTP response to the client
* @param response RestResponse containing response data and headers
*/
public void sendResponse(RestResponse response);
}

The server configures a comprehensive pipeline for handling HTTP requests with support for compression, CORS, pipelining, and custom handlers.
/**
* Server channel pipeline includes:
* - HTTP request decoder for parsing incoming requests
* - HTTP response encoder for formatting outgoing responses
* - Content aggregator for handling chunked requests
* - Optional compression/decompression handlers
* - CORS handler for cross-origin requests
* - Pipelining handler for managing multiple concurrent requests
* - Custom request handler for Elasticsearch-specific processing
*/
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
    // Handlers are appended to the pipeline in the order written here.
    final ChannelPipeline handlers = Channels.pipeline();

    // Core HTTP stages: request decoder, response encoder, and a chunk
    // aggregator constructed with maxContentLength.
    handlers.addLast("decoder", new HttpRequestDecoder());
    handlers.addLast("encoder", new ESNetty3HttpResponseEncoder());
    handlers.addLast("aggregator", new HttpChunkAggregator(maxContentLength));

    // Compression stages are installed only when compression is switched on.
    // NOTE(review): the decompressor is added after the aggregator, so
    // maxContentLength presumably bounds the compressed body size. Confirm
    // that this ordering is intended.
    if (compression) {
        handlers.addLast("decoder_compress", new HttpContentDecompressor());
        handlers.addLast("encoder_compress", new HttpContentCompressor(compressionLevel));
    }

    // Cross-origin handling, only when the CORS configuration enables it.
    if (corsConfig.isCorsSupportEnabled()) {
        handlers.addLast("cors", new Netty3CorsHandler(corsConfig));
    }

    // Pipelining handler, constructed with the logger and maxEventsHeld.
    if (pipelining) {
        handlers.addLast("pipelining", new HttpPipeliningHandler(logger, maxEventsHeld));
    }

    // The Elasticsearch request handler is always the final stage.
    handlers.addLast("handler", requestHandler);
    return handlers;
}
};
}

The HTTP transport includes several performance optimizations:
Install with Tessl CLI
npx tessl i tessl/maven-org-elasticsearch-plugin--transport-netty3-client