0
# DataStream API
1
2
Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.
3
4
## Capabilities
5
6
### ElasticsearchSink
7
8
Main sink class for sending streaming data to Elasticsearch 6.x clusters. Uses builder pattern for configuration and extends ElasticsearchSinkBase with RestHighLevelClient integration.
9
10
```java { .api }
11
/**
12
* Elasticsearch 6.x sink that requests multiple ActionRequests against a cluster
13
* for each incoming element.
14
* @param <T> Type of the elements handled by this sink
15
*/
16
@PublicEvolving
17
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
18
// Private constructor - use Builder pattern
19
}
20
```
21
22
### ElasticsearchSink.Builder
23
24
Builder pattern for creating ElasticsearchSink instances with full configuration control.
25
26
```java { .api }
27
/**
28
* A builder for creating an ElasticsearchSink.
29
* @param <T> Type of the elements handled by the sink this builder creates.
30
*/
31
@PublicEvolving
32
public static class ElasticsearchSink.Builder<T> {
33
/**
34
* Creates a new ElasticsearchSink that connects to the cluster using a RestHighLevelClient.
35
* @param httpHosts The list of HttpHost to which the RestHighLevelClient connects.
36
* @param elasticsearchSinkFunction This is used to generate multiple ActionRequest from the incoming element.
37
*/
38
public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
39
40
/**
41
* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.
42
* @param numMaxActions the maximum number of actions to buffer per bulk request.
43
*/
44
public void setBulkFlushMaxActions(int numMaxActions);
45
46
/**
47
* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to disable it.
48
* @param maxSizeMb the maximum size of buffered actions, in mb.
49
*/
50
public void setBulkFlushMaxSizeMb(int maxSizeMb);
51
52
/**
53
* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
54
* @param intervalMillis the bulk flush interval, in milliseconds.
55
*/
56
public void setBulkFlushInterval(long intervalMillis);
57
58
/**
59
* Sets whether or not to enable bulk flush backoff behaviour.
60
* @param enabled whether or not to enable backoffs.
61
*/
62
public void setBulkFlushBackoff(boolean enabled);
63
64
/**
65
* Sets the type of back off to use when flushing bulk requests.
66
* @param flushBackoffType the backoff type to use.
67
*/
68
public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);
69
70
/**
71
* Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
72
* @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
73
*/
74
public void setBulkFlushBackoffRetries(int maxRetries);
75
76
/**
77
* Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
78
* @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
79
*/
80
public void setBulkFlushBackoffDelay(long delayMillis);
81
82
/**
83
* Sets a failure handler for action requests.
84
* @param failureHandler This is used to handle failed ActionRequest.
85
*/
86
public void setFailureHandler(ActionRequestFailureHandler failureHandler);
87
88
/**
89
* Sets a REST client factory for custom client configuration.
90
* @param restClientFactory the factory that configures the rest client.
91
*/
92
public void setRestClientFactory(RestClientFactory restClientFactory);
93
94
/**
95
* Creates the Elasticsearch sink.
96
* @return the created Elasticsearch sink.
97
*/
98
public ElasticsearchSink<T> build();
99
}
100
```
101
102
**Usage Examples:**
103
104
```java
105
import org.apache.flink.streaming.api.datastream.DataStream;
106
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
107
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
108
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
109
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
110
import org.apache.http.HttpHost;
111
import org.elasticsearch.action.index.IndexRequest;
112
import org.elasticsearch.client.Requests;
113
114
import java.util.ArrayList;
115
import java.util.HashMap;
116
import java.util.List;
117
import java.util.Map;
118
119
// Basic sink creation
120
List<HttpHost> httpHosts = new ArrayList<>();
121
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
122
httpHosts.add(new HttpHost("127.0.0.1", 9201, "http"));
123
124
ElasticsearchSinkFunction<MyData> sinkFunction = new ElasticsearchSinkFunction<MyData>() {
125
@Override
126
public void process(MyData element, RuntimeContext ctx, RequestIndexer indexer) {
127
Map<String, Object> json = new HashMap<>();
128
json.put("id", element.getId());
129
json.put("name", element.getName());
130
json.put("timestamp", element.getTimestamp());
131
132
IndexRequest request = Requests.indexRequest()
133
.index("my-index")
134
.type("_doc")
135
.id(String.valueOf(element.getId()))
136
.source(json);
137
138
indexer.add(request);
139
}
140
};
141
142
ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(
143
httpHosts,
144
sinkFunction
145
).build();
146
147
// Advanced configuration (the Builder setters return void, so they cannot be chained)
148
ElasticsearchSink.Builder<MyData> builder = new ElasticsearchSink.Builder<>(
149
httpHosts,
150
sinkFunction
151
);
152
builder.setBulkFlushMaxActions(1000); // Flush after 1000 actions
153
builder.setBulkFlushMaxSizeMb(5); // Or after 5MB
154
builder.setBulkFlushInterval(30000); // Or after 30 seconds
155
builder.setBulkFlushBackoff(true); // Enable backoff
156
builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
157
builder.setBulkFlushBackoffRetries(3);
158
builder.setBulkFlushBackoffDelay(1000);
159
builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
160
ElasticsearchSink<MyData> advancedSink = builder.build();
161
162
// Add to DataStream
163
DataStream<MyData> dataStream = ...; // your data stream
164
dataStream.addSink(advancedSink);
165
```
166
167
### ElasticsearchSinkFunction
168
169
User-defined function interface for converting stream elements into Elasticsearch ActionRequests.
170
171
```java { .api }
172
/**
173
* Creates multiple ActionRequests from an element in a stream.
174
* This is used by sinks to prepare elements for sending them to Elasticsearch.
175
* @param <T> The type of the element handled by this ElasticsearchSinkFunction
176
*/
177
@PublicEvolving
178
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
179
/**
180
* Initialization method for the function. It is called once before the actual working process methods.
181
*/
182
default void open() throws Exception {}
183
184
/**
185
* Tear-down method for the function. It is called when the sink closes.
186
*/
187
default void close() throws Exception {}
188
189
/**
190
* Process the incoming element to produce multiple ActionRequests.
191
* The produced requests should be added to the provided RequestIndexer.
192
* @param element incoming element to process
193
* @param ctx runtime context containing information about the sink instance
194
* @param indexer request indexer that ActionRequest should be added to
195
*/
196
void process(T element, RuntimeContext ctx, RequestIndexer indexer);
197
}
198
```
199
200
**Usage Examples:**
201
202
```java
203
// Simple single-request function
204
ElasticsearchSinkFunction<String> simpleSinkFunction = new ElasticsearchSinkFunction<String>() {
205
@Override
206
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
207
Map<String, Object> json = new HashMap<>();
208
json.put("data", element);
209
json.put("timestamp", System.currentTimeMillis());
210
211
IndexRequest request = Requests.indexRequest()
212
.index("logs")
213
.type("_doc")
214
.source(json);
215
216
indexer.add(request);
217
}
218
};
219
220
// Multiple requests per element
221
ElasticsearchSinkFunction<Transaction> multiRequestFunction = new ElasticsearchSinkFunction<Transaction>() {
222
@Override
223
public void process(Transaction transaction, RuntimeContext ctx, RequestIndexer indexer) {
224
// Index the transaction
225
Map<String, Object> transactionDoc = new HashMap<>();
226
transactionDoc.put("id", transaction.getId());
227
transactionDoc.put("amount", transaction.getAmount());
228
transactionDoc.put("currency", transaction.getCurrency());
229
230
IndexRequest transactionRequest = Requests.indexRequest()
231
.index("transactions")
232
.type("_doc")
233
.id(transaction.getId())
234
.source(transactionDoc);
235
236
// Update user balance
237
Map<String, Object> balanceUpdate = new HashMap<>();
238
balanceUpdate.put("balance", transaction.getNewBalance());
239
balanceUpdate.put("last_updated", System.currentTimeMillis());
240
241
UpdateRequest balanceRequest = new UpdateRequest()
242
.index("users")
243
.type("_doc")
244
.id(transaction.getUserId())
245
.doc(balanceUpdate);
246
247
indexer.add(transactionRequest);
indexer.add(balanceRequest);
248
}
249
};
250
251
// With lifecycle methods
252
ElasticsearchSinkFunction<Event> lifecycleSinkFunction = new ElasticsearchSinkFunction<Event>() {
253
private ObjectMapper objectMapper;
254
255
@Override
256
public void open() throws Exception {
257
objectMapper = new ObjectMapper();
258
}
259
260
@Override
261
public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
262
try {
263
String json = objectMapper.writeValueAsString(event);
264
Map<String, Object> source = objectMapper.readValue(json, HashMap.class);
265
266
IndexRequest request = Requests.indexRequest()
267
.index("events")
268
.type("_doc")
269
.source(source);
270
271
indexer.add(request);
272
} catch (Exception e) {
273
throw new RuntimeException("Failed to serialize event", e);
274
}
275
}
276
277
@Override
278
public void close() throws Exception {
279
// Cleanup resources if needed
280
}
281
};
282
```
283
284
### RequestIndexer
285
286
Interface for adding ActionRequests to be sent to Elasticsearch as part of bulk operations.
287
288
```java { .api }
289
/**
290
* Users add multiple delete, index or update requests to a RequestIndexer to prepare them
291
* for sending to an Elasticsearch cluster.
292
*/
293
@PublicEvolving
294
public interface RequestIndexer {
295
/**
296
* Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.
297
* @param deleteRequests The multiple DeleteRequest to add.
298
*/
299
void add(DeleteRequest... deleteRequests);
300
301
/**
302
* Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.
303
* @param indexRequests The multiple IndexRequest to add.
304
*/
305
void add(IndexRequest... indexRequests);
306
307
/**
308
* Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.
309
* @param updateRequests The multiple UpdateRequest to add.
310
*/
311
void add(UpdateRequest... updateRequests);
312
313
/**
314
* Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.
315
* @param actionRequests The multiple ActionRequest to add.
316
* @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods
317
*/
318
@Deprecated
319
default void add(ActionRequest... actionRequests);
320
}
321
```