REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.
Factory interface for configuring the Elasticsearch REST client with custom settings.
/**
 * A factory that is used to configure the RestHighLevelClient
 * internally used in the ElasticsearchSink.
 *
 * <p>The interface extends {@link Serializable}, so every implementation
 * (anonymous class, lambda or named class) and all state it captures must be
 * serializable.
 */
@PublicEvolving
public interface RestClientFactory extends Serializable {

    /**
     * Configures the rest client builder.
     *
     * @param restClientBuilder the rest client builder to apply custom settings to.
     */
    void configureRestClientBuilder(RestClientBuilder restClientBuilder);
}

Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.elasticsearch.client.RestClientBuilder;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import javax.net.ssl.SSLContext;
import java.security.KeyStore;
// Basic authentication: register a username/password pair that the HTTP client
// offers to any host (AuthScope.ANY).
RestClientFactory basicAuthFactory = restClientBuilder -> {
    restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
        UsernamePasswordCredentials login =
            new UsernamePasswordCredentials("elastic", "password");
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, login);
        return httpClientBuilder.setDefaultCredentialsProvider(provider);
    });
};
// Timeouts: adjust the per-request configuration used by the REST client.
RestClientFactory timeoutFactory = restClientBuilder -> {
    restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
        requestConfigBuilder.setConnectTimeout(5000);   // 5 second connection timeout
        requestConfigBuilder.setSocketTimeout(60000);   // 60 second socket timeout
        return requestConfigBuilder;
    });
};
// SSL configuration: build an SSLContext that trusts the certificates found in
// a JKS truststore and hand it to the HTTP client.
RestClientFactory sslFactory = new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                try {
                    // Load the truststore. try-with-resources guarantees the file
                    // handle is released even when load() throws.
                    KeyStore truststore = KeyStore.getInstance("jks");
                    try (FileInputStream truststoreStream = new FileInputStream("/path/to/truststore.jks")) {
                        truststore.load(truststoreStream, "truststore-password".toCharArray());
                    }

                    // A null TrustStrategy means: rely solely on the truststore contents.
                    SSLContextBuilder sslBuilder = SSLContexts.custom()
                        .loadTrustMaterial(truststore, null);
                    SSLContext sslContext = sslBuilder.build();
                    return httpClientBuilder.setSSLContext(sslContext);
                } catch (Exception e) {
                    // Keystore, I/O and SSL setup failures are all reported the same way,
                    // with the original exception preserved as the cause.
                    throw new RuntimeException("Failed to configure SSL", e);
                }
            }
        });
    }
};
// Using custom client factory.
// The setters of ElasticsearchSink.Builder return void, so they cannot be
// chained onto the constructor call: configure the builder in separate
// statements and call build() last.
ElasticsearchSink.Builder<MyData> authenticatedSinkBuilder =
    new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
authenticatedSinkBuilder.setRestClientFactory(basicAuthFactory);
ElasticsearchSink<MyData> authenticatedSink = authenticatedSinkBuilder.build();

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
/**
 * Constructor-configured {@link RestClientFactory} that applies, in this order:
 * an optional API-key header, request timeouts, the HTTP client customisation
 * (optional basic authentication, optional TLS settings, connection-pool limits),
 * the max retry timeout and a node selector that skips dedicated master nodes.
 *
 * <p><b>Security note:</b> when {@code sslEnabled} is {@code true} the HTTP client is
 * given a trust-all SSL context and a no-op hostname verifier, i.e. server
 * certificates and host names are not checked.
 */
public class ComprehensiveRestClientFactory implements RestClientFactory {

    private final String username;
    private final String password;
    private final String apiKey;
    private final int connectTimeout;
    private final int socketTimeout;
    private final int maxRetryTimeout;
    private final boolean sslEnabled;

    /**
     * @param username        basic-auth user; basic auth is skipped when null or empty
     * @param password        basic-auth password; basic auth is skipped when null or empty
     * @param apiKey          value sent as {@code Authorization: ApiKey <apiKey>}; skipped when null or empty
     * @param connectTimeout  value passed to {@code setConnectTimeout}
     * @param socketTimeout   value passed to {@code setSocketTimeout}
     * @param maxRetryTimeout value passed to {@code setMaxRetryTimeoutMillis}
     * @param sslEnabled      whether to install the trust-all TLS settings described on the class
     */
    public ComprehensiveRestClientFactory(String username, String password, String apiKey,
                                          int connectTimeout, int socketTimeout, int maxRetryTimeout,
                                          boolean sslEnabled) {
        this.username = username;
        this.password = password;
        this.apiKey = apiKey;
        this.connectTimeout = connectTimeout;
        this.socketTimeout = socketTimeout;
        this.maxRetryTimeout = maxRetryTimeout;
        this.sslEnabled = sslEnabled;
    }

    /**
     * Applies all configured settings to the given builder.
     *
     * @param restClientBuilder the builder to configure
     */
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // Default headers: only the API key, and only when one was supplied.
        if (hasText(apiKey)) {
            Header apiKeyHeader = new BasicHeader("Authorization", "ApiKey " + apiKey);
            restClientBuilder.setDefaultHeaders(new Header[] {apiKeyHeader});
        }

        // Request timeouts.
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout));

        // HTTP client: authentication, TLS and pool limits.
        restClientBuilder.setHttpClientConfigCallback(this::customizeHttpClient);

        // Retry budget and node routing.
        restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
        restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
    }

    /** Customises the async HTTP client: basic auth, optional trust-all TLS, pool limits. */
    private HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        if (hasText(username) && hasText(password)) {
            CredentialsProvider basicAuth = new BasicCredentialsProvider();
            basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            httpClientBuilder.setDefaultCredentialsProvider(basicAuth);
        }

        if (sslEnabled) {
            installTrustAllSsl(httpClientBuilder);
        }

        return httpClientBuilder
            .setMaxConnTotal(100)
            .setMaxConnPerRoute(30);
    }

    /** Installs an SSL context that accepts every certificate and skips hostname verification. */
    private void installTrustAllSsl(HttpAsyncClientBuilder httpClientBuilder) {
        try {
            SSLContext trustAllContext = SSLContextBuilder.create()
                .loadTrustMaterial(TrustAllStrategy.INSTANCE)
                .build();
            httpClientBuilder.setSSLContext(trustAllContext);
            httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to configure SSL", e);
        }
    }

    /** Returns true when the string is neither null nor empty. */
    private static boolean hasText(String value) {
        return value != null && !value.isEmpty();
    }
}
// Usage
RestClientFactory comprehensiveFactory = new ComprehensiveRestClientFactory(
"elastic", // username
"secure_password", // password
null, // API key (null if using basic auth)
5000, // connect timeout (5s)
60000, // socket timeout (60s)
120000, // max retry timeout (2min)
true // SSL enabled
);
ElasticsearchSink<Event> configuredSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setRestClientFactory(comprehensiveFactory)
.build();RestClientFactory apiKeyFactory = restClientBuilder -> {
Header[] defaultHeaders = new Header[]{
new BasicHeader("Authorization", "ApiKey " + "your-api-key-here")
};
restClientBuilder.setDefaultHeaders(defaultHeaders);
};RestClientFactory cloudFactory = restClientBuilder -> {
// Cloud authentication
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "cloud-password")
);
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
// Cloud-specific timeouts
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(10000)
.setSocketTimeout(120000)
);
};RestClientFactory devFactory = restClientBuilder -> {
// Relaxed timeouts for development
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(1000)
.setSocketTimeout(30000)
);
// Disable SSL verification for local testing
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
try {
SSLContext sslContext = SSLContextBuilder.create()
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
.build();
return httpClientBuilder
.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (Exception e) {
throw new RuntimeException("SSL configuration failed", e);
}
});
};RestClientFactory productionFactory = restClientBuilder -> {
// Production timeouts
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000)
);
// Connection pool optimization
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setMaxConnTotal(200)
.setMaxConnPerRoute(50)
.setKeepAliveStrategy((response, context) -> 30000) // 30 second keep-alive
);
// High retry timeout for resilience
restClientBuilder.setMaxRetryTimeoutMillis(180000); // 3 minutes
// Skip dedicated master nodes
restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
};RestClientFactory pooledFactory = restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// Connection manager configuration
PoolingNHttpClientConnectionManager connectionManager =
new PoolingNHttpClientConnectionManager(
RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())
.build()
);
connectionManager.setMaxTotal(150); // Total connections
connectionManager.setDefaultMaxPerRoute(50); // Per-route connections
connectionManager.setValidateAfterInactivity(30000); // Validate after 30s inactivity
return httpClientBuilder
.setConnectionManager(connectionManager)
.setConnectionManagerShared(false);
});
};RestClientFactory monitoredFactory = restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// Add request/response interceptors for monitoring
httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {
@Override
public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
LOG.debug("Elasticsearch request: {} {}", request.getRequestLine().getMethod(),
request.getRequestLine().getUri());
}
});
httpClientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
@Override
public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {
LOG.debug("Elasticsearch response: {}", response.getStatusLine().getStatusCode());
}
});
return httpClientBuilder;
});
// Request logging
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// Add custom request configuration for debugging
return requestConfigBuilder.setExpectContinueEnabled(true);
});
};RestClientFactory robustFactory = restClientBuilder -> {
restClientBuilder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
LOG.warn("Elasticsearch node failed: {}", node.getHost());
}
});
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// Retry handler
httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true));
// Service unavailable retry strategy
httpClientBuilder.setServiceUnavailableRetryStrategy(
new ServiceUnavailableRetryStrategy() {
@Override
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
return executionCount <= 3 && response.getStatusLine().getStatusCode() == 503;
}
@Override
public long getRetryInterval() {
return 1000; // 1 second
}
}
);
return httpClientBuilder;
});
};