# Flink Elasticsearch Connector

The Apache Flink Elasticsearch connector streams data from Flink data streams into **Elasticsearch 1.x clusters only**. With Flink checkpointing enabled, it provides at-least-once delivery of requests to Elasticsearch, along with configurable bulk processing and flexible failure-handling strategies.

**Important**: This connector is designed specifically for Elasticsearch 1.x and is not compatible with newer versions of Elasticsearch.

## Package Information

- **Package Name**: flink-connector-elasticsearch_2.12
- **Package Type**: maven
- **Language**: Java
- **Elasticsearch Version**: 1.x (specifically 1.7.1)
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch_2.12</artifactId>
    <version>1.8.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
```

## Basic Usage

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the sink's at-least-once guarantee
        env.enableCheckpointing(5000);
        DataStream<String> dataStream = env.fromElements("first", "second", "third");

        // Configure the Elasticsearch connection and bulk flushing using the configuration constants
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster");
        config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
        config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
        config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");

        // Create the sink function: one IndexRequest per element
        ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                json.put("timestamp", System.currentTimeMillis());

                IndexRequest request = Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);

                indexer.add(request);
            }
        };

        // Create the sink (embedded Node mode, since no transport addresses are given) and attach it
        ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);
        dataStream.addSink(sink);

        env.execute("Elasticsearch 1.x sink example");
    }
}
```

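## Architecture

The connector is built around several key components:

- **ElasticsearchSink**: The user-facing sink; it extends `ElasticsearchSinkBase`, which contains the bulk-processing, checkpointing, and failure-handling logic
- **Connection Modes**: Connects either through an embedded Node (no transport addresses given) or through a TransportClient (a list of transport addresses given)
- **Bulk Processing**: Uses Elasticsearch's `BulkProcessor` to batch requests; buffered requests are flushed when the configured thresholds are reached and, by default, on every checkpoint
- **Failure Handling**: Pluggable `ActionRequestFailureHandler` strategies decide what happens to failed requests; the default fails the sink
- **Type Safety**: A generic type parameter `T` for the stream elements the sink processes
- **Version-Specific Bridge**: `Elasticsearch1ApiCallBridge` creates the 1.x client and adapts version-specific calls
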
## Capabilities

### Elasticsearch Sink

Main sink implementation for connecting Flink streams to Elasticsearch 1.x clusters.

```java { .api }
/**
 * Elasticsearch 1.x sink that requests multiple ActionRequests against a cluster for each incoming element.
 * @param <T> Type of the elements handled by this sink
 */
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {

    /**
     * Creates a new ElasticsearchSink that connects using an embedded Node (deprecated constructor).
     * @param userConfig The map of user settings for constructing the Node and BulkProcessor
     * @param indexRequestBuilder Function to generate IndexRequest from incoming elements
     * @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
     */
    @Deprecated
    public ElasticsearchSink(
        Map<String, String> userConfig,
        IndexRequestBuilder<T> indexRequestBuilder
    );

    /**
     * Creates a new ElasticsearchSink that connects using a TransportClient (deprecated constructor).
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to
     * @param indexRequestBuilder Function to generate IndexRequest from incoming elements
     * @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
     */
    @Deprecated
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        IndexRequestBuilder<T> indexRequestBuilder
    );

    /**
     * Creates a new ElasticsearchSink that connects using an embedded Node.
     * The sink will block and wait for a cluster to come online.
     * Failed requests are handled by a NoOpFailureHandler, i.e. any failure fails the sink.
     * @param userConfig The map of user settings for constructing the embedded Node and BulkProcessor.
     *                   Important settings include "cluster.name" and bulk processing configuration.
     * @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction
    );

    /**
     * Creates a new ElasticsearchSink that connects using a TransportClient.
     * The sink will fail if no cluster can be connected to.
     * Failed requests are handled by a NoOpFailureHandler, i.e. any failure fails the sink.
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor.
     *                   Important settings include "cluster.name" and bulk processing configuration.
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to using a TransportClient
     * @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction
    );

    /**
     * Creates a new ElasticsearchSink with custom failure handling using an embedded Node.
     * @param userConfig The map of user settings for constructing the Node and BulkProcessor
     * @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
     * @param failureHandler Handler for failed ActionRequests
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler
    );

    /**
     * Creates a new ElasticsearchSink with custom failure handling using a TransportClient.
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to
     * @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
     * @param failureHandler Handler for failed ActionRequests
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler
    );
}
```

### Elasticsearch Sink Function

Interface for turning stream elements into Elasticsearch action requests. An implementation may add zero, one, or several requests to the indexer for each element.

```java { .api }
/**
 * Creates multiple ActionRequests from an element in a stream.
 * @param <T> The type of the element handled by this ElasticsearchSinkFunction
 */
@PublicEvolving
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {

    /**
     * Process the incoming element to produce multiple ActionRequests.
     * The produced requests should be added to the provided RequestIndexer.
     * @param element incoming element to process
     * @param ctx runtime context containing information about the sink instance
     * @param indexer request indexer that ActionRequest should be added to
     */
    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}
```

### Request Indexer

Interface for adding requests to be sent to Elasticsearch.

```java { .api }
/**
 * Users add multiple delete, index or update requests to a RequestIndexer to prepare
 * them for sending to an Elasticsearch cluster.
 */
@PublicEvolving
public interface RequestIndexer {

    /**
     * Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param deleteRequests The multiple DeleteRequest to add.
     */
    void add(DeleteRequest... deleteRequests);

    /**
     * Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param indexRequests The multiple IndexRequest to add.
     */
    void add(IndexRequest... indexRequests);

    /**
     * Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param updateRequests The multiple UpdateRequest to add.
     */
    void add(UpdateRequest... updateRequests);

    /**
     * Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param actionRequests The multiple ActionRequest to add.
     * @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods
     */
    @Deprecated
    default void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            if (actionRequest instanceof IndexRequest) {
                add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof DeleteRequest) {
                add((DeleteRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                add((UpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }
}
```

### Action Request Failure Handler

Interface defining how failed ActionRequests should be handled.

```java { .api }
/**
 * An implementation of ActionRequestFailureHandler is provided by the user to define how failed
 * ActionRequests should be handled, e.g. dropping them, reprocessing malformed documents, or
 * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
 */
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {

    /**
     * Handle a failed ActionRequest.
     * @param action the ActionRequest that failed due to the failure
     * @param failure the cause of failure
     * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
     * @param indexer request indexer to re-add the failed action, if intended to do so
     * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
     *                   the exception or a custom one
     */
    void onFailure(
        ActionRequest action,
        Throwable failure,
        int restStatusCode,
        RequestIndexer indexer
    ) throws Throwable;
}
```

## Failure Handler Implementations

### NoOpFailureHandler

Default failure handler, used when no handler is passed to the constructor. It simply rethrows every failure, causing the sink to fail.

```java { .api }
/**
 * An ActionRequestFailureHandler that simply fails the sink on any failures.
 */
@Internal
public class NoOpFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
        ActionRequest action,
        Throwable failure,
        int restStatusCode,
        RequestIndexer indexer
    ) throws Throwable {
        // simply fail the sink
        throw failure;
    }
}
```

### IgnoringFailureHandler

Failure handler that ignores all failures and drops the affected requests.

```java { .api }
/**
 * Ignores all kinds of failures and drops the affected ActionRequest.
 */
@Internal
public class IgnoringFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
        ActionRequest action,
        Throwable failure,
        int restStatusCode,
        RequestIndexer indexer
    ) {
        // ignore failure
    }
}
```

### RetryRejectedExecutionFailureHandler

Failure handler that re-adds requests that failed with a temporary `EsRejectedExecutionException` (Elasticsearch node queues are full) and rethrows all other failures.

```java { .api }
/**
 * An ActionRequestFailureHandler that re-adds requests that failed due to temporary
 * EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
 * and fails for all other failures.
 */
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
        ActionRequest action,
        Throwable failure,
        int restStatusCode,
        RequestIndexer indexer
    ) throws Throwable {
        // Search the whole cause chain, not only the top-level exception
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            indexer.add(action);
        } else {
            // rethrow all other failures
            throw failure;
        }
    }
}
```

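The decision rules of the three built-in handlers can be compared with a small, stdlib-only model. This is a sketch, not connector code: `QueueFullException` is a hypothetical stand-in for Elasticsearch's `EsRejectedExecutionException`, `Decision` and `HandlerModel` are invented names, and the cause-chain walk only mirrors what `ExceptionUtils.findThrowable` is used for in `RetryRejectedExecutionFailureHandler`.

```java
import java.util.Optional;

// Hypothetical stand-in for EsRejectedExecutionException (not the Elasticsearch class)
class QueueFullException extends RuntimeException {
    QueueFullException(String message) {
        super(message);
    }
}

// Outcome of a failure handler in this simplified model
enum Decision { FAIL_SINK, DROP, RETRY }

final class HandlerModel {
    private HandlerModel() {}

    // Walks the cause chain looking for an exception of the given type
    static <E extends Throwable> Optional<E> findThrowable(Throwable t, Class<E> type) {
        Throwable current = t;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Like NoOpFailureHandler: every failure is rethrown, so the sink fails
    static Decision noOp(Throwable failure) {
        return Decision.FAIL_SINK;
    }

    // Like IgnoringFailureHandler: every failed request is dropped
    static Decision ignoring(Throwable failure) {
        return Decision.DROP;
    }

    // Like RetryRejectedExecutionFailureHandler: retry on queue rejection anywhere in the chain, else fail
    static Decision retryRejected(Throwable failure) {
        return findThrowable(failure, QueueFullException.class).isPresent()
                ? Decision.RETRY
                : Decision.FAIL_SINK;
    }
}
```

Searching the cause chain matters because the rejection may arrive wrapped in another exception; a plain `instanceof` check on the top-level failure would miss it.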
## Configuration Constants

### Configuration Keys

The following constants define the configuration keys used by the connector:

```java { .api }
// Declared in ElasticsearchSinkBase
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
```

### Configuration Options

#### Bulk Processing Configuration

- `bulk.flush.max.actions`: Maximum number of actions to buffer before flushing (no connector default; the Elasticsearch BulkProcessor default applies)
- `bulk.flush.max.size.mb`: Maximum amount of buffered data, in megabytes, before flushing (no connector default; the Elasticsearch BulkProcessor default applies)
- `bulk.flush.interval.ms`: Interval, in milliseconds, at which buffered data is flushed regardless of the other two settings (no connector default; the Elasticsearch BulkProcessor default applies)

Independently of these thresholds, the sink by default flushes all pending requests on every checkpoint.

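The interaction of the count and size thresholds can be illustrated with a simplified model: after each request is buffered, a flush happens as soon as either enabled limit is reached. This is a hypothetical sketch (`BulkBufferModel` is an invented name), not the Elasticsearch `BulkProcessor`; it measures size in bytes rather than megabytes and leaves the interval timer out.

```java
// Simplified, hypothetical model of the count/size flush triggers
final class BulkBufferModel {
    private final int maxActions;      // -1 disables the count trigger
    private final long maxSizeBytes;   // -1 disables the size trigger
    private int bufferedActions = 0;
    private long bufferedBytes = 0;
    private int flushes = 0;

    BulkBufferModel(int maxActions, long maxSizeBytes) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeBytes;
    }

    // Buffers one request of the given estimated size, then checks both thresholds
    void add(long requestBytes) {
        bufferedActions++;
        bufferedBytes += requestBytes;
        boolean countReached = maxActions != -1 && bufferedActions >= maxActions;
        boolean sizeReached = maxSizeBytes != -1 && bufferedBytes >= maxSizeBytes;
        if (countReached || sizeReached) {
            flush();
        }
    }

    // In the real sink, flushes are also triggered by the interval timer and by checkpoints
    void flush() {
        if (bufferedActions > 0) {
            flushes++;
            bufferedActions = 0;
            bufferedBytes = 0;
        }
    }

    int flushes() {
        return flushes;
    }

    int pendingActions() {
        return bufferedActions;
    }
}
```

With a limit of 3 actions, 7 requests produce 2 flushes and leave 1 request pending until the next threshold, interval, or checkpoint.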
#### Backoff Configuration

- `bulk.flush.backoff.enable`: Enable backoff retries (default: `true`)
- `bulk.flush.backoff.type`: Backoff type, `CONSTANT` or `EXPONENTIAL` (default: `EXPONENTIAL`)
- `bulk.flush.backoff.retries`: Maximum number of retries (default: `8`)
- `bulk.flush.backoff.delay`: Delay in milliseconds; for exponential backoff this is the initial delay (default: `50`)

**Important Note**: The Elasticsearch 1.x BulkProcessor has no backoff-retry support. These options are still parsed, but they have no effect with this connector, and the connector logs a warning that Elasticsearch 1.x does not support backoff retries.

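#### Cluster Configuration

- `cluster.name`: Name of the Elasticsearch cluster to connect to
- `http.enabled`: HTTP access for the embedded node; the connector always sets it to `false` in embedded Node mode

After the connector removes the `bulk.flush.*` options it consumes, the remaining entries of the configuration map are passed to the Elasticsearch client (embedded Node or TransportClient) as settings.
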
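How the configuration map is split between the sink and the Elasticsearch client can be sketched with a simplified model. `SinkConfigSplit` is a hypothetical helper, not connector code, and the real constructor's handling may differ in edge cases; the key names are the values of the `ElasticsearchSinkBase.CONFIG_KEY_*` constants listed above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of how the sink's configuration map is split
final class SinkConfigSplit {
    // Keys consumed by the sink itself (values of the ElasticsearchSinkBase.CONFIG_KEY_* constants)
    static final Set<String> SINK_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "bulk.flush.max.actions",
            "bulk.flush.max.size.mb",
            "bulk.flush.interval.ms",
            "bulk.flush.backoff.enable",
            "bulk.flush.backoff.type",
            "bulk.flush.backoff.retries",
            "bulk.flush.backoff.delay")));

    private SinkConfigSplit() {}

    // Returns the settings that would be handed to the Elasticsearch client
    static Map<String, String> clientSettings(Map<String, String> userConfig, boolean embeddedNode) {
        Map<String, String> settings = new HashMap<>(userConfig); // copy: never mutate the caller's map
        settings.keySet().removeAll(SINK_KEYS);
        if (embeddedNode) {
            // Embedded mode always disables HTTP on the client node, overriding any user value
            settings.put("http.enabled", "false");
        }
        return Collections.unmodifiableMap(settings);
    }
}
```

A consequence of this split is that Elasticsearch client settings such as `cluster.name` and the sink's own `bulk.flush.*` options can live in one map without the client ever seeing the sink-specific keys.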
## Deprecated Interfaces

### IndexRequestBuilder

Creates IndexRequest objects from stream elements. This interface is deprecated in favor of ElasticsearchSinkFunction.

```java { .api }
/**
 * Function that creates an IndexRequest from an element in a Stream.
 * This is used by ElasticsearchSink to prepare elements for sending them to Elasticsearch.
 * @param <T> The type of the element handled by this IndexRequestBuilder
 * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
 *             Please create an ElasticsearchSink using an ElasticsearchSinkFunction instead.
 */
@Deprecated
public interface IndexRequestBuilder<T> extends Function, Serializable {

    /**
     * Creates an IndexRequest from an element.
     * @param element The element that needs to be turned into an IndexRequest
     * @param ctx The Flink RuntimeContext of the ElasticsearchSink
     * @return The constructed IndexRequest
     */
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}
```

## Enums and Types

### FlushBackoffType

Backoff strategy for bulk flush retries. With Elasticsearch 1.x it has no effect, because backoff retries are not supported.

```java { .api }
/**
 * Backoff strategy types for bulk flush retries.
 */
// Nested in ElasticsearchSinkBase (ElasticsearchSinkBase.FlushBackoffType)
public enum FlushBackoffType {
    /** Constant delay between retries */
    CONSTANT,
    /** Exponentially increasing delay between retries */
    EXPONENTIAL
}
```

### BulkFlushBackoffPolicy

Configuration for bulk flush backoff behavior, built from the `bulk.flush.backoff.*` options. With Elasticsearch 1.x it has no effect.

```java { .api }
/**
 * Configuration for bulk flush backoff behavior.
 */
// Nested in ElasticsearchSinkBase (ElasticsearchSinkBase.BulkFlushBackoffPolicy)
public static class BulkFlushBackoffPolicy implements Serializable {
    /** The backoff type (CONSTANT or EXPONENTIAL) */
    private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;

    /** Maximum number of retry attempts */
    private int maxRetryCount = 8;

    /** Delay in milliseconds between retries */
    private long delayMillis = 50L;

    // Constructor and getter/setter methods...
}
```

### Exception Types

```java { .api }
// Exception thrown when Elasticsearch queues are full
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Flink utility for exception handling
import org.apache.flink.util.ExceptionUtils;
```

### Core Types

```java { .api }
// Elasticsearch client types from the Elasticsearch 1.x dependency (version 1.7.1)
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;

// Flink runtime types
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;

// Java standard types
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Arrays;
```

## Connection Modes

### Embedded Node Mode

Used when no transport addresses are passed to the constructor. The sink starts an embedded Elasticsearch Node to communicate with the cluster, and blocks and waits for a cluster to come online.

```java
// Configuration for embedded Node mode
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");

// sinkFunction: an ElasticsearchSinkFunction<String>, e.g. the one from Basic Usage
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);
```

### TransportClient Mode

Used when a list of transport addresses is passed to the constructor. The sink connects through a TransportClient and fails if no cluster connection can be established.

```java
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Configuration for TransportClient mode
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");

// Use the transport protocol port (9300 by default), not the HTTP port
List<TransportAddress> transportAddresses = Arrays.asList(
    new InetSocketTransportAddress("localhost", 9300),
    new InetSocketTransportAddress("localhost", 9301)
);

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, transportAddresses, sinkFunction);
```

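In practice the node list often comes from a job parameter such as `"host1:9300,host2:9301"`. A minimal parsing sketch, assuming that comma-separated `host:port` format; `TransportHosts` is a hypothetical helper, and it does not handle IPv6 literals. Each parsed entry could then be wrapped as `new InetSocketTransportAddress(addr.getHostString(), addr.getPort())` to build the `transportAddresses` list.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: parses "host1:9300,host2:9301" into unresolved socket addresses
final class TransportHosts {
    private TransportHosts() {}

    static List<InetSocketAddress> parse(String hostList, int defaultPort) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : hostList.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing or doubled commas
            }
            int colon = entry.lastIndexOf(':');
            String host = colon >= 0 ? entry.substring(0, colon) : entry;
            int port = colon >= 0 ? Integer.parseInt(entry.substring(colon + 1)) : defaultPort;
            // createUnresolved avoids a DNS lookup at parse time
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```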
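## Error Handling

Failed requests are passed to an `ActionRequestFailureHandler`, which decides per request whether to retry it, drop it, or fail the sink. This example bases the decision on the REST status code:

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;

// Custom failure handler example
ActionRequestFailureHandler customHandler = new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (restStatusCode == 429) {
            // Too many requests: re-add the request so that it is retried
            indexer.add(action);
        } else if (restStatusCode >= 400 && restStatusCode < 500) {
            // Other client errors (e.g. malformed documents): drop the request
            System.err.println("Dropping malformed request: " + failure.getMessage());
        } else {
            // Server errors and unknown status codes (-1): fail the sink
            throw failure;
        }
    }
};

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction, customHandler);
```

Re-adding failed requests makes checkpoints take longer, because when checkpointing the sink also waits for the re-added requests to be flushed.
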
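To unit-test the status-code rules of the handler above without a cluster, the decision can be pulled out into a pure function. A sketch with hypothetical names (`FailurePolicy`, `FailureAction`); it reproduces exactly the three branches of the example handler.

```java
// Hypothetical outcome type for the example handler's three branches
enum FailureAction { RETRY, DROP, FAIL }

// Hypothetical pure version of the example handler's decision logic
final class FailurePolicy {
    private FailurePolicy() {}

    static FailureAction decide(int restStatusCode) {
        if (restStatusCode == 429) {
            return FailureAction.RETRY;          // too many requests: re-add to the indexer
        } else if (restStatusCode >= 400 && restStatusCode < 500) {
            return FailureAction.DROP;           // other client errors: drop the request
        } else {
            return FailureAction.FAIL;           // 5xx, -1 (unknown) and anything else: fail the sink
        }
    }
}
```

Inside `onFailure`, a `switch` over `FailurePolicy.decide(restStatusCode)` would then call `indexer.add(action)`, log and return, or `throw failure`, keeping the handler itself a thin adapter.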
## Usage Examples

### Basic String Indexing

```java
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("message", element);
        json.put("timestamp", System.currentTimeMillis());

        IndexRequest request = Requests.indexRequest()
                .index("logs")
                .type("message")
                .source(json);

        indexer.add(request);
    }
};
```

### Complex Object Processing

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class User {
    public String name;
    public int age;
    public String email;
    // constructors, getters, setters...
}

ElasticsearchSinkFunction<User> userSinkFunction = new ElasticsearchSinkFunction<User>() {
    @Override
    public void process(User user, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("name", user.name);
        json.put("age", user.age);
        json.put("email", user.email);
        json.put("indexed_at", System.currentTimeMillis());

        // Index request: creates or overwrites the document, using the email as document ID
        IndexRequest indexRequest = Requests.indexRequest()
                .index("users")
                .type("user")
                .id(user.email)
                .source(json);

        indexer.add(indexRequest);

        // Alternative to the IndexRequest above (use one or the other, not both):
        // an update that merges the fields into an existing document, or inserts
        // `json` if the document does not exist yet (upsert):
        //
        // indexer.add(new org.elasticsearch.action.update.UpdateRequest("users", "user", user.email)
        //         .doc(json)
        //         .upsert(json));
    }
};
```

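Using the email as document ID also pays off under the at-least-once guarantee: after a failure, records processed since the last completed checkpoint can be sent to Elasticsearch a second time. The stdlib-only model below compares deterministic IDs with generated ones; `ReplayModel` is a hypothetical name and the in-memory map merely stands in for an index.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model: an in-memory map (document ID -> source) stands in for an index
final class ReplayModel {
    private ReplayModel() {}

    // Writes every record twice, as a replay after recovery might, and returns the document count
    static int documentsAfterReplay(String[] emails, boolean deterministicIds) {
        Map<String, String> index = new HashMap<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (String email : emails) {
                // Deterministic ID (like .id(user.email)) versus a fresh ID per request,
                // which is what happens when no ID is set and Elasticsearch generates one
                String id = deterministicIds ? email : UUID.randomUUID().toString();
                index.put(id, email);
            }
        }
        return index.size();
    }
}
```

With deterministic IDs the replayed writes overwrite the same documents, so the result is effectively idempotent; with generated IDs every replayed record becomes a duplicate document.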
### Using Configuration Constants

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

import java.util.HashMap;
import java.util.Map;

// Recommended approach using configuration constants.
// MyEvent and mySinkFunction (an ElasticsearchSinkFunction<MyEvent>) are placeholders for your own code.
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "production-cluster");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "10");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "30000");

// Note: backoff settings are accepted but have no effect with Elasticsearch 1.x
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");

ElasticsearchSink<MyEvent> sink = new ElasticsearchSink<>(
    config,
    mySinkFunction,
    new RetryRejectedExecutionFailureHandler()
);
```

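### Multiple Index Operations

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Event is a user-defined type; getTimestamp() is assumed to return a java.time value such as LocalDateTime
ElasticsearchSinkFunction<Event> eventSinkFunction = new ElasticsearchSinkFunction<Event>() {
    @Override
    public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
        // Index to main events index
        Map<String, Object> eventJson = new HashMap<>();
        eventJson.put("type", event.getType());
        eventJson.put("data", event.getData());
        // Store the timestamp as an ISO-8601 string
        eventJson.put("timestamp", event.getTimestamp().toString());

        IndexRequest eventRequest = Requests.indexRequest()
                .index("events")
                .type("event")
                .source(eventJson);
        indexer.add(eventRequest);

        // Also index to a daily partitioned index, e.g. "events-2024-01-31".
        // The formatter is created here rather than stored in a field because
        // DateTimeFormatter is not Serializable.
        String dailyIndex = "events-" + event.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        IndexRequest dailyRequest = Requests.indexRequest()
                .index(dailyIndex)
                .type("event")
                .source(eventJson);
        indexer.add(dailyRequest);
    }
};
```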
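
If the event timestamp is stored as epoch milliseconds rather than as a `java.time` value, the daily index name can be derived as sketched below. `DailyIndex` is a hypothetical helper; formatting in UTC is a design choice that keeps the index choice independent of each TaskManager's default time zone. A static formatter is safe here because it lives in a helper class rather than in the serialized sink function.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives a daily index name from an epoch-millisecond timestamp
final class DailyIndex {
    // DateTimeFormatter is immutable and thread-safe, so one static instance can be shared
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    private DailyIndex() {}

    static String name(String prefix, long epochMillis) {
        // The formatter's UTC zone converts the Instant to a calendar date
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

For example, `DailyIndex.name("events", 0L)` yields `events-1970-01-01`.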