or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-connector-elasticsearch6_2-12

Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-elasticsearch6_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch6_2-12@1.14.0

0

# Apache Flink Elasticsearch 6 Connector

1

2

Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality for real-time data ingestion into Elasticsearch clusters. It supports both DataStream API for programmatic streaming jobs and Table API for SQL-based stream processing, with comprehensive configuration options for connection management, bulk processing behavior, and retry mechanisms.

3

4

## Package Information

5

6

- **Package Name**: flink-connector-elasticsearch6_2.12

7

- **Package Type**: maven

8

- **Language**: Java (with Scala 2.12 support)

9

- **Group ID**: org.apache.flink

10

- **Artifact ID**: flink-connector-elasticsearch6_2.12

11

- **Installation**: Add to your Maven dependencies using a 1.14.x release (e.g. 1.14.6)

12

13

## Core Imports

14

15

```java

16

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

17

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

18

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;

19

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

20

import org.apache.http.HttpHost;

21

```

22

23

For Table API usage:

24

25

```sql

26

-- Configuration via DDL

27

CREATE TABLE elasticsearch_sink (

28

id BIGINT,

29

name STRING,

30

age INT

31

) WITH (

32

'connector' = 'elasticsearch-6',

33

'hosts' = 'http://localhost:9200',

34

'index' = 'users',

35

'document-type' = '_doc'

36

);

37

```

38

39

## Basic Usage

40

41

```java

42

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;

43

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

44

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

45

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

46

import org.apache.http.HttpHost;

47

import org.elasticsearch.action.index.IndexRequest;

48

import org.elasticsearch.client.Requests;

49

50

import java.util.ArrayList;

51

import java.util.HashMap;

52

import java.util.List;

53

import java.util.Map;

54

55

// Create HTTP hosts list

56

List<HttpHost> httpHosts = new ArrayList<>();

57

httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

58

59

// Define sink function

60

ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {

61

public IndexRequest createIndexRequest(String element) {

62

Map<String, Object> json = new HashMap<>();

63

json.put("data", element);

64

65

return Requests.indexRequest()

66

.index("my-index")

67

.type("_doc")

68

.source(json);

69

}

70

71

@Override

72

public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {

73

indexer.add(createIndexRequest(element));

74

}

75

};

76

77

// Create sink

78

ElasticsearchSink<String> sink = new ElasticsearchSink.Builder<>(

79

httpHosts,

80

sinkFunction

81

).build();

82

83

// Add sink to DataStream

84

DataStream<String> input = ...; // placeholder: obtain your DataStream<String> here

85

input.addSink(sink);

86

```

87

88

## Architecture

89

90

The Flink Elasticsearch 6 connector is built around several key components:

91

92

- **ElasticsearchSink**: Main sink class that extends ElasticsearchSinkBase and uses RestHighLevelClient

93

- **Builder Pattern**: Fluent API for configuring bulk processing, failure handling, and client settings

94

- **Bulk Processing**: An internal BulkProcessor buffers multiple ActionRequests before sending them to the cluster

95

- **Failure Handling**: Pluggable ActionRequestFailureHandler system for custom error recovery strategies

96

- **Table API Integration**: Dynamic table sink factory for SQL-based stream processing

97

- **REST Client**: Uses Elasticsearch REST High Level Client with configurable connection settings

98

99

## Capabilities

100

101

### DataStream API Integration

102

103

Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.

104

105

```java { .api }

106

public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {

107

// Private constructor - use Builder

108

}

109

110

// Nested inside ElasticsearchSink; referenced as ElasticsearchSink.Builder<T>
public static class Builder<T> {

111

public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);

112

public ElasticsearchSink<T> build();

113

}

114

```

115

116

[DataStream API](./datastream-api.md)

117

118

### Table API Integration

119

120

SQL-based stream processing integration with dynamic table sink factory. Supports DDL configuration and comprehensive validation for table-based Elasticsearch operations.

121

122

```sql { .api }

123

-- Table API usage via DDL

124

CREATE TABLE sink_table (...) WITH (

125

'connector' = 'elasticsearch-6',

126

'hosts' = 'http://localhost:9200',

127

'index' = 'target-index',

128

'document-type' = '_doc'

129

);

130

```

131

132

[Table API](./table-api.md)

133

134

### Bulk Processing Configuration

135

136

Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.

137

138

```java { .api }

139

// ElasticsearchSink.Builder methods for bulk configuration

140

public void setBulkFlushMaxActions(int numMaxActions);

141

public void setBulkFlushMaxSizeMb(int maxSizeMb);

142

public void setBulkFlushInterval(long intervalMillis);

143

```

144

145

[Bulk Processing](./bulk-processing.md)

146

147

### Failure Handling

148

149

Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.

150

151

```java { .api }

152

public interface ActionRequestFailureHandler extends Serializable {

153

void onFailure(

154

ActionRequest action,

155

Throwable failure,

156

int restStatusCode,

157

RequestIndexer indexer

158

) throws Throwable;

159

}

160

```

161

162

[Failure Handling](./failure-handling.md)

163

164

### Client Configuration

165

166

REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.

167

168

```java { .api }

169

public interface RestClientFactory extends Serializable {

170

void configureRestClientBuilder(RestClientBuilder restClientBuilder);

171

}

172

```

173

174

[Client Configuration](./client-configuration.md)

175

176

## Types

177

178

```java { .api }

179

// Core functional interface for processing stream elements

180

public interface ElasticsearchSinkFunction<T> extends Serializable, Function {

181

default void open() throws Exception {}

182

default void close() throws Exception {}

183

void process(T element, RuntimeContext ctx, RequestIndexer indexer);

184

}

185

186

// Request indexer for adding ActionRequests

187

public interface RequestIndexer {

188

void add(DeleteRequest... deleteRequests);

189

void add(IndexRequest... indexRequests);

190

void add(UpdateRequest... updateRequests);

191

}

192

193

// Backoff configuration

194

public enum FlushBackoffType {

195

CONSTANT,

196

EXPONENTIAL

197

}

198

199

// Backoff configuration policy for bulk processing

200

public static class BulkFlushBackoffPolicy implements Serializable {

201

/**

202

* Get the backoff type (CONSTANT or EXPONENTIAL).

203

* @return the backoff type

204

*/

205

public FlushBackoffType getBackoffType();

206

207

/**

208

* Get the maximum number of retry attempts.

209

* @return the maximum retry count

210

*/

211

public int getMaxRetryCount();

212

213

/**

214

* Get the initial delay in milliseconds.

215

* @return the delay in milliseconds

216

*/

217

public long getDelayMillis();

218

219

/**

220

* Set the backoff type.

221

* @param backoffType the backoff type to use

222

*/

223

public void setBackoffType(FlushBackoffType backoffType);

224

225

/**

226

* Set the maximum number of retry attempts.

227

* @param maxRetryCount the maximum retry count (must be >= 0)

228

*/

229

public void setMaxRetryCount(int maxRetryCount);

230

231

/**

232

* Set the initial delay between retry attempts.

233

* @param delayMillis the delay in milliseconds (must be >= 0)

234

*/

235

public void setDelayMillis(long delayMillis);

236

}

237

```