# Apache Flink Elasticsearch 6 Connector

The Apache Flink connector for Elasticsearch 6.x provides a streaming sink for real-time data ingestion into Elasticsearch clusters. It supports both the DataStream API for programmatic streaming jobs and the Table API for SQL-based stream processing, with configuration options for connection management, bulk processing behavior, and retry mechanisms.

## Package Information

- **Package Name**: flink-connector-elasticsearch6_2.12
- **Package Type**: maven
- **Language**: Java (with Scala 2.12 support)
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-elasticsearch6_2.12
- **Installation**: Add to Maven dependencies with version 1.14.6

## Core Imports

```java
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
```

For Table API usage:

```sql
-- Configuration via DDL
CREATE TABLE elasticsearch_sink (
  id BIGINT,
  name STRING,
  age INT
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'document-type' = '_doc'
);
```

## Basic Usage

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Create HTTP hosts list
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// Define sink function
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
    // Wrap the element in a JSON document destined for "my-index"
    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
            .index("my-index")
            .type("_doc")
            .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
};

// Create sink
ElasticsearchSink<String> sink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
).build();

// Add sink to DataStream
DataStream<String> input = ...; // your data stream
input.addSink(sink);
```

## Architecture

The Flink Elasticsearch 6 connector is built around several key components:

- **ElasticsearchSink**: Main sink class that extends ElasticsearchSinkBase and uses RestHighLevelClient
- **Builder Pattern**: Fluent API for configuring bulk processing, failure handling, and client settings
- **Bulk Processing**: Internal BulkProcessor buffers multiple ActionRequests before sending to cluster
- **Failure Handling**: Pluggable ActionRequestFailureHandler system for custom error recovery strategies
- **Table API Integration**: Dynamic table sink factory for SQL-based stream processing
- **REST Client**: Uses Elasticsearch REST High Level Client with configurable connection settings

## Capabilities

### DataStream API Integration

Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.

```java { .api }
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
    // Private constructor - use Builder
}

public static class ElasticsearchSink.Builder<T> {
    public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
    public ElasticsearchSink<T> build();
}
```

[DataStream API](./datastream-api.md)
117
118
### Table API Integration
119
120
SQL-based stream processing integration with dynamic table sink factory. Supports DDL configuration and comprehensive validation for table-based Elasticsearch operations.
121
122
```java { .api }
123
// Table API usage via DDL
124
CREATE TABLE sink_table (...) WITH (
125
'connector' = 'elasticsearch-6',
126
'hosts' = 'http://localhost:9200',
127
'index' = 'target-index',
128
'document-type' = '_doc'
129
);
130
```
131
132
[Table API](./table-api.md)
133
134
### Bulk Processing Configuration
135
136
Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.
137
138
```java { .api }
139
// Builder methods for bulk configuration
140
public void setBulkFlushMaxActions(int numMaxActions);
141
public void setBulkFlushMaxSizeMb(int maxSizeMb);
142
public void setBulkFlushInterval(long intervalMillis);
143
```
144
145
[Bulk Processing](./bulk-processing.md)
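
As a rough illustration of how the action-count and size thresholds interact, the sketch below models a buffer that flushes as soon as either limit is reached. It is a simplified, self-contained model and not the connector's or Elasticsearch's `BulkProcessor` code: the class `BulkBufferSketch`, its fields, and the byte-based size limit are hypothetical, and the time-based flush interval is left out for brevity.

```java
// Illustrative model only: BulkBufferSketch is a hypothetical class,
// not part of Flink or Elasticsearch.
final class BulkBufferSketch {
    private final int maxActions;     // plays the role of setBulkFlushMaxActions
    private final long maxSizeBytes;  // plays the role of setBulkFlushMaxSizeMb (bytes here)
    private int bufferedActions;
    private long bufferedBytes;
    private int flushCount;

    BulkBufferSketch(int maxActions, long maxSizeBytes) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Buffers one request and flushes as soon as either threshold is reached. */
    void add(long requestSizeBytes) {
        bufferedActions++;
        bufferedBytes += requestSizeBytes;
        if (bufferedActions >= maxActions || bufferedBytes >= maxSizeBytes) {
            flush();
        }
    }

    /** Stands in for sending one bulk request; here it only resets the buffer. */
    void flush() {
        if (bufferedActions == 0) {
            return; // nothing buffered, nothing to send
        }
        flushCount++;
        bufferedActions = 0;
        bufferedBytes = 0;
    }

    int getFlushCount() {
        return flushCount;
    }

    int getBufferedActions() {
        return bufferedActions;
    }
}
```

With `maxActions` set to 3, the third call to `add` triggers one flush and leaves the buffer empty; a single oversized request triggers a flush on its own.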

### Failure Handling

Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.

```java { .api }
public interface ActionRequestFailureHandler extends Serializable {
    void onFailure(
        ActionRequest action,
        Throwable failure,
        int restStatusCode,
        RequestIndexer indexer
    ) throws Throwable;
}
```

[Failure Handling](./failure-handling.md)
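
One possible strategy is to re-queue requests that the cluster rejected under load, drop malformed documents, and fail on anything else. The sketch below shows that decision logic with simplified stand-in types so that it compiles without Flink on the classpath: `IndexerSketch`, `FailureHandlerSketch`, and `RetryRejectedHandlerSketch` are hypothetical names, requests are plain strings, and using HTTP status 429 and 400 as the triggers is an assumption made for illustration.

```java
// Stand-in types (hypothetical) so the sketch compiles without Flink.
// They simplify the RequestIndexer and ActionRequestFailureHandler shapes.
interface IndexerSketch {
    void add(String request);
}

interface FailureHandlerSketch {
    void onFailure(String request, Throwable failure, int restStatusCode, IndexerSketch indexer)
            throws Throwable;
}

/** Re-queues rejected requests, drops malformed ones, and rethrows everything else. */
final class RetryRejectedHandlerSketch implements FailureHandlerSketch {
    private static final int TOO_MANY_REQUESTS = 429; // assumed status for load rejections
    private static final int BAD_REQUEST = 400;       // assumed status for malformed documents

    @Override
    public void onFailure(String request, Throwable failure, int restStatusCode, IndexerSketch indexer)
            throws Throwable {
        if (restStatusCode == TOO_MANY_REQUESTS) {
            // Cluster was overloaded: hand the request back for a later bulk.
            indexer.add(request);
        } else if (restStatusCode == BAD_REQUEST) {
            // Malformed document: drop it without failing the sink.
        } else {
            // Any other failure: propagate it so the job fails.
            throw failure;
        }
    }
}
```

A real handler would implement `ActionRequestFailureHandler` and receive an `ActionRequest` instead of a string; the branching would stay the same.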
163
164
### Client Configuration
165
166
REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.
167
168
```java { .api }
169
public interface RestClientFactory extends Serializable {
170
void configureRestClientBuilder(RestClientBuilder restClientBuilder);
171
}
172
```
173
174
[Client Configuration](./client-configuration.md)
175
176
## Types
177
178
```java { .api }
179
// Core functional interface for processing stream elements
180
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
181
default void open() throws Exception {}
182
default void close() throws Exception {}
183
void process(T element, RuntimeContext ctx, RequestIndexer indexer);
184
}
185
186
// Request indexer for adding ActionRequests
187
public interface RequestIndexer {
188
void add(DeleteRequest... deleteRequests);
189
void add(IndexRequest... indexRequests);
190
void add(UpdateRequest... updateRequests);
191
}
192
193
// Backoff configuration
194
public enum FlushBackoffType {
195
CONSTANT,
196
EXPONENTIAL
197
}
198
199
// Backoff configuration policy for bulk processing
200
public static class BulkFlushBackoffPolicy implements Serializable {
201
/**
202
* Get the backoff type (CONSTANT or EXPONENTIAL).
203
* @return the backoff type
204
*/
205
public FlushBackoffType getBackoffType();
206
207
/**
208
* Get the maximum number of retry attempts.
209
* @return the maximum retry count
210
*/
211
public int getMaxRetryCount();
212
213
/**
214
* Get the initial delay in milliseconds.
215
* @return the delay in milliseconds
216
*/
217
public long getDelayMillis();
218
219
/**
220
* Set the backoff type.
221
* @param backoffType the backoff type to use
222
*/
223
public void setBackoffType(FlushBackoffType backoffType);
224
225
/**
226
* Set the maximum number of retry attempts.
227
* @param maxRetryCount the maximum retry count (must be >= 0)
228
*/
229
public void setMaxRetryCount(int maxRetryCount);
230
231
/**
232
* Set the initial delay between retry attempts.
233
* @param delayMillis the delay in milliseconds (must be >= 0)
234
*/
235
public void setDelayMillis(long delayMillis);
236
}
237
```
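
To show how the backoff type, retry count, and initial delay combine, here is a minimal sketch that computes the wait before each retry. It is an illustrative model only: `BackoffScheduleSketch` is a hypothetical class, its nested enum merely mirrors the names above, and doubling the delay on every attempt is a simplifying assumption rather than the exact formula Elasticsearch applies.

```java
// Illustrative model only: BackoffScheduleSketch is hypothetical and is
// not part of Flink or Elasticsearch.
final class BackoffScheduleSketch {
    /** Mirrors the enum names above so the sketch stays self-contained. */
    enum FlushBackoffType { CONSTANT, EXPONENTIAL }

    /**
     * Returns the wait (in milliseconds) before each retry attempt.
     * Assumption: EXPONENTIAL doubles the delay after every attempt.
     */
    static long[] delays(FlushBackoffType type, int maxRetryCount, long delayMillis) {
        if (maxRetryCount < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxRetryCount and delayMillis must be >= 0");
        }
        long[] schedule = new long[maxRetryCount];
        long current = delayMillis;
        for (int attempt = 0; attempt < maxRetryCount; attempt++) {
            schedule[attempt] = current;
            if (type == FlushBackoffType.EXPONENTIAL) {
                current *= 2; // simplified growth, see the assumption above
            }
        }
        return schedule;
    }
}
```

Under this model, a retry count of 4 and an initial delay of 50 ms gives waits of 50, 100, 200, and 400 ms for `EXPONENTIAL`, and four waits of 50 ms for `CONSTANT`.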