Failure Handling

Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.

Capabilities

ActionRequestFailureHandler Interface

Main interface for implementing custom failure handling strategies.

/**
 * An implementation of ActionRequestFailureHandler is provided by the user to define how
 * failed ActionRequests should be handled, e.g. dropping them, reprocessing
 * malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
 * is only temporary.
 */
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {
    /**
     * Handle a failed ActionRequest.
     * @param action the ActionRequest that failed due to the failure
     * @param failure the cause of failure
     * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
     * @param indexer request indexer to re-add the failed action, if intended to do so
     * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
     *     the exception or a custom one
     */
    void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

Usage Examples:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Custom failure handler with different strategies
public class SmartFailureHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SmartFailureHandler.class);
    
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) 
            throws Throwable {
        
        LOG.error("Failed Elasticsearch request: {} (status: {})", failure.getMessage(), restStatusCode, failure);
        
        // Handle queue saturation - retry the request
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            LOG.warn("Elasticsearch queue full, retrying request");
            indexer.add(action);
            return;
        }
        
        // Handle malformed documents - log and drop
        if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            LOG.error("Malformed document, dropping request: {}", action);
            return; // Drop the request without failing the sink
        }
        
        // Handle timeout or connection issues - limited retries
        if (restStatusCode == 408 || restStatusCode == -1) {
            // Could implement retry counter logic here
            LOG.warn("Connection issue, retrying request once");
            indexer.add(action);
            return;
        }
        
        // Handle client errors (4xx) - log and drop
        if (restStatusCode >= 400 && restStatusCode < 500) {
            LOG.error("Client error ({}), dropping request: {}", restStatusCode, action);
            return;
        }
        
        // Handle server errors (5xx) - fail fast
        if (restStatusCode >= 500) {
            LOG.error("Server error ({}), failing sink", restStatusCode);
            throw failure;
        }
        
        // For all other failures, fail the sink
        throw failure;
    }
}

// Simple retry-all handler. Note: retry counts are kept in an in-memory map and are
// not part of Flink's checkpointed state; entries for requests that eventually succeed
// are never removed.
public class RetryAllFailureHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(RetryAllFailureHandler.class);

    private final int maxRetries;
    private final Map<ActionRequest, Integer> retryCount = new HashMap<>();
    
    public RetryAllFailureHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }
    
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) 
            throws Throwable {
        int currentRetries = retryCount.getOrDefault(action, 0);
        
        if (currentRetries < maxRetries) {
            retryCount.put(action, currentRetries + 1);
            LOG.warn("Retrying failed request, attempt {} of {}", currentRetries + 1, maxRetries);
            indexer.add(action);
        } else {
            LOG.error("Max retries exceeded for request, failing sink");
            retryCount.remove(action);
            throw failure;
        }
    }
}

// Using custom failure handler
ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setFailureHandler(new SmartFailureHandler())
.build();

Built-in Failure Handlers

Pre-implemented failure handling strategies for common scenarios.

RetryRejectedExecutionFailureHandler

/**
 * An ActionRequestFailureHandler that re-adds requests that failed due to temporary 
 * EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
 * and fails for all other failures.
 */
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

Usage Examples:

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// Sink that retries on queue full, fails on other errors
ElasticsearchSink<Event> resilientSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setFailureHandler(new RetryRejectedExecutionFailureHandler())
.build();

// For Table API
CREATE TABLE resilient_table (...) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'events',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler'
);
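
Note: recent versions of the Flink SQL Elasticsearch connector also accept the shorthand values 'fail' (the default), 'ignore', and 'retry-rejected' for the 'failure-handler' option, in addition to a fully qualified custom class name; check which forms the connector version in use supports.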

NoOpFailureHandler

/**
 * An ActionRequestFailureHandler that simply fails the sink on any failures.
 * This is the default failure handler.
 */
@Internal  
public class NoOpFailureHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

Usage Examples:

import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;

// Explicit use of default handler (fails immediately on any error)
ElasticsearchSink<Event> strictSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setFailureHandler(new NoOpFailureHandler())  // This is the default
.build();

// Default behavior - no explicit handler needed
ElasticsearchSink<Event> defaultSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
).build(); // Uses NoOpFailureHandler by default

IgnoringFailureHandler

/**
 * An ActionRequestFailureHandler that ignores all failures and continues processing.
 * Warning: This can lead to data loss as failed requests are dropped.
 */
@Internal
public class IgnoringFailureHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(
        ActionRequest action, 
        Throwable failure, 
        int restStatusCode, 
        RequestIndexer indexer
    ) throws Throwable;
}

Usage Examples:

import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;

// Sink that drops all failed requests (use with caution!)
ElasticsearchSink<Event> lenientSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction  
)
.setFailureHandler(new IgnoringFailureHandler())
.build();

// For Table API - useful for non-critical data
CREATE TABLE lenient_table (...) WITH (
  'connector' = 'elasticsearch-6', 
  'hosts' = 'http://localhost:9200',
  'index' = 'optional_events',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);

Advanced Failure Handling Patterns

Conditional Retry Handler

// Additional imports beyond those shown earlier
import java.util.Set;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

public class ConditionalRetryHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ConditionalRetryHandler.class);

    private final Set<String> retryableIndices;
    private final ActionRequestFailureHandler fallbackHandler;

    public ConditionalRetryHandler(Set<String> retryableIndices, ActionRequestFailureHandler fallbackHandler) {
        this.retryableIndices = retryableIndices;
        this.fallbackHandler = fallbackHandler;
    }
    
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) 
            throws Throwable {
        
        String targetIndex = extractIndexFromRequest(action);
        
        // Only retry for specific indices
        if (retryableIndices.contains(targetIndex) && 
            ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            LOG.info("Retrying rejected request for index: {}", targetIndex);
            indexer.add(action);
        } else {
            // Delegate to fallback handler
            fallbackHandler.onFailure(action, failure, restStatusCode, indexer);
        }
    }
    
    private String extractIndexFromRequest(ActionRequest request) {
        if (request instanceof IndexRequest) {
            return ((IndexRequest) request).index();
        } else if (request instanceof UpdateRequest) {
            return ((UpdateRequest) request).index();
        } else if (request instanceof DeleteRequest) {
            return ((DeleteRequest) request).index();
        }
        return "unknown";
    }
}

// Usage
Set<String> criticalIndices = Set.of("critical-events", "important-logs");
ElasticsearchSink<Event> conditionalSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setFailureHandler(new ConditionalRetryHandler(
    criticalIndices, 
    new NoOpFailureHandler()  // Fail fast for non-critical indices
))
.build();

Dead Letter Queue Handler

// Additional import beyond those shown earlier
import org.elasticsearch.client.Requests;

public class DeadLetterQueueHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterQueueHandler.class);

    private final String deadLetterIndex;
    private final ActionRequestFailureHandler baseHandler;

    public DeadLetterQueueHandler(String deadLetterIndex, ActionRequestFailureHandler baseHandler) {
        this.deadLetterIndex = deadLetterIndex;
        this.baseHandler = baseHandler;
    }
    
    @Override  
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        
        try {
            // Try the base handler first
            baseHandler.onFailure(action, failure, restStatusCode, indexer);
        } catch (Throwable t) {
            // If base handler fails, send to dead letter queue
            LOG.warn("Sending failed request to dead letter queue: {}", deadLetterIndex);
            
            Map<String, Object> deadLetterDoc = new HashMap<>();
            // extractIndexFromRequest(...) is the helper defined in ConditionalRetryHandler above
            deadLetterDoc.put("original_index", extractIndexFromRequest(action));
            deadLetterDoc.put("failure_reason", failure.getMessage());
            deadLetterDoc.put("failure_time", System.currentTimeMillis());
            deadLetterDoc.put("rest_status_code", restStatusCode);
            deadLetterDoc.put("original_request", action.toString());
            
            IndexRequest deadLetterRequest = Requests.indexRequest()
                    .index(deadLetterIndex)
                    .type("_doc")
                    .source(deadLetterDoc);
                    
            indexer.add(deadLetterRequest);
        }
    }
}

// Usage
ElasticsearchSink<Event> deadLetterSink = new ElasticsearchSink.Builder<>(
    httpHosts,
    sinkFunction
)
.setFailureHandler(new DeadLetterQueueHandler(
    "failed-requests",
    new RetryRejectedExecutionFailureHandler()
))
.build();

Error Categories and Handling Strategies

Network and Connection Errors

// Common network-related failures
if (restStatusCode == -1 || restStatusCode == 408 || restStatusCode == 503) {
    // Connection timeout, service unavailable
    // Strategy: Retry with backoff
    indexer.add(action);
}
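
For rejected (queue-full) requests the connector can also retry at the bulk-processor level with a configurable backoff before the failure is reported to the handler. A minimal sketch, assuming the bulk-flush backoff setters of the Flink 1.x ElasticsearchSink.Builder (verify the method names against the connector version in use):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;

// Names per the Flink 1.x Elasticsearch connector builder; verify against your version
ElasticsearchSink.Builder<Event> builder = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
builder.setBulkFlushBackoff(true);                              // enable bulk-processor retries
builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);  // or CONSTANT
builder.setBulkFlushBackoffRetries(3);                          // stop after 3 attempts
builder.setBulkFlushBackoffDelay(1000);                         // initial delay in milliseconds
builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
ElasticsearchSink<Event> backoffSink = builder.build();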

Client Errors (4xx)

// Client errors - usually permanent
if (restStatusCode >= 400 && restStatusCode < 500) {
    switch (restStatusCode) {
        case 400: // Bad Request - malformed document
        case 404: // Not Found - index doesn't exist
        case 409: // Conflict - version conflict
            LOG.error("Client error ({}), dropping request", restStatusCode);
            return; // Drop request
        default:
            throw failure; // Fail sink
    }
}

Server Errors (5xx)

// Server errors - may be temporary
if (restStatusCode >= 500) {
    switch (restStatusCode) {
        case 502: // Bad Gateway  
        case 503: // Service Unavailable
        case 504: // Gateway Timeout
            // Temporary server issues - retry
            indexer.add(action);
            break;
        default:
            // Serious server errors - fail fast
            throw failure;
    }
}

Elasticsearch-Specific Errors

// Queue saturation
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
    indexer.add(action); // Retry
}

// Parse errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
    return; // Drop malformed documents
}

// Security errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchSecurityException.class).isPresent()) {
    throw failure; // Fail - likely a configuration issue
}
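
The security check above additionally assumes the security exception class is imported; in recent Elasticsearch client libraries that would be:

// Package location may vary by Elasticsearch version
import org.elasticsearch.ElasticsearchSecurityException;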