Pluggable failure handling system with built-in handlers and support for custom implementations. Provides different strategies for handling request failures, network issues, and cluster rejections.
Main interface for implementing custom failure handling strategies.
/**
* An implementation of ActionRequestFailureHandler is provided by the user to define how
* failed ActionRequests should be handled, e.g. dropping them, reprocessing
* malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
* is only temporary.
*/
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {
/**
* Handle a failed ActionRequest.
* @param action the ActionRequest that failed due to the failure
* @param failure the cause of failure
* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
* @param indexer request indexer to re-add the failed action, if intended to do so
* @throws Throwable if the sink should fail on this failure, the implementation should rethrow
* the exception or a custom one
*/
void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}
Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
// Custom failure handler with different strategies
/**
 * Example failure handler that chooses between re-queuing, dropping, and failing the sink
 * depending on the exception chain and the REST status code of the failed request.
 *
 * <p>The checks run in a fixed order and the first match decides the outcome:
 * rejected execution (re-queue), parse error (drop), status -1 or 408 (re-queue),
 * any other 4xx (drop), everything else (rethrow).
 */
public class SmartFailureHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SmartFailureHandler.class);

    /**
     * Applies the strategy described on the class to a single failed request.
     *
     * @param action the request that failed
     * @param failure the cause of the failure
     * @param restStatusCode REST status code of the failure, or -1 when none is available
     * @param indexer used to re-queue {@code action} when the failure is considered temporary
     * @throws Throwable {@code failure} itself, when none of the retry/drop rules match
     */
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        // Every failure is logged up front; the trailing throwable argument carries the stack trace.
        LOG.error("Failed Elasticsearch request: {} (status: {})", failure.getMessage(), restStatusCode, failure);

        if (hasCause(failure, EsRejectedExecutionException.class)) {
            // Queue saturation: hand the same request back to the indexer.
            LOG.warn("Elasticsearch queue full, retrying request");
            indexer.add(action);
        } else if (hasCause(failure, ElasticsearchParseException.class)) {
            // Malformed document: log it and drop it without failing the sink.
            LOG.error("Malformed document, dropping request: {}", action);
        } else if (restStatusCode == -1 || restStatusCode == 408) {
            // Timeout or no status at all. This must stay ahead of the generic 4xx rule so that
            // 408 is re-queued rather than dropped.
            // NOTE(review): the message says "once", but no retry counter is kept here, so a
            // request that keeps failing this way is re-queued every time.
            LOG.warn("Connection issue, retrying request once");
            indexer.add(action);
        } else if (400 <= restStatusCode && restStatusCode <= 499) {
            // Remaining client errors: log and drop.
            LOG.error("Client error ({}), dropping request: {}", restStatusCode, action);
        } else {
            // Server errors get an extra log line; every remaining failure fails the sink.
            if (restStatusCode >= 500) {
                LOG.error("Server error ({}), failing sink", restStatusCode);
            }
            throw failure;
        }
    }

    /** Returns whether {@code failure} or one of its causes is an instance of {@code type}. */
    private static boolean hasCause(Throwable failure, Class<? extends Throwable> type) {
        return ExceptionUtils.findThrowable(failure, type).isPresent();
    }
}
// Simple retry-all handler
/**
 * Failure handler that re-queues every failed request, whatever the cause, until that request
 * has been retried {@code maxRetries} times; the next failure of the same request is rethrown.
 *
 * <p>NOTE(review): an entry in {@code retryCount} is removed only when the retry budget is
 * exhausted. Nothing in this class is told when a retried request eventually succeeds, so
 * entries for such requests stay in the map. Counting is keyed on the request object itself and
 * therefore relies on {@code ActionRequest}'s {@code equals}/{@code hashCode} (presumably
 * identity-based; confirm against the Elasticsearch version in use).
 */
public class RetryAllFailureHandler implements ActionRequestFailureHandler {
    // The original example logged through LOG without declaring it, which does not compile.
    private static final Logger LOG = LoggerFactory.getLogger(RetryAllFailureHandler.class);

    private final int maxRetries;
    private final Map<ActionRequest, Integer> retryCount = new HashMap<>();

    /**
     * @param maxRetries number of times a single request may be re-queued before the sink fails
     */
    public RetryAllFailureHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Re-queues {@code action} while its retry count is below {@code maxRetries}; otherwise
     * forgets the request and rethrows {@code failure}.
     *
     * @param action the request that failed
     * @param failure the cause of the failure
     * @param restStatusCode REST status code of the failure (not consulted by this handler)
     * @param indexer used to re-queue {@code action}
     * @throws Throwable {@code failure}, once the retry budget for {@code action} is used up
     */
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        int currentRetries = retryCount.getOrDefault(action, 0);
        if (currentRetries < maxRetries) {
            retryCount.put(action, currentRetries + 1);
            LOG.warn("Retrying failed request, attempt {} of {}", currentRetries + 1, maxRetries);
            indexer.add(action);
        } else {
            LOG.error("Max retries exceeded for request, failing sink");
            // Drop the bookkeeping entry before failing so the map does not keep the request alive.
            retryCount.remove(action);
            throw failure;
        }
    }
}
// Using custom failure handler
ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new SmartFailureHandler())
.build();
Pre-implemented failure handling strategies for common scenarios.
/**
* An ActionRequestFailureHandler that re-adds requests that failed due to temporary
* EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
* and fails for all other failures.
*/
@PublicEvolving
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}
Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
// Sink that retries on queue full, fails on other errors
ElasticsearchSink<Event> resilientSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new RetryRejectedExecutionFailureHandler())
.build();
// For Table API
CREATE TABLE resilient_table (...) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'events',
'document-type' = '_doc',
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler'
);
/**
* An ActionRequestFailureHandler that simply fails the sink on any failure.
* This is the default failure handler.
*/
@Internal
public class NoOpFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}
Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
// Explicit use of default handler (fails immediately on any error)
ElasticsearchSink<Event> strictSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new NoOpFailureHandler()) // This is the default
.build();
// Default behavior - no explicit handler needed
ElasticsearchSink<Event> defaultSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
).build(); // Uses NoOpFailureHandler by default
/**
* An ActionRequestFailureHandler that ignores all failures and continues processing.
* Warning: This can lead to data loss as failed requests are dropped.
*/
@Internal
public class IgnoringFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer
) throws Throwable;
}
Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
// Sink that drops all failed requests (use with caution!)
ElasticsearchSink<Event> lenientSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new IgnoringFailureHandler())
.build();
// For Table API - useful for non-critical data
CREATE TABLE lenient_table (...) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'optional_events',
'document-type' = '_doc',
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);

/**
 * Failure handler that re-queues a request only when it targets one of the configured indices
 * and the failure chain contains an {@code EsRejectedExecutionException}; every other failure is
 * passed to a fallback handler.
 */
public class ConditionalRetryHandler implements ActionRequestFailureHandler {
    // The original example logged through LOG without declaring it, which does not compile.
    private static final Logger LOG = LoggerFactory.getLogger(ConditionalRetryHandler.class);

    private final Set<String> retryableIndices;
    private final ActionRequestFailureHandler fallbackHandler;

    /**
     * @param retryableIndices names of the indices whose rejected requests are re-queued
     * @param fallbackHandler handler that decides the outcome of every other failure
     */
    public ConditionalRetryHandler(Set<String> retryableIndices, ActionRequestFailureHandler fallbackHandler) {
        this.retryableIndices = retryableIndices;
        this.fallbackHandler = fallbackHandler;
    }

    /**
     * Re-queues rejected requests for retryable indices and delegates everything else.
     *
     * @param action the request that failed
     * @param failure the cause of the failure
     * @param restStatusCode REST status code of the failure; only forwarded to the fallback
     * @param indexer used to re-queue {@code action}, and forwarded to the fallback
     * @throws Throwable whatever the fallback handler throws
     */
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        String targetIndex = extractIndexFromRequest(action);
        // Only retry for specific indices, and only when the cluster rejected the request.
        if (retryableIndices.contains(targetIndex)
                && ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            LOG.info("Retrying rejected request for index: {}", targetIndex);
            indexer.add(action);
        } else {
            // Delegate to fallback handler
            fallbackHandler.onFailure(action, failure, restStatusCode, indexer);
        }
    }

    /**
     * Returns the target index of an index, update or delete request, or the literal
     * {@code "unknown"} for any other request type.
     */
    private String extractIndexFromRequest(ActionRequest request) {
        if (request instanceof IndexRequest) {
            return ((IndexRequest) request).index();
        } else if (request instanceof UpdateRequest) {
            return ((UpdateRequest) request).index();
        } else if (request instanceof DeleteRequest) {
            return ((DeleteRequest) request).index();
        }
        return "unknown";
    }
}
// Usage
Set<String> criticalIndices = Set.of("critical-events", "important-logs");
ElasticsearchSink<Event> conditionalSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new ConditionalRetryHandler(
criticalIndices,
new NoOpFailureHandler() // Fail fast for non-critical indices
))
.build();

/**
 * Failure handler that first lets a base handler deal with the failure and, if the base handler
 * throws, indexes a description of the failed request into a dead-letter index instead of
 * failing the sink.
 *
 * <p>A failed request that already targets the dead-letter index is not dead-lettered again;
 * the base handler's throwable is rethrown in that case, because writing another dead-letter
 * document for a failed dead-letter document could repeat without end.
 */
public class DeadLetterQueueHandler implements ActionRequestFailureHandler {
    // The original example logged through LOG without declaring it, which does not compile.
    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterQueueHandler.class);

    private final String deadLetterIndex;
    private final ActionRequestFailureHandler baseHandler;

    /**
     * @param deadLetterIndex name of the index that receives the dead-letter documents
     * @param baseHandler handler that gets the first chance to handle each failure
     */
    public DeadLetterQueueHandler(String deadLetterIndex, ActionRequestFailureHandler baseHandler) {
        this.deadLetterIndex = deadLetterIndex;
        this.baseHandler = baseHandler;
    }

    /**
     * Delegates to the base handler; when it throws, queues a dead-letter document describing
     * the failed request.
     *
     * @param action the request that failed
     * @param failure the cause of the failure
     * @param restStatusCode REST status code of the failure; stored in the dead-letter document
     * @param indexer forwarded to the base handler and used to queue the dead-letter document
     * @throws Throwable the base handler's throwable, when the failed request was itself a
     *     write to the dead-letter index
     */
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        try {
            // Try the base handler first
            baseHandler.onFailure(action, failure, restStatusCode, indexer);
        } catch (Throwable t) {
            String originalIndex = extractIndexFromRequest(action);
            if (deadLetterIndex != null && deadLetterIndex.equals(originalIndex)) {
                // The failed request is a dead-letter write; do not dead-letter it again.
                throw t;
            }

            // Pass t as the trailing argument so the base handler's throwable is not lost.
            LOG.warn("Sending failed request to dead letter queue: {}", deadLetterIndex, t);

            Map<String, Object> deadLetterDoc = new HashMap<>();
            deadLetterDoc.put("original_index", originalIndex);
            deadLetterDoc.put("failure_reason", failure.getMessage());
            deadLetterDoc.put("failure_time", System.currentTimeMillis());
            deadLetterDoc.put("rest_status_code", restStatusCode);
            deadLetterDoc.put("original_request", action.toString());

            IndexRequest deadLetterRequest = Requests.indexRequest()
                    .index(deadLetterIndex)
                    .type("_doc")
                    .source(deadLetterDoc);
            indexer.add(deadLetterRequest);
        }
    }

    /**
     * Returns the target index of an index, update or delete request, or the literal
     * {@code "unknown"} for any other request type. The original example called this method
     * without defining it in this class.
     */
    private static String extractIndexFromRequest(ActionRequest request) {
        if (request instanceof IndexRequest) {
            return ((IndexRequest) request).index();
        } else if (request instanceof UpdateRequest) {
            return ((UpdateRequest) request).index();
        } else if (request instanceof DeleteRequest) {
            return ((DeleteRequest) request).index();
        }
        return "unknown";
    }
}
// Usage
ElasticsearchSink<Event> deadLetterSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setFailureHandler(new DeadLetterQueueHandler(
"failed-requests",
new RetryRejectedExecutionFailureHandler()
))
.build();
// Common network-related failures
if (restStatusCode == -1 || restStatusCode == 408 || restStatusCode == 503) {
// Connection timeout, service unavailable
// Strategy: Retry with backoff
indexer.add(action);
}
// Client errors - usually permanent
if (restStatusCode >= 400 && restStatusCode < 500) {
switch (restStatusCode) {
case 400: // Bad Request - malformed document
case 404: // Not Found - index doesn't exist
case 409: // Conflict - version conflict
LOG.error("Client error ({}), dropping request", restStatusCode);
return; // Drop request
default:
throw failure; // Fail sink
}
}
// Server errors - may be temporary
if (restStatusCode >= 500) {
switch (restStatusCode) {
case 502: // Bad Gateway
case 503: // Service Unavailable
case 504: // Gateway Timeout
// Temporary server issues - retry
indexer.add(action);
break;
default:
// Serious server errors - fail fast
throw failure;
}
}
// Queue saturation
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action); // Retry
}
// Parse errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
return; // Drop malformed documents
}
// Security errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchSecurityException.class).isPresent()) {
throw failure; // Fail - likely a configuration issue
}