0
# Failure Handling
1
2
Pluggable failure handling for the Elasticsearch sink, with built-in handlers and support for custom implementations. Handlers implement strategies for request failures, network issues, and rejections from overloaded Elasticsearch nodes (full node queues).
3
4
## Capabilities
5
6
### ActionRequestFailureHandler Interface
7
8
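Main interface for implementing custom failure handling strategies. For each failed request, an implementation does one of three things: it re-adds the request through the `RequestIndexer`, drops it by returning normally, or fails the sink by throwing.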
9
10
```java { .api }
11
/**
12
* An implementation of ActionRequestFailureHandler is provided by the user to define how
13
* failed ActionRequests should be handled, e.g. dropping them, reprocessing
14
* malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
15
* is only temporary.
16
*/
17
@PublicEvolving
18
public interface ActionRequestFailureHandler extends Serializable {
19
/**
20
* Handle a failed ActionRequest.
21
* @param action the ActionRequest that failed due to the failure
22
* @param failure the cause of failure
23
* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
24
* @param indexer request indexer to re-add the failed action, if intended to do so
25
* @throws Throwable if the sink should fail on this failure, the implementation should rethrow
26
* the exception or a custom one
27
*/
28
void onFailure(
29
ActionRequest action,
30
Throwable failure,
31
int restStatusCode,
32
RequestIndexer indexer
33
) throws Throwable;
34
}
35
```
36
37
**Usage Examples:**
38
39
```java
40
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.util.Map;
import java.util.WeakHashMap;
46
47
// Custom failure handler with different strategies
48
public class SmartFailureHandler implements ActionRequestFailureHandler {
49
private static final Logger LOG = LoggerFactory.getLogger(SmartFailureHandler.class);
50
51
@Override
52
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
53
throws Throwable {
54
55
LOG.error("Failed Elasticsearch request: {} (status: {})", failure.getMessage(), restStatusCode, failure);
56
57
// Handle queue saturation - retry the request
58
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
59
LOG.warn("Elasticsearch queue full, retrying request");
60
indexer.add(action);
61
return;
62
}
63
64
// Handle malformed documents - log and drop
65
if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
66
LOG.error("Malformed document, dropping request: {}", action);
67
return; // Drop the request without failing the sink
68
}
69
70
// Handle timeout or connection issues - limited retries
71
if (restStatusCode == 408 || restStatusCode == -1) {
72
// Could implement retry counter logic here
73
LOG.warn("Connection issue, retrying request once");
74
indexer.add(action);
75
return;
76
}
77
78
// Handle client errors (4xx) - log and drop
79
if (restStatusCode >= 400 && restStatusCode < 500) {
80
LOG.error("Client error ({}), dropping request: {}", restStatusCode, action);
81
return;
82
}
83
84
// Handle server errors (5xx) - fail fast
85
if (restStatusCode >= 500) {
86
LOG.error("Server error ({}), failing sink", restStatusCode);
87
throw failure;
88
}
89
90
// For all other failures, fail the sink
91
throw failure;
92
}
93
}
94
95
// Simple retry-all handler
96
public class RetryAllFailureHandler implements ActionRequestFailureHandler {
97
private final int maxRetries;
98
private final Map<ActionRequest, Integer> retryCount = new HashMap<>();
99
100
public RetryAllFailureHandler(int maxRetries) {
101
this.maxRetries = maxRetries;
102
}
103
104
@Override
105
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
106
throws Throwable {
107
int currentRetries = retryCount.getOrDefault(action, 0);
108
109
if (currentRetries < maxRetries) {
110
retryCount.put(action, currentRetries + 1);
111
LOG.warn("Retrying failed request, attempt {} of {}", currentRetries + 1, maxRetries);
112
indexer.add(action);
113
} else {
114
LOG.error("Max retries exceeded for request, failing sink");
115
retryCount.remove(action);
116
throw failure;
117
}
118
}
119
}
120
121
// Using custom failure handler
122
ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(
123
httpHosts,
124
sinkFunction
125
)
126
.setFailureHandler(new SmartFailureHandler())
127
.build();
128
```
129
130
### Built-in Failure Handlers
131
132
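Ready-made handlers for common scenarios, provided in the `org.apache.flink.streaming.connectors.elasticsearch.util` package. Pass an instance to the sink builder's `setFailureHandler` method.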
133
134
#### RetryRejectedExecutionFailureHandler
135
136
```java { .api }
137
/**
138
* An ActionRequestFailureHandler that re-adds requests that failed due to temporary
139
* EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),
140
* and fails for all other failures.
141
*/
142
@PublicEvolving
143
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
144
@Override
145
public void onFailure(
146
ActionRequest action,
147
Throwable failure,
148
int restStatusCode,
149
RequestIndexer indexer
150
) throws Throwable;
151
}
152
```
153
154
**Usage Examples:**
155
156
```java
157
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
158
159
// Sink that retries on queue full, fails on other errors
160
ElasticsearchSink<Event> resilientSink = new ElasticsearchSink.Builder<>(
161
httpHosts,
162
sinkFunction
163
)
164
.setFailureHandler(new RetryRejectedExecutionFailureHandler())
165
.build();
166
167
// For Table API
168
CREATE TABLE resilient_table (...) WITH (
169
'connector' = 'elasticsearch-6',
170
'hosts' = 'http://localhost:9200',
171
'index' = 'events',
172
'document-type' = '_doc',
173
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler'
174
);
175
```
176
177
#### NoOpFailureHandler
178
179
```java { .api }
180
/**
181
* A ActionRequestFailureHandler that simply fails the sink on any failures.
182
* This is the default failure handler.
183
*/
184
@Internal
185
public class NoOpFailureHandler implements ActionRequestFailureHandler {
186
@Override
187
public void onFailure(
188
ActionRequest action,
189
Throwable failure,
190
int restStatusCode,
191
RequestIndexer indexer
192
) throws Throwable;
193
}
194
```
195
196
**Usage Examples:**
197
198
```java
199
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
200
201
// Explicit use of default handler (fails immediately on any error)
202
ElasticsearchSink<Event> strictSink = new ElasticsearchSink.Builder<>(
203
httpHosts,
204
sinkFunction
205
)
206
.setFailureHandler(new NoOpFailureHandler()) // This is the default
207
.build();
208
209
// Default behavior - no explicit handler needed
210
ElasticsearchSink<Event> defaultSink = new ElasticsearchSink.Builder<>(
211
httpHosts,
212
sinkFunction
213
).build(); // Uses NoOpFailureHandler by default
214
```
215
216
#### IgnoringFailureHandler
217
218
```java { .api }
219
/**
220
* A ActionRequestFailureHandler that ignores all failures and continues processing.
221
* Warning: This can lead to data loss as failed requests are dropped.
222
*/
223
@Internal
224
public class IgnoringFailureHandler implements ActionRequestFailureHandler {
225
@Override
226
public void onFailure(
227
ActionRequest action,
228
Throwable failure,
229
int restStatusCode,
230
RequestIndexer indexer
231
) throws Throwable;
232
}
233
```
234
235
**Usage Examples:**
236
237
```java
238
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
239
240
// Sink that drops all failed requests (use with caution!)
241
ElasticsearchSink<Event> lenientSink = new ElasticsearchSink.Builder<>(
242
httpHosts,
243
sinkFunction
244
)
245
.setFailureHandler(new IgnoringFailureHandler())
246
.build();
247
248
// For Table API - useful for non-critical data
249
CREATE TABLE lenient_table (...) WITH (
250
'connector' = 'elasticsearch-6',
251
'hosts' = 'http://localhost:9200',
252
'index' = 'optional_events',
253
'document-type' = '_doc',
254
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
255
);
256
```
257
258
### Advanced Failure Handling Patterns
259
260
#### Conditional Retry Handler
261
262
```java
263
public class ConditionalRetryHandler implements ActionRequestFailureHandler {
264
private final Set<String> retryableIndices;
265
private final ActionRequestFailureHandler fallbackHandler;
266
267
public ConditionalRetryHandler(Set<String> retryableIndices, ActionRequestFailureHandler fallbackHandler) {
268
this.retryableIndices = retryableIndices;
269
this.fallbackHandler = fallbackHandler;
270
}
271
272
@Override
273
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
274
throws Throwable {
275
276
String targetIndex = extractIndexFromRequest(action);
277
278
// Only retry for specific indices
279
if (retryableIndices.contains(targetIndex) &&
280
ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
281
LOG.info("Retrying rejected request for index: {}", targetIndex);
282
indexer.add(action);
283
} else {
284
// Delegate to fallback handler
285
fallbackHandler.onFailure(action, failure, restStatusCode, indexer);
286
}
287
}
288
289
private String extractIndexFromRequest(ActionRequest request) {
290
if (request instanceof IndexRequest) {
291
return ((IndexRequest) request).index();
292
} else if (request instanceof UpdateRequest) {
293
return ((UpdateRequest) request).index();
294
} else if (request instanceof DeleteRequest) {
295
return ((DeleteRequest) request).index();
296
}
297
return "unknown";
298
}
299
}
300
301
// Usage
302
Set<String> criticalIndices = Set.of("critical-events", "important-logs");
303
ElasticsearchSink<Event> conditionalSink = new ElasticsearchSink.Builder<>(
304
httpHosts,
305
sinkFunction
306
)
307
.setFailureHandler(new ConditionalRetryHandler(
308
criticalIndices,
309
new NoOpFailureHandler() // Fail fast for non-critical indices
310
))
311
.build();
312
```
313
314
#### Dead Letter Queue Handler
315
316
```java
317
public class DeadLetterQueueHandler implements ActionRequestFailureHandler {
318
private final String deadLetterIndex;
319
private final ActionRequestFailureHandler baseHandler;
320
321
public DeadLetterQueueHandler(String deadLetterIndex, ActionRequestFailureHandler baseHandler) {
322
this.deadLetterIndex = deadLetterIndex;
323
this.baseHandler = baseHandler;
324
}
325
326
@Override
327
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
328
throws Throwable {
329
330
try {
331
// Try the base handler first
332
baseHandler.onFailure(action, failure, restStatusCode, indexer);
333
} catch (Throwable t) {
334
// If base handler fails, send to dead letter queue
335
LOG.warn("Sending failed request to dead letter queue: {}", deadLetterIndex);
336
337
Map<String, Object> deadLetterDoc = new HashMap<>();
338
deadLetterDoc.put("original_index", extractIndexFromRequest(action));
339
deadLetterDoc.put("failure_reason", failure.getMessage());
340
deadLetterDoc.put("failure_time", System.currentTimeMillis());
341
deadLetterDoc.put("rest_status_code", restStatusCode);
342
deadLetterDoc.put("original_request", action.toString());
343
344
IndexRequest deadLetterRequest = Requests.indexRequest()
345
.index(deadLetterIndex)
346
.type("_doc")
347
.source(deadLetterDoc);
348
349
indexer.add(deadLetterRequest);
350
}
351
}
352
}
353
354
// Usage
355
ElasticsearchSink<Event> deadLetterSink = new ElasticsearchSink.Builder<>(
356
httpHosts,
357
sinkFunction
358
)
359
.setFailureHandler(new DeadLetterQueueHandler(
360
"failed-requests",
361
new RetryRejectedExecutionFailureHandler()
362
))
363
.build();
364
```
365
366
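### Error Categories and Handling Strategies

The fragments below are meant to go inside an `onFailure` implementation. `action`, `failure`, `restStatusCode`, and `indexer` refer to its parameters.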
367
368
#### Network and Connection Errors
369
370
```java
371
// Common network-related failures
if (restStatusCode == -1 || restStatusCode == 408 || restStatusCode == 503) {
    // No status code could be retrieved (-1), request timeout (408), service unavailable (503).
    // Strategy: retry. This fragment re-adds the request without any delay or retry limit
    // of its own; bound the retries elsewhere if the outage may last.
    indexer.add(action);
    return; // Handled - stop so later checks (e.g. a 5xx branch) cannot re-add it a second time
}
377
```
378
379
#### Client Errors (4xx)
380
381
```java
382
// Client errors - usually permanent. The exception is 429 (Too Many Requests), which signals
// a temporarily overloaded cluster; the request can succeed when it is retried later.
if (restStatusCode >= 400 && restStatusCode < 500) {
    switch (restStatusCode) {
        case 429: // Too Many Requests - temporary overload
            indexer.add(action);
            return; // Retry request
        case 400: // Bad Request - malformed document
        case 404: // Not Found - index doesn't exist
        case 409: // Conflict - version conflict
            LOG.error("Client error ({}), dropping request", restStatusCode);
            return; // Drop request
        default:
            throw failure; // Fail sink
    }
}
394
```
395
396
#### Server Errors (5xx)
397
398
```java
399
// Server errors - may be temporary
if (restStatusCode >= 500) {
    boolean temporaryServerIssue = restStatusCode == 502   // Bad Gateway
            || restStatusCode == 503                       // Service Unavailable
            || restStatusCode == 504;                      // Gateway Timeout
    if (!temporaryServerIssue) {
        // Serious server errors - fail fast
        throw failure;
    }
    // Temporary server issues - retry
    indexer.add(action);
}
413
```
414
415
#### Elasticsearch-Specific Errors
416
417
```java
418
// Queue saturation
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
    indexer.add(action); // Retry
    return;              // Handled - do not fall through to the checks below
}

// Parse errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
    return; // Drop malformed documents
}

// Security errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchSecurityException.class).isPresent()) {
    throw failure; // Fail - likely a configuration issue
}
432
```