or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-flink--flink-connector-elasticsearch-2-12

Apache Flink connector for Elasticsearch 1.x that enables streaming data ingestion into Elasticsearch clusters

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-elasticsearch_2.12@1.8.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch-2-12@1.8.0

index.mddocs/

Flink Elasticsearch Connector

The Apache Flink Elasticsearch connector enables streaming data ingestion from Flink data streams into Elasticsearch 1.x clusters only. It provides fault-tolerant, exactly-once processing guarantees with configurable bulk processing and flexible failure handling strategies.

Important: This connector is specifically designed for Elasticsearch 1.x and is not compatible with newer versions of Elasticsearch.

Package Information

  • Package Name: flink-connector-elasticsearch_2.12
  • Package Type: maven
  • Language: Java
  • Elasticsearch Version: 1.x (specifically 1.7.1)
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch_2.12</artifactId>
      <version>1.8.3</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

Basic Usage

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

// Configure Elasticsearch connection using configuration constants
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");

// Create sink function
ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        json.put("timestamp", System.currentTimeMillis());
        
        IndexRequest request = Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
            
        indexer.add(request);
    }
};

// Create and add sink to stream
ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);
dataStream.addSink(sink);

Architecture

The connector is built around several key components:

  • ElasticsearchSink: Main sink implementation that connects to Elasticsearch clusters
  • Connection Modes: Supports both embedded Node and TransportClient connection modes
  • Bulk Processing: Uses Elasticsearch's BulkProcessor for efficient batch operations
  • Failure Handling: Configurable strategies for handling failed requests
  • Type Safety: Generic type support for processing different data types
  • Version-Specific Bridge: Uses Elasticsearch1ApiCallBridge for 1.x compatibility

Capabilities

Elasticsearch Sink

Main sink implementation for connecting Flink streams to Elasticsearch 1.x clusters.

/**
 * Elasticsearch 1.x sink that requests multiple ActionRequests against a cluster for each incoming element.
 * @param <T> Type of the elements handled by this sink
 */
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
    
    /**
     * Creates a new ElasticsearchSink that connects using an embedded Node (deprecated constructor).
     * @param userConfig The map of user settings for constructing the Node and BulkProcessor
     * @param indexRequestBuilder Function to generate IndexRequest from incoming elements
     * @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
     */
    @Deprecated
    public ElasticsearchSink(
        Map<String, String> userConfig,
        IndexRequestBuilder<T> indexRequestBuilder
    );
    
    /**
     * Creates a new ElasticsearchSink that connects using a TransportClient (deprecated constructor).
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to
     * @param indexRequestBuilder Function to generate IndexRequest from incoming elements
     * @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.
     */
    @Deprecated
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        IndexRequestBuilder<T> indexRequestBuilder
    );
    
    /**
     * Creates a new ElasticsearchSink that connects using an embedded Node.
     * The sink will block and wait for a cluster to come online.
     * @param userConfig The map of user settings for constructing the embedded Node and BulkProcessor.
     *                   Important settings include "cluster.name" and bulk processing configuration.
     * @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
     */
    public ElasticsearchSink(
        Map<String, String> userConfig, 
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction
    );
    
    /**
     * Creates a new ElasticsearchSink that connects using a TransportClient.
     * The sink will fail if no cluster can be connected to.
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor.
     *                   Important settings include "cluster.name" and bulk processing configuration.
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to using a TransportClient
     * @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction
    );
    
    /**
     * Creates a new ElasticsearchSink with custom failure handling using an embedded Node.
     * @param userConfig The map of user settings for constructing the Node and BulkProcessor
     * @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
     * @param failureHandler Handler for failed ActionRequests
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler
    );
    
    /**
     * Creates a new ElasticsearchSink with custom failure handling using a TransportClient.
     * @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor
     * @param transportAddresses The addresses of Elasticsearch nodes to connect to
     * @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements
     * @param failureHandler Handler for failed ActionRequests
     */
    public ElasticsearchSink(
        Map<String, String> userConfig,
        List<TransportAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler
    );
}

Elasticsearch Sink Function

Interface for processing stream elements into Elasticsearch action requests.

/**
 * Creates multiple ActionRequests from an element in a stream.
 * @param <T> The type of the element handled by this ElasticsearchSinkFunction
 */
@PublicEvolving
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
    
    /**
     * Process the incoming element to produce multiple ActionRequests.
     * The produced requests should be added to the provided RequestIndexer.
     * @param element incoming element to process
     * @param ctx runtime context containing information about the sink instance
     * @param indexer request indexer that ActionRequest should be added to
     */
    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}

Request Indexer

Interface for adding requests to be sent to Elasticsearch.

/**
 * Users add multiple delete, index or update requests to a RequestIndexer to prepare
 * them for sending to an Elasticsearch cluster.
 */
@PublicEvolving
public interface RequestIndexer {
    
    /**
     * Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param deleteRequests The multiple DeleteRequest to add.
     */
    void add(DeleteRequest... deleteRequests);
    
    /**
     * Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param indexRequests The multiple IndexRequest to add.
     */
    void add(IndexRequest... indexRequests);
    
    /**
     * Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param updateRequests The multiple UpdateRequest to add.
     */
    void add(UpdateRequest... updateRequests);
    
    /**
     * Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.
     * @param actionRequests The multiple ActionRequest to add.
     * @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods
     */
    @Deprecated
    default void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            if (actionRequest instanceof IndexRequest) {
                add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof DeleteRequest) {
                add((DeleteRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                add((UpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }
}

Action Request Failure Handler

Interface defining how failed ActionRequests should be handled.

/**
 * An implementation of ActionRequestFailureHandler is provided by the user to define how failed
 * ActionRequests should be handled, e.g. dropping them, reprocessing malformed documents, or
 * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
 */
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {
    
    /**
     * Handle a failed ActionRequest.
     * @param action the ActionRequest that failed due to the failure
     * @param failure the cause of failure
     * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
     * @param indexer request indexer to re-add the failed action, if intended to do so
     * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
     *                   the exception or a custom one
     */
    void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

Failure Handler Implementations

NoOpFailureHandler

Default failure handler that simply rethrows failures, causing the sink to fail.

/**
 * An ActionRequestFailureHandler that simply fails the sink on any failures.
 */
@Internal
public class NoOpFailureHandler implements ActionRequestFailureHandler {
    
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable {
        // simply fail the sink
        throw failure;
    }
}

IgnoringFailureHandler

Failure handler that ignores all failures and drops affected requests.

/**
 * Ignores all kinds of failures and drops the affected ActionRequest.
 */
@Internal
public class IgnoringFailureHandler implements ActionRequestFailureHandler {
    
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) {
        // ignore failure
    }
}

RetryRejectedExecutionFailureHandler

Failure handler that retries requests failed due to temporary EsRejectedExecutionException.

/**
 * An ActionRequestFailureHandler that re-adds requests that failed due to temporary
 * EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
 * and fails for all other failures.
 */
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            indexer.add(action);
        } else {
            // rethrow all other failures
            throw failure;
        }
    }
}

Configuration Constants

Configuration Keys

The following constants define configuration keys used by the connector:

// From ElasticsearchSinkBase class
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

Configuration Options

Bulk Processing Configuration

  • bulk.flush.max.actions: Maximum amount of elements to buffer (no default - uses Elasticsearch BulkProcessor defaults)
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer (no default - uses Elasticsearch BulkProcessor defaults)
  • bulk.flush.interval.ms: Interval at which to flush data regardless of other settings (no default - uses Elasticsearch BulkProcessor defaults)

Backoff Configuration

  • bulk.flush.backoff.enable: Enable backoff retries (default: true)
  • bulk.flush.backoff.type: Backoff type - "CONSTANT" or "EXPONENTIAL" (default: EXPONENTIAL)
  • bulk.flush.backoff.retries: Maximum retry count (default: 8)
  • bulk.flush.backoff.delay: Delay in milliseconds (default: 50)

Important Note: Elasticsearch 1.x does not support backoff retries. The BulkProcessor in Elasticsearch 1.x does not provide backoff retry functionality, so these configuration options are parsed but have no effect and will log a warning when specified.

Cluster Configuration

  • cluster.name: Name of the Elasticsearch cluster to connect to
  • http.enabled: HTTP access setting (automatically set to false for embedded nodes)

Deprecated Interfaces

IndexRequestBuilder

Creates IndexRequest objects from stream elements. This interface is deprecated in favor of ElasticsearchSinkFunction.

/**
 * Function that creates an IndexRequest from an element in a Stream.
 * This is used by ElasticsearchSink to prepare elements for sending them to Elasticsearch.
 * @param <T> The type of the element handled by this IndexRequestBuilder
 * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
 *             Please create a ElasticsearchSink using a ElasticsearchSinkFunction instead.
 */
@Deprecated
public interface IndexRequestBuilder<T> extends Function, Serializable {
    
    /**
     * Creates an IndexRequest from an element.
     * @param element The element that needs to be turned in to an IndexRequest
     * @param ctx The Flink RuntimeContext of the ElasticsearchSink
     * @return The constructed IndexRequest
     */
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}

Enums and Types

FlushBackoffType

Backoff strategy for bulk flush retries.

/**
 * Backoff strategy types for bulk flush retries.
 */
public enum FlushBackoffType {
    /** Constant delay between retries */
    CONSTANT,
    /** Exponentially increasing delay between retries */
    EXPONENTIAL
}

BulkFlushBackoffPolicy

Configuration for bulk flush backoff behavior.

/**
 * Configuration for bulk flush backoff behavior.
 */
public static class BulkFlushBackoffPolicy implements Serializable {
    /** The backoff type (CONSTANT or EXPONENTIAL) */
    private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
    
    /** Maximum number of retry attempts */
    private int maxRetryCount = 8;
    
    /** Delay in milliseconds between retries */
    private long delayMillis = 50L;
    
    // Constructor and getter/setter methods...
}

Exception Types

// Exception thrown when Elasticsearch queues are full
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Flink utility for exception handling
import org.apache.flink.util.ExceptionUtils;

Core Types

// Elasticsearch client types from elasticsearch 1.x dependency (version 1.7.1)
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;

// Flink runtime types
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;

// Java standard types
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Arrays;

Connection Modes

Embedded Node Mode

Uses a local Elasticsearch Node for communication. The sink will block and wait for a cluster to come online.

// Configuration for embedded node
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);

TransportClient Mode

Uses a TransportClient with specified addresses. The sink will fail if no cluster connection can be established.

import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

// Configuration for transport client
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");

List<TransportAddress> transportAddresses = Arrays.asList(
    new InetSocketTransportAddress("localhost", 9300),
    new InetSocketTransportAddress("localhost", 9301)
);

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, transportAddresses, sinkFunction);

Error Handling

The connector provides flexible error handling through the ActionRequestFailureHandler interface:

// Custom failure handler example
ActionRequestFailureHandler customHandler = new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (restStatusCode == 429) {
            // Retry on rate limiting
            indexer.add(action);
        } else if (restStatusCode >= 400 && restStatusCode < 500) {
            // Drop malformed requests (client errors)
            System.err.println("Dropping malformed request: " + failure.getMessage());
        } else {
            // Fail on other errors
            throw failure;
        }
    }
};

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction, customHandler);

Usage Examples

Basic String Indexing

ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("message", element);
        json.put("timestamp", System.currentTimeMillis());
        
        IndexRequest request = Requests.indexRequest()
            .index("logs")
            .type("message")
            .source(json);
            
        indexer.add(request);
    }
};

Complex Object Processing

public class User {
    public String name;
    public int age;
    public String email;
    // constructors, getters, setters...
}

ElasticsearchSinkFunction<User> userSinkFunction = new ElasticsearchSinkFunction<User>() {
    @Override
    public void process(User user, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("name", user.name);
        json.put("age", user.age);
        json.put("email", user.email);
        json.put("indexed_at", System.currentTimeMillis());
        
        // Index request
        IndexRequest indexRequest = Requests.indexRequest()
            .index("users")
            .type("user")
            .id(user.email) // Use email as document ID
            .source(json);
            
        indexer.add(indexRequest);
        
        // Optional: Also create update request for upsert behavior
        UpdateRequest updateRequest = Requests.updateRequest()
            .index("users")
            .type("user")
            .id(user.email)
            .doc(json)
            .upsert(json);
            
        indexer.add(updateRequest);
    }
};

Using Configuration Constants

// Recommended approach using configuration constants
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "production-cluster");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "10");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "30000");

// Note: Backoff configurations will be ignored in Elasticsearch 1.x but can be set
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");

ElasticsearchSink<MyEvent> sink = new ElasticsearchSink<>(
    config, 
    mySinkFunction,
    new RetryRejectedExecutionFailureHandler()
);

Multiple Index Operations

ElasticsearchSinkFunction<Event> eventSinkFunction = new ElasticsearchSinkFunction<Event>() {
    @Override
    public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
        // Index to main events index
        Map<String, Object> eventJson = new HashMap<>();
        eventJson.put("type", event.getType());
        eventJson.put("data", event.getData());
        eventJson.put("timestamp", event.getTimestamp());
        
        IndexRequest eventRequest = Requests.indexRequest()
            .index("events")
            .type("event")
            .source(eventJson);
        indexer.add(eventRequest);
        
        // Also index to daily partitioned index
        String dailyIndex = "events-" + event.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        IndexRequest dailyRequest = Requests.indexRequest()
            .index(dailyIndex)
            .type("event")
            .source(eventJson);
        indexer.add(dailyRequest);
    }
};