Apache Flink connector for Elasticsearch 1.x that enables streaming data ingestion into Elasticsearch clusters
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch-2-12@1.8.0

The Apache Flink Elasticsearch connector enables streaming data ingestion from Flink data streams into Elasticsearch 1.x clusters. It provides fault-tolerant, at-least-once delivery guarantees (when Flink checkpointing is enabled) with configurable bulk processing and flexible failure handling strategies.
Important: This connector is specifically designed for Elasticsearch 1.x and is not compatible with newer versions of Elasticsearch.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch_2.12</artifactId>
<version>1.8.3</version>
</dependency>

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.HashMap;
import java.util.Map;
// Configure Elasticsearch connection using configuration constants
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");
// Create sink function
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
json.put("timestamp", System.currentTimeMillis());
IndexRequest request = Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
indexer.add(request);
}
};
// Create and add sink to stream
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);
dataStream.addSink(sink);
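The sink's fault tolerance relies on Flink checkpointing being enabled; without it, delivery is best effort. A minimal sketch (the interval value is illustrative, not a recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Enable checkpointing so the sink provides at-least-once delivery
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds (illustrative)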
The connector is built around several key components:

Main sink implementation for connecting Flink streams to Elasticsearch 1.x clusters.
/**
* Elasticsearch 1.x sink that requests multiple ActionRequests against a cluster for each incoming element.
* @param <T> Type of the elements handled by this sink
*/
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
/**
* Creates a new ElasticsearchSink that connects using an embedded Node (deprecated constructor).
* @param userConfig The map of user settings for constructing the Node and BulkProcessor
* @param indexRequestBuilder Function to generate IndexRequest from incoming elements
* @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
*/
@Deprecated
public ElasticsearchSink(
Map<String, String> userConfig,
IndexRequestBuilder<T> indexRequestBuilder
);
/**
* Creates a new ElasticsearchSink that connects using a TransportClient (deprecated constructor).
* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
* @param transportAddresses The addresses of Elasticsearch nodes to connect to
* @param indexRequestBuilder Function to generate IndexRequest from incoming elements
* @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
*/
@Deprecated
public ElasticsearchSink(
Map<String, String> userConfig,
List<TransportAddress> transportAddresses,
IndexRequestBuilder<T> indexRequestBuilder
);
/**
* Creates a new ElasticsearchSink that connects using an embedded Node.
* The sink will block and wait for a cluster to come online.
* @param userConfig The map of user settings for constructing the embedded Node and BulkProcessor.
* Important settings include "cluster.name" and bulk processing configuration.
* @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
*/
public ElasticsearchSink(
Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction
);
/**
* Creates a new ElasticsearchSink that connects using a TransportClient.
* The sink will fail if no cluster can be connected to.
* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor.
* Important settings include "cluster.name" and bulk processing configuration.
* @param transportAddresses The addresses of Elasticsearch nodes to connect to using a TransportClient
* @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
*/
public ElasticsearchSink(
Map<String, String> userConfig,
List<TransportAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction
);
/**
* Creates a new ElasticsearchSink with custom failure handling using an embedded Node.
* @param userConfig The map of user settings for constructing the Node and BulkProcessor
* @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
* @param failureHandler Handler for failed ActionRequests
*/
public ElasticsearchSink(
Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler
);
/**
* Creates a new ElasticsearchSink with custom failure handling using a TransportClient.
* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
* @param transportAddresses The addresses of Elasticsearch nodes to connect to
* @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
* @param failureHandler Handler for failed ActionRequests
*/
public ElasticsearchSink(
Map<String, String> userConfig,
List<TransportAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler
);
}

Interface for processing stream elements into Elasticsearch action requests.
/**
* Creates multiple ActionRequests from an element in a stream.
* @param <T> The type of the element handled by this ElasticsearchSinkFunction
*/
@PublicEvolving
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
/**
* Process the incoming element to produce multiple ActionRequests.
* The produced requests should be added to the provided RequestIndexer.
* @param element incoming element to process
* @param ctx runtime context containing information about the sink instance
* @param indexer request indexer that ActionRequest should be added to
*/
void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}
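Because the interface declares a single abstract method, it can also be written as a lambda; a brief sketch under the same quick-start setup (index and type names are illustrative):

// Lambda form of ElasticsearchSinkFunction (Java 8+)
ElasticsearchSinkFunction<String> lambdaSinkFunction = (element, ctx, indexer) -> {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    indexer.add(Requests.indexRequest().index("my-index").type("my-type").source(json));
};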
Interface for adding requests to be sent to Elasticsearch.

/**
* Users add multiple delete, index or update requests to a RequestIndexer to prepare
* them for sending to an Elasticsearch cluster.
*/
@PublicEvolving
public interface RequestIndexer {
/**
* Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.
* @param deleteRequests The multiple DeleteRequest to add.
*/
void add(DeleteRequest... deleteRequests);
/**
* Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.
* @param indexRequests The multiple IndexRequest to add.
*/
void add(IndexRequest... indexRequests);
/**
* Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.
* @param updateRequests The multiple UpdateRequest to add.
*/
void add(UpdateRequest... updateRequests);
/**
* Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.
* @param actionRequests The multiple ActionRequest to add.
* @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods
*/
@Deprecated
default void add(ActionRequest... actionRequests) {
for (ActionRequest actionRequest : actionRequests) {
if (actionRequest instanceof IndexRequest) {
add((IndexRequest) actionRequest);
} else if (actionRequest instanceof DeleteRequest) {
add((DeleteRequest) actionRequest);
} else if (actionRequest instanceof UpdateRequest) {
add((UpdateRequest) actionRequest);
} else {
throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
}
}
}
}
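Different request types can be queued from a single process() call; a hypothetical sketch (MyEvent, its accessors, and all index/type/id values are illustrative) that indexes a document and deletes a stale one:

@Override
public void process(MyEvent event, RuntimeContext ctx, RequestIndexer indexer) {
    Map<String, Object> json = new HashMap<>();
    json.put("payload", event.getPayload()); // hypothetical accessor
    // Queue an index request and a delete request in the same call
    indexer.add(Requests.indexRequest().index("events").type("event").id(event.getId()).source(json));
    indexer.add(Requests.deleteRequest("events-stale").type("event").id(event.getId()));
}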
Interface defining how failed ActionRequests should be handled.

/**
* An implementation of ActionRequestFailureHandler is provided by the user to define how failed
* ActionRequests should be handled, e.g. dropping them, reprocessing malformed documents, or
* simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
*/
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {
/**
* Handle a failed ActionRequest.
* @param action the ActionRequest that failed due to the failure
* @param failure the cause of failure
* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
* @param indexer request indexer to re-add the failed action, if intended to do so
* @throws Throwable if the sink should fail on this failure, the implementation should rethrow
* the exception or a custom one
*/
void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}

Default failure handler that simply rethrows failures, causing the sink to fail.
/**
* An ActionRequestFailureHandler that simply fails the sink on any failures.
*/
@Internal
public class NoOpFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable {
// simply fail the sink
throw failure;
}
}

Failure handler that ignores all failures and drops affected requests.
/**
* Ignores all kinds of failures and drops the affected ActionRequest.
*/
@Internal
public class IgnoringFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) {
// ignore failure
}
}

Failure handler that retries requests that failed due to a temporary EsRejectedExecutionException.
/**
* An ActionRequestFailureHandler that re-adds requests that failed due to temporary
* EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
* and fails for all other failures.
*/
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable {
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else {
// rethrow all other failures
throw failure;
}
}
}
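A short sketch of plugging one of the provided handlers in at construction time (config and sinkFunction as in the quick start):

// Retry requests rejected by full Elasticsearch queues; fail on all other errors
ElasticsearchSink<String> resilientSink = new ElasticsearchSink<>(
    config,
    sinkFunction,
    new RetryRejectedExecutionFailureHandler());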
The following constants define configuration keys used by the connector:

// From ElasticsearchSinkBase class
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

- bulk.flush.max.actions: Maximum number of elements to buffer (no default; uses Elasticsearch BulkProcessor defaults)
- bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer (no default; uses Elasticsearch BulkProcessor defaults)
- bulk.flush.interval.ms: Interval at which to flush data regardless of the other settings (no default; uses Elasticsearch BulkProcessor defaults)
- bulk.flush.backoff.enable: Enable backoff retries (default: true)
- bulk.flush.backoff.type: Backoff type, "CONSTANT" or "EXPONENTIAL" (default: EXPONENTIAL)
- bulk.flush.backoff.retries: Maximum retry count (default: 8)
- bulk.flush.backoff.delay: Delay in milliseconds between retries (default: 50)

Important Note: Elasticsearch 1.x does not support backoff retries. The BulkProcessor in Elasticsearch 1.x does not provide backoff retry functionality, so these configuration options are parsed but have no effect and will log a warning when specified.
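A compact sketch applying the flush-related keys (the values are illustrative, not recommendations):

Map<String, String> bulkConfig = new HashMap<>();
bulkConfig.put("cluster.name", "my-cluster");
bulkConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "500");
bulkConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "1");
bulkConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "10000");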
- cluster.name: Name of the Elasticsearch cluster to connect to
- http.enabled: HTTP access setting (automatically set to false for embedded nodes)

Creates IndexRequest objects from stream elements. This interface is deprecated in favor of ElasticsearchSinkFunction.
/**
* Function that creates an IndexRequest from an element in a Stream.
* This is used by ElasticsearchSink to prepare elements for sending them to Elasticsearch.
* @param <T> The type of the element handled by this IndexRequestBuilder
* @deprecated Deprecated since version 1.2, to be removed at version 2.0.
* Please create an ElasticsearchSink using an ElasticsearchSinkFunction instead.
*/
@Deprecated
public interface IndexRequestBuilder<T> extends Function, Serializable {
/**
* Creates an IndexRequest from an element.
* @param element The element that needs to be turned in to an IndexRequest
* @param ctx The Flink RuntimeContext of the ElasticsearchSink
* @return The constructed IndexRequest
*/
IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}
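For reference when migrating legacy code, a sketch of the deprecated style (new code should implement ElasticsearchSinkFunction instead; index, type, and field names are illustrative):

// Deprecated style: exactly one IndexRequest per element
IndexRequestBuilder<String> legacyBuilder = new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest().index("my-index").type("my-type").source(json);
    }
};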
Backoff strategy for bulk flush retries.

/**
* Backoff strategy types for bulk flush retries.
*/
public enum FlushBackoffType {
/** Constant delay between retries */
CONSTANT,
/** Exponentially increasing delay between retries */
EXPONENTIAL
}

Configuration for bulk flush backoff behavior.
/**
* Configuration for bulk flush backoff behavior.
*/
public static class BulkFlushBackoffPolicy implements Serializable {
/** The backoff type (CONSTANT or EXPONENTIAL) */
private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
/** Maximum number of retry attempts */
private int maxRetryCount = 8;
/** Delay in milliseconds between retries */
private long delayMillis = 50L;
// Constructor and getter/setter methods...
}

// Exception thrown when Elasticsearch queues are full
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
// Flink utility for exception handling
import org.apache.flink.util.ExceptionUtils;

// Elasticsearch client types from the elasticsearch 1.x dependency (version 1.7.1)
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
// Flink runtime types
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;
// Java standard types
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Arrays;

Uses a local Elasticsearch Node for communication. The sink will block and wait for a cluster to come online.
// Configuration for embedded node
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);

Uses a TransportClient with specified addresses. The sink will fail if no cluster connection can be established.
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
// Configuration for transport client
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");
List<TransportAddress> transportAddresses = Arrays.asList(
new InetSocketTransportAddress("localhost", 9300),
new InetSocketTransportAddress("localhost", 9301)
);
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, transportAddresses, sinkFunction);

The connector provides flexible error handling through the ActionRequestFailureHandler interface:
// Custom failure handler example
ActionRequestFailureHandler customHandler = new ActionRequestFailureHandler() {
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (restStatusCode == 429) {
// Retry on rate limiting
indexer.add(action);
} else if (restStatusCode >= 400 && restStatusCode < 500) {
// Drop malformed requests (client errors)
System.err.println("Dropping malformed request: " + failure.getMessage());
} else {
// Fail on other errors
throw failure;
}
}
};
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction, customHandler);

ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
Map<String, Object> json = new HashMap<>();
json.put("message", element);
json.put("timestamp", System.currentTimeMillis());
IndexRequest request = Requests.indexRequest()
.index("logs")
.type("message")
.source(json);
indexer.add(request);
}
};

public class User {
public String name;
public int age;
public String email;
// constructors, getters, setters...
}
ElasticsearchSinkFunction<User> userSinkFunction = new ElasticsearchSinkFunction<User>() {
@Override
public void process(User user, RuntimeContext ctx, RequestIndexer indexer) {
Map<String, Object> json = new HashMap<>();
json.put("name", user.name);
json.put("age", user.age);
json.put("email", user.email);
json.put("indexed_at", System.currentTimeMillis());
// Index request
IndexRequest indexRequest = Requests.indexRequest()
.index("users")
.type("user")
.id(user.email) // Use email as document ID
.source(json);
indexer.add(indexRequest);
// Optional: also create an update request for upsert behavior
// (Requests has no updateRequest() factory in Elasticsearch 1.x; construct UpdateRequest directly)
UpdateRequest updateRequest = new UpdateRequest("users", "user", user.email)
.doc(json)
.upsert(json);
indexer.add(updateRequest);
}
};
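A wiring sketch for the function above (userStream is an assumed DataStream<User>; config as defined in the quick start):

ElasticsearchSink<User> userSink = new ElasticsearchSink<>(config, userSinkFunction);
userStream.addSink(userSink);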
// Recommended approach using configuration constants
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "production-cluster");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "10");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "30000");
// Note: Backoff configurations will be ignored in Elasticsearch 1.x but can be set
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
ElasticsearchSink<MyEvent> sink = new ElasticsearchSink<>(
config,
mySinkFunction,
new RetryRejectedExecutionFailureHandler()
);

ElasticsearchSinkFunction<Event> eventSinkFunction = new ElasticsearchSinkFunction<Event>() {
@Override
public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
// Index to main events index
Map<String, Object> eventJson = new HashMap<>();
eventJson.put("type", event.getType());
eventJson.put("data", event.getData());
eventJson.put("timestamp", event.getTimestamp());
IndexRequest eventRequest = Requests.indexRequest()
.index("events")
.type("event")
.source(eventJson);
indexer.add(eventRequest);
// Also index to a daily partitioned index
// (assumes event.getTimestamp() returns a java.time type such as LocalDateTime,
// and that java.time.format.DateTimeFormatter is imported)
String dailyIndex = "events-" + event.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
IndexRequest dailyRequest = Requests.indexRequest()
.index(dailyIndex)
.type("event")
.source(eventJson);
indexer.add(dailyRequest);
}
};
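Finally, a wiring sketch for the partitioned-index function (eventStream is an assumed DataStream<Event>, and config is the production configuration above):

ElasticsearchSink<Event> eventSink = new ElasticsearchSink<>(
    config,
    eventSinkFunction,
    new RetryRejectedExecutionFailureHandler());
eventStream.addSink(eventSink);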