0
# Client Configuration
1
2
REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.
3
4
## Capabilities
5
6
### RestClientFactory Interface
7
8
Factory interface for configuring the Elasticsearch REST client with custom settings.
9
10
```java { .api }
11
/**
12
* A factory that is used to configure the RestHighLevelClient
13
* internally used in the ElasticsearchSink.
14
*/
15
@PublicEvolving
16
public interface RestClientFactory extends Serializable {
17
/**
18
* Configures the rest client builder.
19
* @param restClientBuilder the configured rest client builder.
20
*/
21
void configureRestClientBuilder(RestClientBuilder restClientBuilder);
22
}
23
```
24
25
**Usage Examples:**
26
27
```java
28
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
29
import org.elasticsearch.client.RestClientBuilder;
30
import org.apache.http.auth.AuthScope;
31
import org.apache.http.auth.UsernamePasswordCredentials;
32
import org.apache.http.client.CredentialsProvider;
33
import org.apache.http.impl.client.BasicCredentialsProvider;
34
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
35
import org.apache.http.ssl.SSLContextBuilder;
36
import org.apache.http.ssl.SSLContexts;
37
38
import javax.net.ssl.SSLContext;
39
import java.security.KeyStore;
40
41
// Basic authentication configuration
42
RestClientFactory basicAuthFactory = new RestClientFactory() {
43
@Override
44
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
45
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
46
@Override
47
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
48
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
49
credentialsProvider.setCredentials(
50
AuthScope.ANY,
51
new UsernamePasswordCredentials("elastic", "password")
52
);
53
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
54
}
55
});
56
}
57
};
58
59
// Timeout configuration
60
RestClientFactory timeoutFactory = new RestClientFactory() {
61
@Override
62
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
63
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
64
@Override
65
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
66
return requestConfigBuilder
67
.setConnectTimeout(5000) // 5 second connection timeout
68
.setSocketTimeout(60000); // 60 second socket timeout
69
}
70
});
71
}
72
};
73
74
// SSL configuration
75
RestClientFactory sslFactory = new RestClientFactory() {
76
@Override
77
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
78
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
79
@Override
80
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
81
try {
82
// Load keystore and truststore
83
KeyStore truststore = KeyStore.getInstance("jks");
84
truststore.load(new FileInputStream("/path/to/truststore.jks"), "truststore-password".toCharArray());
85
86
SSLContextBuilder sslBuilder = SSLContexts.custom()
87
.loadTrustMaterial(truststore, null);
88
89
SSLContext sslContext = sslBuilder.build();
90
return httpClientBuilder.setSSLContext(sslContext);
91
} catch (Exception e) {
92
throw new RuntimeException("Failed to configure SSL", e);
93
}
94
}
95
});
96
}
97
};
98
99
// Using custom client factory
100
ElasticsearchSink<MyData> authenticatedSink = new ElasticsearchSink.Builder<>(
101
httpHosts,
102
sinkFunction
103
)
104
.setRestClientFactory(basicAuthFactory)
105
.build();
106
```
107
108
### Advanced Client Configuration
109
110
#### Comprehensive Configuration Example
111
112
```java
113
import org.apache.http.Header;
114
import org.apache.http.HttpHost;
115
import org.apache.http.message.BasicHeader;
116
import org.apache.http.client.config.RequestConfig;
117
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
118
119
public class ComprehensiveRestClientFactory implements RestClientFactory {
120
private final String username;
121
private final String password;
122
private final String apiKey;
123
private final int connectTimeout;
124
private final int socketTimeout;
125
private final int maxRetryTimeout;
126
private final boolean sslEnabled;
127
128
public ComprehensiveRestClientFactory(String username, String password, String apiKey,
129
int connectTimeout, int socketTimeout, int maxRetryTimeout,
130
boolean sslEnabled) {
131
this.username = username;
132
this.password = password;
133
this.apiKey = apiKey;
134
this.connectTimeout = connectTimeout;
135
this.socketTimeout = socketTimeout;
136
this.maxRetryTimeout = maxRetryTimeout;
137
this.sslEnabled = sslEnabled;
138
}
139
140
@Override
141
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
142
// Set default headers
143
List<Header> defaultHeaders = new ArrayList<>();
144
if (apiKey != null && !apiKey.isEmpty()) {
145
defaultHeaders.add(new BasicHeader("Authorization", "ApiKey " + apiKey));
146
}
147
if (!defaultHeaders.isEmpty()) {
148
restClientBuilder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
149
}
150
151
// Configure request timeouts
152
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
153
@Override
154
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
155
return requestConfigBuilder
156
.setConnectTimeout(connectTimeout)
157
.setSocketTimeout(socketTimeout);
158
}
159
});
160
161
// Configure HTTP client
162
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
163
@Override
164
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
165
// Basic authentication
166
if (username != null && password != null && !username.isEmpty() && !password.isEmpty()) {
167
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
168
credentialsProvider.setCredentials(
169
AuthScope.ANY,
170
new UsernamePasswordCredentials(username, password)
171
);
172
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
173
}
174
175
// SSL configuration
176
if (sslEnabled) {
177
try {
178
SSLContext sslContext = SSLContextBuilder.create()
179
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
180
.build();
181
httpClientBuilder.setSSLContext(sslContext);
182
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
183
} catch (Exception e) {
184
throw new RuntimeException("Failed to configure SSL", e);
185
}
186
}
187
188
// Connection pool configuration
189
httpClientBuilder.setMaxConnTotal(100);
190
httpClientBuilder.setMaxConnPerRoute(30);
191
192
return httpClientBuilder;
193
}
194
});
195
196
// Set max retry timeout
197
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
198
199
// Node selector for routing requests
200
restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
201
}
202
}
203
204
// Usage
205
RestClientFactory comprehensiveFactory = new ComprehensiveRestClientFactory(
206
"elastic", // username
207
"secure_password", // password
208
null, // API key (null if using basic auth)
209
5000, // connect timeout (5s)
210
60000, // socket timeout (60s)
211
120000, // max retry timeout (2min)
212
true // SSL enabled
213
);
214
215
ElasticsearchSink<Event> configuredSink = new ElasticsearchSink.Builder<>(
216
httpHosts,
217
sinkFunction
218
)
219
.setRestClientFactory(comprehensiveFactory)
220
.build();
221
```
222
223
### Common Configuration Patterns
224
225
#### API Key Authentication
226
227
```java
228
RestClientFactory apiKeyFactory = restClientBuilder -> {
229
Header[] defaultHeaders = new Header[]{
230
new BasicHeader("Authorization", "ApiKey " + "your-api-key-here")
231
};
232
restClientBuilder.setDefaultHeaders(defaultHeaders);
233
};
234
```
235
236
#### Cloud Elasticsearch Configuration
237
238
```java
239
RestClientFactory cloudFactory = restClientBuilder -> {
240
// Cloud authentication
241
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
242
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
243
credentialsProvider.setCredentials(
244
AuthScope.ANY,
245
new UsernamePasswordCredentials("elastic", "cloud-password")
246
);
247
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
248
});
249
250
// Cloud-specific timeouts
251
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
252
requestConfigBuilder
253
.setConnectTimeout(10000)
254
.setSocketTimeout(120000)
255
);
256
};
257
```
258
259
#### Development/Testing Configuration
260
261
```java
262
RestClientFactory devFactory = restClientBuilder -> {
263
// Relaxed timeouts for development
264
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
265
requestConfigBuilder
266
.setConnectTimeout(1000)
267
.setSocketTimeout(30000)
268
);
269
270
// Disable SSL verification for local testing
271
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
272
try {
273
SSLContext sslContext = SSLContextBuilder.create()
274
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
275
.build();
276
return httpClientBuilder
277
.setSSLContext(sslContext)
278
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
279
} catch (Exception e) {
280
throw new RuntimeException("SSL configuration failed", e);
281
}
282
});
283
};
284
```
285
286
#### Production High-Availability Configuration
287
288
```java
289
RestClientFactory productionFactory = restClientBuilder -> {
290
// Production timeouts
291
restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
292
requestConfigBuilder
293
.setConnectTimeout(5000)
294
.setSocketTimeout(60000)
295
);
296
297
// Connection pool optimization
298
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
299
httpClientBuilder
300
.setMaxConnTotal(200)
301
.setMaxConnPerRoute(50)
302
.setKeepAliveStrategy((response, context) -> 30000) // 30 second keep-alive
303
);
304
305
// High retry timeout for resilience
306
restClientBuilder.setMaxRetryTimeoutMillis(180000); // 3 minutes
307
308
// Skip dedicated master nodes
309
restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
310
};
311
```
312
313
### Configuration with Connection Pooling
314
315
```java
316
RestClientFactory pooledFactory = restClientBuilder -> {
317
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
318
// Connection manager configuration
319
PoolingNHttpClientConnectionManager connectionManager =
320
new PoolingNHttpClientConnectionManager(
321
RegistryBuilder.<SchemeIOSessionStrategy>create()
322
.register("http", NoopIOSessionStrategy.INSTANCE)
323
.register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())
324
.build()
325
);
326
327
connectionManager.setMaxTotal(150); // Total connections
328
connectionManager.setDefaultMaxPerRoute(50); // Per-route connections
329
connectionManager.setValidateAfterInactivity(30000); // Validate after 30s inactivity
330
331
return httpClientBuilder
332
.setConnectionManager(connectionManager)
333
.setConnectionManagerShared(false);
334
});
335
};
336
```
337
338
### Monitoring and Logging Configuration
339
340
```java
341
RestClientFactory monitoredFactory = restClientBuilder -> {
342
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
343
// Add request/response interceptors for monitoring
344
httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {
345
@Override
346
public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
347
LOG.debug("Elasticsearch request: {} {}", request.getRequestLine().getMethod(),
348
request.getRequestLine().getUri());
349
}
350
});
351
352
httpClientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
353
@Override
354
public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {
355
LOG.debug("Elasticsearch response: {}", response.getStatusLine().getStatusCode());
356
}
357
});
358
359
return httpClientBuilder;
360
});
361
362
// Request logging
363
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
364
// Add custom request configuration for debugging
365
return requestConfigBuilder.setExpectContinueEnabled(true);
366
});
367
};
368
```
369
370
### Error Handling in Client Configuration
371
372
```java
373
RestClientFactory robustFactory = restClientBuilder -> {
374
restClientBuilder.setFailureListener(new RestClient.FailureListener() {
375
@Override
376
public void onFailure(Node node) {
377
LOG.warn("Elasticsearch node failed: {}", node.getHost());
378
}
379
});
380
381
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
382
// Retry handler
383
httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true));
384
385
// Service unavailable retry strategy
386
httpClientBuilder.setServiceUnavailableRetryStrategy(
387
new ServiceUnavailableRetryStrategy() {
388
@Override
389
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
390
return executionCount <= 3 && response.getStatusLine().getStatusCode() == 503;
391
}
392
393
@Override
394
public long getRetryInterval() {
395
return 1000; // 1 second
396
}
397
}
398
);
399
400
return httpClientBuilder;
401
});
402
};
403
```