or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

bulk-processing.md, client-configuration.md, datastream-api.md, failure-handling.md, index.md, table-api.md
tile.json

docs/client-configuration.md

Client Configuration

REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.

Capabilities

RestClientFactory Interface

Factory interface for configuring the Elasticsearch REST client with custom settings.

/**
 * A factory that is used to configure the RestHighLevelClient
 * internally used in the ElasticsearchSink.
 *
 * <p>The interface extends {@link Serializable}, so every implementation has to be
 * serializable as well. It declares exactly one abstract method and can therefore
 * also be implemented with a lambda expression.
 */
@PublicEvolving
public interface RestClientFactory extends Serializable {
    /**
     * Configures the rest client builder.
     *
     * @param restClientBuilder the rest client builder that the custom settings
     *                          (authentication, SSL, timeouts, ...) are applied to.
     */
    void configureRestClientBuilder(RestClientBuilder restClientBuilder);
}

Usage Examples:

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.elasticsearch.client.RestClientBuilder;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;

import javax.net.ssl.SSLContext;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Basic authentication: registers a username/password pair with the HTTP client.
// The credentials provider is created inside the callback, i.e. when the client is built.
RestClientFactory basicAuthFactory = builder ->
    builder.setHttpClientConfigCallback(httpClient -> {
        CredentialsProvider provider = new BasicCredentialsProvider();
        UsernamePasswordCredentials login = new UsernamePasswordCredentials("elastic", "password");
        provider.setCredentials(AuthScope.ANY, login);
        return httpClient.setDefaultCredentialsProvider(provider);
    });

// Timeout configuration: overrides the connect and socket timeouts of every request.
RestClientFactory timeoutFactory = builder ->
    builder.setRequestConfigCallback(requestConfig -> {
        requestConfig.setConnectTimeout(5000);    // connection timeout: 5 seconds
        requestConfig.setSocketTimeout(60000);    // socket timeout: 60 seconds
        return requestConfig;
    });

// SSL configuration: trusts the certificates stored in a JKS truststore file.
RestClientFactory sslFactory = new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                try {
                    // Load the truststore; try-with-resources releases the file handle
                    // even when reading or parsing the store fails.
                    KeyStore truststore = KeyStore.getInstance("jks");
                    try (InputStream truststoreStream = new FileInputStream("/path/to/truststore.jks")) {
                        truststore.load(truststoreStream, "truststore-password".toCharArray());
                    }

                    // A null trust strategy means: trust exactly what the truststore contains.
                    SSLContext sslContext = SSLContexts.custom()
                        .loadTrustMaterial(truststore, null)
                        .build();
                    return httpClientBuilder.setSSLContext(sslContext);
                } catch (IOException | GeneralSecurityException e) {
                    // The callback cannot throw checked exceptions, so rethrow unchecked with the cause.
                    throw new RuntimeException("Failed to configure SSL", e);
                }
            }
        });
    }
};

// Using custom client factory
ElasticsearchSink<MyData> authenticatedSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setRestClientFactory(basicAuthFactory)
.build();

Advanced Client Configuration

Comprehensive Configuration Example

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

/**
 * A {@link RestClientFactory} that combines API-key or basic authentication, request
 * timeouts, TLS, connection-pool limits, the maximum retry timeout and node selection.
 *
 * <p>When {@code sslEnabled} is set, server certificates are validated against the JVM's
 * default trust material. Certificate and hostname verification are only switched off when
 * {@code trustAllCertificates} is requested explicitly through the 8-argument constructor;
 * that mode is meant for local testing against self-signed certificates only.
 */
public class ComprehensiveRestClientFactory implements RestClientFactory {
    // RestClientFactory is Serializable; pin the serialized form explicitly.
    private static final long serialVersionUID = 1L;

    private final String username;
    private final String password;
    private final String apiKey;
    private final int connectTimeout;
    private final int socketTimeout;
    private final int maxRetryTimeout;
    private final boolean sslEnabled;
    private final boolean trustAllCertificates;

    /**
     * Creates a factory that validates server certificates whenever SSL is enabled.
     *
     * @param username        user name for basic authentication; {@code null} or empty disables it
     * @param password        password for basic authentication; {@code null} or empty disables it
     * @param apiKey          API key sent as {@code Authorization: ApiKey ...}; {@code null} or empty disables it
     * @param connectTimeout  connect timeout in milliseconds
     * @param socketTimeout   socket timeout in milliseconds
     * @param maxRetryTimeout maximum retry timeout in milliseconds
     * @param sslEnabled      whether an SSL context is installed on the HTTP client
     */
    public ComprehensiveRestClientFactory(String username, String password, String apiKey,
                                        int connectTimeout, int socketTimeout, int maxRetryTimeout,
                                        boolean sslEnabled) {
        this(username, password, apiKey, connectTimeout, socketTimeout, maxRetryTimeout, sslEnabled, false);
    }

    /**
     * Creates a factory with explicit control over certificate validation.
     *
     * @param username             user name for basic authentication; {@code null} or empty disables it
     * @param password             password for basic authentication; {@code null} or empty disables it
     * @param apiKey               API key sent as {@code Authorization: ApiKey ...}; {@code null} or empty disables it
     * @param connectTimeout       connect timeout in milliseconds
     * @param socketTimeout        socket timeout in milliseconds
     * @param maxRetryTimeout      maximum retry timeout in milliseconds
     * @param sslEnabled           whether an SSL context is installed on the HTTP client
     * @param trustAllCertificates if {@code true} (and SSL is enabled), every certificate is accepted
     *                             and hostname verification is skipped - insecure, testing only
     */
    public ComprehensiveRestClientFactory(String username, String password, String apiKey,
                                        int connectTimeout, int socketTimeout, int maxRetryTimeout,
                                        boolean sslEnabled, boolean trustAllCertificates) {
        this.username = username;
        this.password = password;
        this.apiKey = apiKey;
        this.connectTimeout = connectTimeout;
        this.socketTimeout = socketTimeout;
        this.maxRetryTimeout = maxRetryTimeout;
        this.sslEnabled = sslEnabled;
        this.trustAllCertificates = trustAllCertificates;
    }

    /**
     * Applies headers, timeouts, HTTP client settings, retry timeout and node selector.
     *
     * @param restClientBuilder the rest client builder to configure
     */
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // Default headers are only set when an API key is present.
        if (apiKey != null && !apiKey.isEmpty()) {
            restClientBuilder.setDefaultHeaders(new Header[] {
                new BasicHeader("Authorization", "ApiKey " + apiKey)
            });
        }

        // Configure request timeouts
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
        );

        // Configure HTTP client
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            configureBasicAuth(httpClientBuilder);
            if (sslEnabled) {
                configureSsl(httpClientBuilder);
            }

            // Connection pool configuration
            httpClientBuilder.setMaxConnTotal(100);
            httpClientBuilder.setMaxConnPerRoute(30);

            return httpClientBuilder;
        });

        // Set max retry timeout
        restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);

        // Node selector for routing requests
        restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
    }

    /** Registers basic-auth credentials when both user name and password are non-empty. */
    private void configureBasicAuth(HttpAsyncClientBuilder httpClientBuilder) {
        if (username == null || password == null || username.isEmpty() || password.isEmpty()) {
            return;
        }
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
            AuthScope.ANY,
            new UsernamePasswordCredentials(username, password)
        );
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    }

    /** Installs the SSL context: JVM default trust material, or trust-all when explicitly requested. */
    private void configureSsl(HttpAsyncClientBuilder httpClientBuilder) {
        try {
            if (trustAllCertificates) {
                // Insecure: accepts any certificate and any host name. Testing only.
                SSLContext sslContext = SSLContextBuilder.create()
                    .loadTrustMaterial(TrustAllStrategy.INSTANCE)
                    .build();
                httpClientBuilder.setSSLContext(sslContext);
                httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
            } else {
                // Validates certificates against the JVM's default trust material.
                httpClientBuilder.setSSLContext(SSLContext.getDefault());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to configure SSL", e);
        }
    }
}

// Usage: basic authentication (no API key), explicit timeouts, SSL switched on
String esUsername = "elastic";
String esPassword = "secure_password";
String esApiKey = null;              // null because basic auth is used
int connectTimeoutMillis = 5000;     // 5s
int socketTimeoutMillis = 60000;     // 60s
int maxRetryTimeoutMillis = 120000;  // 2min
boolean useSsl = true;

RestClientFactory comprehensiveFactory = new ComprehensiveRestClientFactory(
    esUsername,
    esPassword,
    esApiKey,
    connectTimeoutMillis,
    socketTimeoutMillis,
    maxRetryTimeoutMillis,
    useSsl
);

ElasticsearchSink<Event> configuredSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setRestClientFactory(comprehensiveFactory)
.build();

Common Configuration Patterns

API Key Authentication

// Sends the API key as a default Authorization header on every request.
RestClientFactory apiKeyFactory = builder ->
    builder.setDefaultHeaders(new Header[] {
        new BasicHeader("Authorization", "ApiKey " + "your-api-key-here")
    });

Cloud Elasticsearch Configuration

RestClientFactory cloudFactory = builder -> {
    // Cloud-specific timeouts: 10 s to connect, 120 s on the socket
    builder.setRequestConfigCallback(requestConfig -> {
        requestConfig.setConnectTimeout(10000);
        requestConfig.setSocketTimeout(120000);
        return requestConfig;
    });

    // Cloud authentication via basic credentials
    builder.setHttpClientConfigCallback(httpClient -> {
        UsernamePasswordCredentials login = new UsernamePasswordCredentials("elastic", "cloud-password");
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, login);
        return httpClient.setDefaultCredentialsProvider(provider);
    });
};

Development/Testing Configuration

RestClientFactory devFactory = builder -> {
    // Development timeouts: 1 s to connect, 30 s on the socket
    builder.setRequestConfigCallback(requestConfig -> {
        requestConfig.setConnectTimeout(1000);
        requestConfig.setSocketTimeout(30000);
        return requestConfig;
    });

    // Local testing only: accept any certificate and skip hostname verification
    builder.setHttpClientConfigCallback(httpClient -> {
        try {
            SSLContextBuilder trustAllBuilder = SSLContextBuilder.create();
            trustAllBuilder.loadTrustMaterial(TrustAllStrategy.INSTANCE);
            SSLContext trustAllContext = trustAllBuilder.build();

            httpClient.setSSLContext(trustAllContext);
            httpClient.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
            return httpClient;
        } catch (Exception e) {
            throw new RuntimeException("SSL configuration failed", e);
        }
    });
};

Production High-Availability Configuration

RestClientFactory productionFactory = builder -> {
    // Route requests away from dedicated master nodes
    builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);

    // Generous retry budget for resilience: 3 minutes
    builder.setMaxRetryTimeoutMillis(180000);

    // Production timeouts: 5 s to connect, 60 s on the socket
    builder.setRequestConfigCallback(requestConfig -> {
        requestConfig.setConnectTimeout(5000);
        requestConfig.setSocketTimeout(60000);
        return requestConfig;
    });

    // Connection pool sizing plus a fixed 30 second keep-alive
    builder.setHttpClientConfigCallback(httpClient -> {
        httpClient.setMaxConnTotal(200);
        httpClient.setMaxConnPerRoute(50);
        httpClient.setKeepAliveStrategy((response, context) -> 30000);
        return httpClient;
    });
};

Configuration with Connection Pooling

RestClientFactory pooledFactory = restClientBuilder -> {
    restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
        // Connection manager configuration.
        // Every PoolingNHttpClientConnectionManager constructor takes the I/O reactor that
        // drives the connections as its first argument; the scheme registry comes second.
        PoolingNHttpClientConnectionManager connectionManager;
        try {
            connectionManager = new PoolingNHttpClientConnectionManager(
                new org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor(),
                RegistryBuilder.<SchemeIOSessionStrategy>create()
                    .register("http", NoopIOSessionStrategy.INSTANCE)
                    .register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())
                    .build()
            );
        } catch (org.apache.http.nio.reactor.IOReactorException e) {
            // The callback cannot throw checked exceptions, so rethrow unchecked with the cause.
            throw new RuntimeException("Failed to create I/O reactor for the connection pool", e);
        }

        connectionManager.setMaxTotal(150);                    // Total connections
        connectionManager.setDefaultMaxPerRoute(50);          // Per-route connections
        // NOTE: the non-blocking connection manager has no validate-after-inactivity setting;
        // that setter only exists on the blocking PoolingHttpClientConnectionManager.

        // Not shared: the HTTP client owns the manager and shuts it down when it is closed.
        return httpClientBuilder
            .setConnectionManager(connectionManager)
            .setConnectionManagerShared(false);
    });
};

Monitoring and Logging Configuration

RestClientFactory monitoredFactory = builder -> {
    builder.setHttpClientConfigCallback(httpClient -> {
        // Logs method and URI of every outgoing request at debug level
        HttpRequestInterceptor requestLogger = (request, context) ->
            LOG.debug("Elasticsearch request: {} {}",
                request.getRequestLine().getMethod(),
                request.getRequestLine().getUri());

        // Logs the status code of every incoming response at debug level
        HttpResponseInterceptor responseLogger = (response, context) ->
            LOG.debug("Elasticsearch response: {}", response.getStatusLine().getStatusCode());

        // Both interceptors are registered at the front of their respective chains
        return httpClient
            .addInterceptorFirst(requestLogger)
            .addInterceptorFirst(responseLogger);
    });

    // Enables the "Expect: 100-continue" handshake on requests
    builder.setRequestConfigCallback(requestConfig ->
        requestConfig.setExpectContinueEnabled(true)
    );
};

Error Handling in Client Configuration

RestClientFactory robustFactory = restClientBuilder -> {
    // Log every node that the REST client reports as failed
    restClientBuilder.setFailureListener(new RestClient.FailureListener() {
        @Override
        public void onFailure(Node node) {
            LOG.warn("Elasticsearch node failed: {}", node.getHost());
        }
    });

    // HttpAsyncClientBuilder offers neither setRetryHandler nor
    // setServiceUnavailableRetryStrategy - those hooks exist only on the blocking
    // HttpClientBuilder. With the asynchronous client, retrying is done by the
    // Elasticsearch REST client itself: a failed request (including 502/503/504
    // responses) is retried against the remaining nodes until the max retry timeout
    // is exhausted.
    restClientBuilder.setMaxRetryTimeoutMillis(30000); // 30 seconds

    // Requests that still fail are reported to the sink's ActionRequestFailureHandler.
};