Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality.

Install:

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch6_2-12@1.14.0

Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality for real-time data ingestion into Elasticsearch clusters. It supports both the DataStream API for programmatic streaming jobs and the Table API for SQL-based stream processing, with comprehensive configuration options for connection management, bulk processing behavior, and retry mechanisms.
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;

For Table API usage:
// Configuration via DDL
CREATE TABLE elasticsearch_sink (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'users',
'document-type' = '_doc'
);

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Create the list of Elasticsearch HTTP endpoints the sink connects to
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// Define the sink function that converts each stream element into an IndexRequest.
// NOTE: RuntimeContext is fully qualified below because the example's import list
// omits org.apache.flink.api.common.functions.RuntimeContext; without it the
// example does not compile.
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {

    // Builds a single-field JSON document {"data": element} targeting "my-index".
    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("_doc") // Elasticsearch 6.x still requires a document type
                .source(json);
    }

    @Override
    public void process(String element, org.apache.flink.api.common.functions.RuntimeContext ctx, RequestIndexer indexer) {
        // One request per element; the indexer buffers them for bulk flushing.
        indexer.add(createIndexRequest(element));
    }
};
// Create sink with default bulk settings (tune via the Builder's setBulkFlush* methods)
ElasticsearchSink<String> sink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
).build();
// Add sink to DataStream
// NOTE(review): the line below is a placeholder, not compilable as written —
// replace the trailing comment with a real DataStream<String> source.
DataStream<String> input = // ... your data stream
input.addSink(sink);

The Flink Elasticsearch 6 connector is built around several key components:
Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.
// Streaming sink for Elasticsearch 6.x backed by the REST high-level client;
// instances are obtained exclusively through the nested Builder.
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
// Private constructor - use Builder
}
// NOTE(review): "ElasticsearchSink.Builder" is the qualified name as callers see it;
// as a declaration this is not valid Java syntax — the nested class is declared
// simply as `public static class Builder<T>`. This is an API-summary listing.
public static class ElasticsearchSink.Builder<T> {
// httpHosts: REST endpoints of the cluster; elasticsearchSinkFunction: element-to-request converter
public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
// Builds the configured sink.
public ElasticsearchSink<T> build();
}

SQL-based stream processing integration with dynamic table sink factory. Supports DDL configuration and comprehensive validation for table-based Elasticsearch operations.
// Table API usage via DDL
CREATE TABLE sink_table (...) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'target-index',
'document-type' = '_doc'
);

Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.
// Builder methods for bulk configuration
public void setBulkFlushMaxActions(int numMaxActions);
public void setBulkFlushMaxSizeMb(int maxSizeMb);
public void setBulkFlushInterval(long intervalMillis);

Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.
/**
 * Handler invoked for each bulk action that Elasticsearch reports as failed.
 * Implementations decide whether to re-queue the action via the indexer,
 * drop it, or rethrow the failure to fail the sink.
 */
public interface ActionRequestFailureHandler extends Serializable {
/**
 * Called for every failed action.
 *
 * @param action         the originally requested action that failed
 * @param failure        the cause of the failure
 * @param restStatusCode REST status code of the failure (presumably -1 when
 *                       not applicable — TODO confirm against Flink javadoc)
 * @param indexer        indexer used to re-queue the action for retry
 * @throws Throwable rethrow to make the sink fail on this failure
 */
void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}

REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.
/**
 * Factory hook for customizing the {@code RestClientBuilder} (e.g. auth, SSL,
 * timeouts) before the REST client is created.
 */
public interface RestClientFactory extends Serializable {
/** Invoked with the builder used to create the underlying REST client. */
void configureRestClientBuilder(RestClientBuilder restClientBuilder);
}

// Core functional interface for processing stream elements
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
/** Initialization hook; default is a no-op. */
default void open() throws Exception {}
/** Tear-down hook; default is a no-op. */
default void close() throws Exception {}
/** Converts {@code element} into action request(s) added via {@code indexer}. */
void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}
// Request indexer for adding ActionRequests
// Buffers requests for the sink's bulk processor; overloads per request type.
public interface RequestIndexer {
/** Queues delete requests for the next bulk flush. */
void add(DeleteRequest... deleteRequests);
/** Queues index requests for the next bulk flush. */
void add(IndexRequest... indexRequests);
/** Queues update requests for the next bulk flush. */
void add(UpdateRequest... updateRequests);
}
// Backoff configuration
public enum FlushBackoffType {
/** Fixed delay between bulk-flush retry attempts. */
CONSTANT,
/** Exponentially growing delay between bulk-flush retry attempts. */
EXPONENTIAL
}
// Backoff configuration policy for bulk processing
// NOTE(review): this is an API-summary listing — method bodies are omitted,
// so the class is not compilable as written.
public static class BulkFlushBackoffPolicy implements Serializable {
/**
 * Get the backoff type (CONSTANT or EXPONENTIAL).
 * @return the backoff type
 */
public FlushBackoffType getBackoffType();
/**
 * Get the maximum number of retry attempts.
 * @return the maximum retry count
 */
public int getMaxRetryCount();
/**
 * Get the initial delay in milliseconds.
 * @return the delay in milliseconds
 */
public long getDelayMillis();
/**
 * Set the backoff type.
 * @param backoffType the backoff type to use
 */
public void setBackoffType(FlushBackoffType backoffType);
/**
 * Set the maximum number of retry attempts.
 * @param maxRetryCount the maximum retry count (must be >= 0)
 */
public void setMaxRetryCount(int maxRetryCount);
/**
 * Set the initial delay between retry attempts.
 * @param delayMillis the delay in milliseconds (must be >= 0)
 */
public void setDelayMillis(long delayMillis);
}