

tessl/maven-org-apache-flink--flink-connector-elasticsearch6_2-12

Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-elasticsearch6_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch6_2-12@1.14.0


Apache Flink Elasticsearch 6 Connector

The Apache Flink connector for Elasticsearch 6.x provides a streaming sink for real-time data ingestion into Elasticsearch clusters. It supports the DataStream API for programmatic streaming jobs and the Table API for SQL-based stream processing, and it exposes configuration options for connection management, bulk processing behavior, and retry mechanisms.

Package Information

  • Package Name: flink-connector-elasticsearch6_2.12
  • Package Type: maven
  • Language: Java (artifact built against Scala 2.12)
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-elasticsearch6_2.12
  • Installation: Add to Maven dependencies with version 1.14.6

Core Imports

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;

For Table API usage:

-- Configuration via DDL
CREATE TABLE elasticsearch_sink (
  id BIGINT,
  name STRING,
  age INT
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'document-type' = '_doc'
);

Basic Usage

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Create HTTP hosts list
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// Define sink function: wraps each element in a JSON document and queues an index request
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("_doc")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
};

// Create sink
ElasticsearchSink<String> sink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
).build();

// Add sink to DataStream; fromElements is a placeholder source, replace it with your own stream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("first", "second");
input.addSink(sink);

Architecture

The Flink Elasticsearch 6 connector is built around several key components:

  • ElasticsearchSink: Main sink class that extends ElasticsearchSinkBase and uses RestHighLevelClient
  • Builder Pattern: Fluent API for configuring bulk processing, failure handling, and client settings
  • Bulk Processing: Internal BulkProcessor buffers multiple ActionRequests before sending them to the cluster
  • Failure Handling: Pluggable ActionRequestFailureHandler system for custom error recovery strategies
  • Table API Integration: Dynamic table sink factory for SQL-based stream processing
  • REST Client: Uses Elasticsearch REST High Level Client with configurable connection settings
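
To make the buffering idea concrete, here is a minimal sketch of a buffer that flushes once a configured number of requests has accumulated. `BulkBufferSketch` and its members are hypothetical stand-ins written for illustration; they are not classes from Flink or Elasticsearch, and the real BulkProcessor also applies size and time thresholds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the connector's internal buffering; not a Flink or Elasticsearch class.
final class BulkBufferSketch<R> {
    private final int maxActions;            // flush once this many requests are buffered
    private final Consumer<List<R>> sender;  // receives each flushed batch
    private final List<R> buffer = new ArrayList<>();

    BulkBufferSketch(int maxActions, Consumer<List<R>> sender) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.sender = sender;
    }

    // Buffer one request and flush when the action threshold is reached.
    void add(R request) {
        buffer.add(request);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    // Send whatever is buffered as one batch; does nothing when the buffer is empty.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        BulkBufferSketch<String> bulk = new BulkBufferSketch<>(2, sent::add);
        bulk.add("a");
        bulk.add("b"); // reaches the threshold, so the batch [a, b] is sent
        bulk.add("c");
        bulk.flush();  // sends the remaining batch [c]
        System.out.println(sent);
    }
}
```

The sketch trades latency for fewer round trips: a larger `maxActions` means fewer, bigger batches, while an explicit `flush()` bounds how long a request can wait.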

Capabilities

DataStream API Integration

Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.

public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
    // Private constructor - use Builder

    // Nested builder, referenced as ElasticsearchSink.Builder<T>
    public static class Builder<T> {
        public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
        public ElasticsearchSink<T> build();
    }
}

See datastream-api.md for DataStream API details.

Table API Integration

SQL-based stream processing integration through a dynamic table sink factory. Tables are configured through DDL, and the connector validates the supplied table options.

-- Table API usage via DDL
CREATE TABLE sink_table (...) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'target-index',
  'document-type' = '_doc'
);

See table-api.md for Table API details.

Bulk Processing Configuration

Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.

// Builder methods for bulk configuration
public void setBulkFlushMaxActions(int numMaxActions);
public void setBulkFlushMaxSizeMb(int maxSizeMb);
public void setBulkFlushInterval(long intervalMillis);

See bulk-processing.md for bulk processing details.
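
As an illustration of how these settings might be combined, the following sketch configures flush thresholds and retry backoff on the builder. The threshold values are arbitrary examples, `BulkConfigExample` and `buildSink` are hypothetical names, and the `setBulkFlushBackoff*` setters are not listed in this overview; they are assumed from the Flink 1.14 builder API and should be checked against the version in use.

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

import java.util.List;

// Hypothetical helper class; requires the connector and its dependencies on the classpath.
final class BulkConfigExample {
    static <T> ElasticsearchSink<T> buildSink(
            List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> sinkFunction) {
        ElasticsearchSink.Builder<T> builder =
                new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);

        // Flush when any one of these thresholds is reached (example values).
        builder.setBulkFlushMaxActions(1000);   // buffered requests
        builder.setBulkFlushMaxSizeMb(5);       // buffered size in MB
        builder.setBulkFlushInterval(10_000L);  // milliseconds between flushes

        // Retry rejected bulk requests with backoff (assumed Flink 1.14 setters).
        builder.setBulkFlushBackoff(true);
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffRetries(3);
        builder.setBulkFlushBackoffDelay(1_000L);

        return builder.build();
    }
}
```

Smaller thresholds lower end-to-end latency at the cost of more bulk requests; larger ones favor throughput.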

Failure Handling

Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.

public interface ActionRequestFailureHandler extends Serializable {
    void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

See failure-handling.md for failure handling details.
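
One way to structure a custom handler is to first classify the failure and then act on the result. The sketch below shows only the classification step in plain Java so it can run without the connector; `FailurePolicySketch`, `FailureDecision`, and `decide` are hypothetical names, and the mapping of status codes to decisions (429 retried, 400 dropped) is an assumption chosen for illustration, not the behavior of any built-in handler.

```java
// Hypothetical classification helper; not part of the connector API.
final class FailurePolicySketch {
    enum FailureDecision { RETRY, DROP, FAIL }

    // Map the REST status code passed to onFailure to a decision.
    static FailureDecision decide(int restStatusCode) {
        if (restStatusCode == 429) {
            // Too Many Requests: the cluster is overloaded, so retrying is reasonable.
            return FailureDecision.RETRY;
        }
        if (restStatusCode == 400) {
            // Bad Request: the request itself is likely malformed; retrying will not help.
            return FailureDecision.DROP;
        }
        // Anything else, including an unknown status code, fails the sink.
        return FailureDecision.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(429));
        System.out.println(decide(400));
        System.out.println(decide(500));
    }
}
```

Inside `onFailure`, a handler built on this idea would re-add the request through `indexer.add(...)` for `RETRY`, return without doing anything for `DROP`, and rethrow `failure` for `FAIL`.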

Client Configuration

REST client factory system for customizing Elasticsearch client configuration. Supports authentication, SSL, timeouts, and other client-level settings.

public interface RestClientFactory extends Serializable {
    void configureRestClientBuilder(RestClientBuilder restClientBuilder);
}

See client-configuration.md for client configuration details.
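
A factory implementation could look roughly like the following sketch, which sets a default header and request timeouts. `TimeoutRestClientFactory`, the header name, and the timeout values are hypothetical; the `RestClientBuilder` calls are from the Elasticsearch low-level REST client and should be checked against the client version on the classpath.

```java
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

// Hypothetical factory; requires the connector and the Elasticsearch REST client on the classpath.
final class TimeoutRestClientFactory implements RestClientFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // Send an identifying header with every request (example header name and value).
        restClientBuilder.setDefaultHeaders(
                new Header[] {new BasicHeader("X-Client", "flink-sink")});

        // Tighten connect and socket timeouts (example values, in milliseconds).
        restClientBuilder.setRequestConfigCallback(
                requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5_000)
                        .setSocketTimeout(60_000));
    }
}
```

Because the factory must be serializable, it is safer to create non-serializable helpers (for example, credential providers) inside `configureRestClientBuilder` than to hold them as fields.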

Types

// Core functional interface for processing stream elements
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
    default void open() throws Exception {}
    default void close() throws Exception {}
    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}

// Request indexer for adding ActionRequests
public interface RequestIndexer {
    void add(DeleteRequest... deleteRequests);
    void add(IndexRequest... indexRequests); 
    void add(UpdateRequest... updateRequests);
}

// Backoff type (nested in ElasticsearchSinkBase)
public enum FlushBackoffType {
    CONSTANT,
    EXPONENTIAL
}

// Backoff configuration policy for bulk processing (nested in ElasticsearchSinkBase)
public static class BulkFlushBackoffPolicy implements Serializable {
    /**
     * Get the backoff type (CONSTANT or EXPONENTIAL).
     * @return the backoff type
     */
    public FlushBackoffType getBackoffType();
    
    /**
     * Get the maximum number of retry attempts.
     * @return the maximum retry count
     */
    public int getMaxRetryCount();
    
    /**
     * Get the initial delay in milliseconds.
     * @return the delay in milliseconds
     */
    public long getDelayMillis();
    
    /**
     * Set the backoff type.
     * @param backoffType the backoff type to use
     */
    public void setBackoffType(FlushBackoffType backoffType);
    
    /**
     * Set the maximum number of retry attempts.
     * @param maxRetryCount the maximum retry count (must be >= 0)
     */
    public void setMaxRetryCount(int maxRetryCount);
    
    /**
     * Set the initial delay between retry attempts.
     * @param delayMillis the delay in milliseconds (must be >= 0)
     */
    public void setDelayMillis(long delayMillis);
}
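
To show how the backoff type, retry count, and initial delay interact, here is a simplified sketch that computes the wait before each retry. `BackoffScheduleSketch` and its members are hypothetical, and the doubling rule for the exponential case is an assumption made for clarity; Elasticsearch's own backoff policy uses a different growth formula.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of retry delays; not the connector's or Elasticsearch's implementation.
final class BackoffScheduleSketch {
    enum Type { CONSTANT, EXPONENTIAL }

    // Return the delay in milliseconds before each retry attempt.
    static List<Long> delays(Type type, int maxRetryCount, long delayMillis) {
        if (maxRetryCount < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxRetryCount and delayMillis must be >= 0");
        }
        List<Long> result = new ArrayList<>();
        long current = delayMillis;
        for (int attempt = 0; attempt < maxRetryCount; attempt++) {
            result.add(current);
            if (type == Type.EXPONENTIAL) {
                current *= 2; // simplified growth: double the wait after every attempt
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(Type.CONSTANT, 3, 1000L));    // [1000, 1000, 1000]
        System.out.println(delays(Type.EXPONENTIAL, 3, 1000L)); // [1000, 2000, 4000]
    }
}
```

With a constant policy the total wait grows linearly with the retry count, while an exponential policy backs off faster and gives an overloaded cluster more time to recover.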