or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-processing.md, client-configuration.md, datastream-api.md, failure-handling.md, index.md, table-api.md

docs/client-configuration.md

0

# Client Configuration

1

2

REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.

3

4

## Capabilities

5

6

### RestClientFactory Interface

7

8

Factory interface for configuring the Elasticsearch REST client with custom settings.

9

10

```java { .api }

11

/**

12

* A factory that is used to configure the RestHighLevelClient

13

* internally used in the ElasticsearchSink.

14

*/

15

@PublicEvolving

16

public interface RestClientFactory extends Serializable {

17

/**

18

* Configures the rest client builder.

19

* @param restClientBuilder the configured rest client builder.

20

*/

21

void configureRestClientBuilder(RestClientBuilder restClientBuilder);

22

}

23

```

24

25

**Usage Examples:**

26

27

```java

28

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClientBuilder;

import javax.net.ssl.SSLContext;

import java.io.FileInputStream;
import java.security.KeyStore;

40

41

// Basic authentication configuration

42

RestClientFactory basicAuthFactory = new RestClientFactory() {

43

@Override

44

public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

45

restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

46

@Override

47

public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

48

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

49

credentialsProvider.setCredentials(

50

AuthScope.ANY,

51

new UsernamePasswordCredentials("elastic", "password")

52

);

53

return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

54

}

55

});

56

}

57

};

58

59

// Timeout configuration

60

RestClientFactory timeoutFactory = new RestClientFactory() {

61

@Override

62

public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

63

restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {

64

@Override

65

public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {

66

return requestConfigBuilder

67

.setConnectTimeout(5000) // 5 second connection timeout

68

.setSocketTimeout(60000); // 60 second socket timeout

69

}

70

});

71

}

72

};

73

74

// SSL configuration

75

RestClientFactory sslFactory = new RestClientFactory() {

76

@Override

77

public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

78

restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

79

@Override

80

public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

81

try {

82

// Load keystore and truststore

83

KeyStore truststore = KeyStore.getInstance("jks");

84

truststore.load(new FileInputStream("/path/to/truststore.jks"), "truststore-password".toCharArray());

85

86

SSLContextBuilder sslBuilder = SSLContexts.custom()

87

.loadTrustMaterial(truststore, null);

88

89

SSLContext sslContext = sslBuilder.build();

90

return httpClientBuilder.setSSLContext(sslContext);

91

} catch (Exception e) {

92

throw new RuntimeException("Failed to configure SSL", e);

93

}

94

}

95

});

96

}

97

};

98

99

// Using custom client factory

100

ElasticsearchSink<MyData> authenticatedSink = new ElasticsearchSink.Builder<>(

101

httpHosts,

102

sinkFunction

103

)

104

.setRestClientFactory(basicAuthFactory)

105

.build();

106

```

107

108

### Advanced Client Configuration

109

110

#### Comprehensive Configuration Example

111

112

```java

113

import org.apache.http.Header;

114

import org.apache.http.HttpHost;

115

import org.apache.http.message.BasicHeader;

116

import org.apache.http.client.config.RequestConfig;

117

import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

118

119

public class ComprehensiveRestClientFactory implements RestClientFactory {

120

private final String username;

121

private final String password;

122

private final String apiKey;

123

private final int connectTimeout;

124

private final int socketTimeout;

125

private final int maxRetryTimeout;

126

private final boolean sslEnabled;

127

128

public ComprehensiveRestClientFactory(String username, String password, String apiKey,

129

int connectTimeout, int socketTimeout, int maxRetryTimeout,

130

boolean sslEnabled) {

131

this.username = username;

132

this.password = password;

133

this.apiKey = apiKey;

134

this.connectTimeout = connectTimeout;

135

this.socketTimeout = socketTimeout;

136

this.maxRetryTimeout = maxRetryTimeout;

137

this.sslEnabled = sslEnabled;

138

}

139

140

@Override

141

public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

142

// Set default headers

143

List<Header> defaultHeaders = new ArrayList<>();

144

if (apiKey != null && !apiKey.isEmpty()) {

145

defaultHeaders.add(new BasicHeader("Authorization", "ApiKey " + apiKey));

146

}

147

if (!defaultHeaders.isEmpty()) {

148

restClientBuilder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));

149

}

150

151

// Configure request timeouts

152

restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {

153

@Override

154

public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {

155

return requestConfigBuilder

156

.setConnectTimeout(connectTimeout)

157

.setSocketTimeout(socketTimeout);

158

}

159

});

160

161

// Configure HTTP client

162

restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

163

@Override

164

public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

165

// Basic authentication

166

if (username != null && password != null && !username.isEmpty() && !password.isEmpty()) {

167

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

168

credentialsProvider.setCredentials(

169

AuthScope.ANY,

170

new UsernamePasswordCredentials(username, password)

171

);

172

httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

173

}

174

175

// SSL configuration

176

if (sslEnabled) {

177

try {

178

SSLContext sslContext = SSLContextBuilder.create()

179

.loadTrustMaterial(TrustAllStrategy.INSTANCE)

180

.build();

181

httpClientBuilder.setSSLContext(sslContext);

182

httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);

183

} catch (Exception e) {

184

throw new RuntimeException("Failed to configure SSL", e);

185

}

186

}

187

188

// Connection pool configuration

189

httpClientBuilder.setMaxConnTotal(100);

190

httpClientBuilder.setMaxConnPerRoute(30);

191

192

return httpClientBuilder;

193

}

194

});

195

196

// Set max retry timeout

197

restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);

198

199

// Node selector for routing requests

200

restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);

201

}

202

}

203

204

// Usage

205

RestClientFactory comprehensiveFactory = new ComprehensiveRestClientFactory(

206

"elastic", // username

207

"secure_password", // password

208

null, // API key (null if using basic auth)

209

5000, // connect timeout (5s)

210

60000, // socket timeout (60s)

211

120000, // max retry timeout (2min)

212

true // SSL enabled

213

);

214

215

ElasticsearchSink<Event> configuredSink = new ElasticsearchSink.Builder<>(

216

httpHosts,

217

sinkFunction

218

)

219

.setRestClientFactory(comprehensiveFactory)

220

.build();

221

```

222

223

### Common Configuration Patterns

224

225

#### API Key Authentication

226

227

```java

228

RestClientFactory apiKeyFactory = restClientBuilder -> {

229

Header[] defaultHeaders = new Header[]{

230

new BasicHeader("Authorization", "ApiKey " + "your-api-key-here")

231

};

232

restClientBuilder.setDefaultHeaders(defaultHeaders);

233

};

234

```

235

236

#### Cloud Elasticsearch Configuration

237

238

```java

239

RestClientFactory cloudFactory = restClientBuilder -> {

240

// Cloud authentication

241

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {

242

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

243

credentialsProvider.setCredentials(

244

AuthScope.ANY,

245

new UsernamePasswordCredentials("elastic", "cloud-password")

246

);

247

return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

248

});

249

250

// Cloud-specific timeouts

251

restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->

252

requestConfigBuilder

253

.setConnectTimeout(10000)

254

.setSocketTimeout(120000)

255

);

256

};

257

```

258

259

#### Development/Testing Configuration

260

261

```java

262

RestClientFactory devFactory = restClientBuilder -> {

263

// Relaxed timeouts for development

264

restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->

265

requestConfigBuilder

266

.setConnectTimeout(1000)

267

.setSocketTimeout(30000)

268

);

269

270

// Disable SSL verification for local testing

271

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {

272

try {

273

SSLContext sslContext = SSLContextBuilder.create()

274

.loadTrustMaterial(TrustAllStrategy.INSTANCE)

275

.build();

276

return httpClientBuilder

277

.setSSLContext(sslContext)

278

.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);

279

} catch (Exception e) {

280

throw new RuntimeException("SSL configuration failed", e);

281

}

282

});

283

};

284

```

285

286

#### Production High-Availability Configuration

287

288

```java

289

RestClientFactory productionFactory = restClientBuilder -> {

290

// Production timeouts

291

restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->

292

requestConfigBuilder

293

.setConnectTimeout(5000)

294

.setSocketTimeout(60000)

295

);

296

297

// Connection pool optimization

298

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->

299

httpClientBuilder

300

.setMaxConnTotal(200)

301

.setMaxConnPerRoute(50)

302

.setKeepAliveStrategy((response, context) -> 30000) // 30 second keep-alive

303

);

304

305

// High retry timeout for resilience

306

restClientBuilder.setMaxRetryTimeoutMillis(180000); // 3 minutes

307

308

// Skip dedicated master nodes

309

restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);

310

};

311

```

312

313

### Configuration with Connection Pooling

314

315

```java

316

RestClientFactory pooledFactory = restClientBuilder -> {

317

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {

318

// Connection manager configuration

319

PoolingNHttpClientConnectionManager connectionManager =

320

new PoolingNHttpClientConnectionManager(

321

RegistryBuilder.<SchemeIOSessionStrategy>create()

322

.register("http", NoopIOSessionStrategy.INSTANCE)

323

.register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())

324

.build()

325

);

326

327

connectionManager.setMaxTotal(150); // Total connections

328

connectionManager.setDefaultMaxPerRoute(50); // Per-route connections

329

connectionManager.setValidateAfterInactivity(30000); // Validate after 30s inactivity

330

331

return httpClientBuilder

332

.setConnectionManager(connectionManager)

333

.setConnectionManagerShared(false);

334

});

335

};

336

```

337

338

### Monitoring and Logging Configuration

339

340

```java

341

RestClientFactory monitoredFactory = restClientBuilder -> {

342

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {

343

// Add request/response interceptors for monitoring

344

httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {

345

@Override

346

public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {

347

LOG.debug("Elasticsearch request: {} {}", request.getRequestLine().getMethod(),

348

request.getRequestLine().getUri());

349

}

350

});

351

352

httpClientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {

353

@Override

354

public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {

355

LOG.debug("Elasticsearch response: {}", response.getStatusLine().getStatusCode());

356

}

357

});

358

359

return httpClientBuilder;

360

});

361

362

// Request logging

363

restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {

364

// Add custom request configuration for debugging

365

return requestConfigBuilder.setExpectContinueEnabled(true);

366

});

367

};

368

```

369

370

### Error Handling in Client Configuration

371

372

```java

373

RestClientFactory robustFactory = restClientBuilder -> {

374

restClientBuilder.setFailureListener(new RestClient.FailureListener() {

375

@Override

376

public void onFailure(Node node) {

377

LOG.warn("Elasticsearch node failed: {}", node.getHost());

378

}

379

});

380

381

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {

382

// Retry handler

383

httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true));

384

385

// Service unavailable retry strategy

386

httpClientBuilder.setServiceUnavailableRetryStrategy(

387

new ServiceUnavailableRetryStrategy() {

388

@Override

389

public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {

390

return executionCount <= 3 && response.getStatusLine().getStatusCode() == 503;

391

}

392

393

@Override

394

public long getRetryInterval() {

395

return 1000; // 1 second

396

}

397

}

398

);

399

400

return httpClientBuilder;

401

});

402

};

403

```