or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-processing.md · client-configuration.md · datastream-api.md · failure-handling.md · index.md · table-api.md

docs/failure-handling.md

# Failure Handling

Pluggable failure handling for the Elasticsearch sink, with built-in handlers and support for custom implementations. A handler decides what happens to each failed request: retry it, drop it, or fail the sink. This lets you apply different strategies to request failures, network issues, and cluster rejections.

## Capabilities

### ActionRequestFailureHandler Interface

The main interface for implementing custom failure-handling strategies.

```java { .api }

/**

12

* An implementation of ActionRequestFailureHandler is provided by the user to define how

13

* failed ActionRequests should be handled, e.g. dropping them, reprocessing

14

* malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure

15

* is only temporary.

16

*/

17

@PublicEvolving

18

public interface ActionRequestFailureHandler extends Serializable {

19

/**

20

* Handle a failed ActionRequest.

21

* @param action the ActionRequest that failed due to the failure

22

* @param failure the cause of failure

23

* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)

24

* @param indexer request indexer to re-add the failed action, if intended to do so

25

* @throws Throwable if the sink should fail on this failure, the implementation should rethrow

26

* the exception or a custom one

27

*/

28

void onFailure(

29

ActionRequest action,

30

Throwable failure,

31

int restStatusCode,

32

RequestIndexer indexer

33

) throws Throwable;

34

}

```

**Usage Examples:**

```java

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.WeakHashMap;

46

47

// Custom failure handler with different strategies

48

public class SmartFailureHandler implements ActionRequestFailureHandler {

49

private static final Logger LOG = LoggerFactory.getLogger(SmartFailureHandler.class);

50

51

@Override

52

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)

53

throws Throwable {

54

55

LOG.error("Failed Elasticsearch request: {} (status: {})", failure.getMessage(), restStatusCode, failure);

56

57

// Handle queue saturation - retry the request

58

if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {

59

LOG.warn("Elasticsearch queue full, retrying request");

60

indexer.add(action);

61

return;

62

}

63

64

// Handle malformed documents - log and drop

65

if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {

66

LOG.error("Malformed document, dropping request: {}", action);

67

return; // Drop the request without failing the sink

68

}

69

70

// Handle timeout or connection issues - limited retries

71

if (restStatusCode == 408 || restStatusCode == -1) {

72

// Could implement retry counter logic here

73

LOG.warn("Connection issue, retrying request once");

74

indexer.add(action);

75

return;

76

}

77

78

// Handle client errors (4xx) - log and drop

79

if (restStatusCode >= 400 && restStatusCode < 500) {

80

LOG.error("Client error ({}), dropping request: {}", restStatusCode, action);

81

return;

82

}

83

84

// Handle server errors (5xx) - fail fast

85

if (restStatusCode >= 500) {

86

LOG.error("Server error ({}), failing sink", restStatusCode);

87

throw failure;

88

}

89

90

// For all other failures, fail the sink

91

throw failure;

92

}

93

}

94

95

// Simple retry-all handler

96

public class RetryAllFailureHandler implements ActionRequestFailureHandler {

97

private final int maxRetries;

98

private final Map<ActionRequest, Integer> retryCount = new HashMap<>();

99

100

public RetryAllFailureHandler(int maxRetries) {

101

this.maxRetries = maxRetries;

102

}

103

104

@Override

105

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)

106

throws Throwable {

107

int currentRetries = retryCount.getOrDefault(action, 0);

108

109

if (currentRetries < maxRetries) {

110

retryCount.put(action, currentRetries + 1);

111

LOG.warn("Retrying failed request, attempt {} of {}", currentRetries + 1, maxRetries);

112

indexer.add(action);

113

} else {

114

LOG.error("Max retries exceeded for request, failing sink");

115

retryCount.remove(action);

116

throw failure;

117

}

118

}

119

}

120

121

// Using custom failure handler

122

ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(

123

httpHosts,

124

sinkFunction

125

)

126

.setFailureHandler(new SmartFailureHandler())

127

.build();

128

```

### Built-in Failure Handlers

Ready-made failure-handling strategies for common scenarios.

#### RetryRejectedExecutionFailureHandler

```java { .api }

/**

138

* An ActionRequestFailureHandler that re-adds requests that failed due to temporary

139

* EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),

140

* and fails for all other failures.

141

*/

142

@PublicEvolving

143

public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {

144

@Override

145

public void onFailure(

146

ActionRequest action,

147

Throwable failure,

148

int restStatusCode,

149

RequestIndexer indexer

150

) throws Throwable;

151

}

152

```

**Usage Examples:**

```java

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

158

159

// Sink that retries on queue full, fails on other errors

160

ElasticsearchSink<Event> resilientSink = new ElasticsearchSink.Builder<>(

161

httpHosts,

162

sinkFunction

163

)

164

.setFailureHandler(new RetryRejectedExecutionFailureHandler())

165

.build();

166

167

```

For the Table API, reference the handler by its fully qualified class name in the `WITH` options:

```sql

-- Retry requests rejected because Elasticsearch queues are full; fail the sink on any other error.
CREATE TABLE resilient_table (...) WITH (
  'connector'       = 'elasticsearch-6',
  'hosts'           = 'http://localhost:9200',
  'index'           = 'events',
  'document-type'   = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler'
);

```

#### NoOpFailureHandler

```java { .api }

/**

181

* A ActionRequestFailureHandler that simply fails the sink on any failures.

182

* This is the default failure handler.

183

*/

184

@Internal

185

public class NoOpFailureHandler implements ActionRequestFailureHandler {

186

@Override

187

public void onFailure(

188

ActionRequest action,

189

Throwable failure,

190

int restStatusCode,

191

RequestIndexer indexer

192

) throws Throwable;

193

}

194

```

**Usage Examples:**

```java

import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;

200

201

// Explicit use of default handler (fails immediately on any error)

202

ElasticsearchSink<Event> strictSink = new ElasticsearchSink.Builder<>(

203

httpHosts,

204

sinkFunction

205

)

206

.setFailureHandler(new NoOpFailureHandler()) // This is the default

207

.build();

208

209

// Default behavior - no explicit handler needed

210

ElasticsearchSink<Event> defaultSink = new ElasticsearchSink.Builder<>(

211

httpHosts,

212

sinkFunction

213

).build(); // Uses NoOpFailureHandler by default

214

```

#### IgnoringFailureHandler

```java { .api }

/**

220

* A ActionRequestFailureHandler that ignores all failures and continues processing.

221

* Warning: This can lead to data loss as failed requests are dropped.

222

*/

223

@Internal

224

public class IgnoringFailureHandler implements ActionRequestFailureHandler {

225

@Override

226

public void onFailure(

227

ActionRequest action,

228

Throwable failure,

229

int restStatusCode,

230

RequestIndexer indexer

231

) throws Throwable;

232

}

233

```

**Usage Examples:**

```java

import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;

239

240

// Sink that drops all failed requests (use with caution!)

241

ElasticsearchSink<Event> lenientSink = new ElasticsearchSink.Builder<>(

242

httpHosts,

243

sinkFunction

244

)

245

.setFailureHandler(new IgnoringFailureHandler())

246

.build();

247

248

```

For the Table API (suitable only for non-critical data):

```sql

-- Drop every failed request (possible data loss); only use for data you can afford to lose.
CREATE TABLE lenient_table (...) WITH (
  'connector'       = 'elasticsearch-6',
  'hosts'           = 'http://localhost:9200',
  'index'           = 'optional_events',
  'document-type'   = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);

```

### Advanced Failure Handling Patterns

#### Conditional Retry Handler

Retry rejected requests only for selected indices, and delegate every other failure to a fallback handler.

```java

public class ConditionalRetryHandler implements ActionRequestFailureHandler {

264

private final Set<String> retryableIndices;

265

private final ActionRequestFailureHandler fallbackHandler;

266

267

public ConditionalRetryHandler(Set<String> retryableIndices, ActionRequestFailureHandler fallbackHandler) {

268

this.retryableIndices = retryableIndices;

269

this.fallbackHandler = fallbackHandler;

270

}

271

272

@Override

273

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)

274

throws Throwable {

275

276

String targetIndex = extractIndexFromRequest(action);

277

278

// Only retry for specific indices

279

if (retryableIndices.contains(targetIndex) &&

280

ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {

281

LOG.info("Retrying rejected request for index: {}", targetIndex);

282

indexer.add(action);

283

} else {

284

// Delegate to fallback handler

285

fallbackHandler.onFailure(action, failure, restStatusCode, indexer);

286

}

287

}

288

289

private String extractIndexFromRequest(ActionRequest request) {

290

if (request instanceof IndexRequest) {

291

return ((IndexRequest) request).index();

292

} else if (request instanceof UpdateRequest) {

293

return ((UpdateRequest) request).index();

294

} else if (request instanceof DeleteRequest) {

295

return ((DeleteRequest) request).index();

296

}

297

return "unknown";

298

}

299

}

300

301

// Usage

302

Set<String> criticalIndices = Set.of("critical-events", "important-logs");

303

ElasticsearchSink<Event> conditionalSink = new ElasticsearchSink.Builder<>(

304

httpHosts,

305

sinkFunction

306

)

307

.setFailureHandler(new ConditionalRetryHandler(

308

criticalIndices,

309

new NoOpFailureHandler() // Fail fast for non-critical indices

310

))

311

.build();

312

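```

#### Dead Letter Queue Handler

Delegate to a base handler first. When the base handler gives up by throwing, write a summary of the failed request to a separate dead-letter index instead of failing the sink.

```java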

public class DeadLetterQueueHandler implements ActionRequestFailureHandler {

318

private final String deadLetterIndex;

319

private final ActionRequestFailureHandler baseHandler;

320

321

public DeadLetterQueueHandler(String deadLetterIndex, ActionRequestFailureHandler baseHandler) {

322

this.deadLetterIndex = deadLetterIndex;

323

this.baseHandler = baseHandler;

324

}

325

326

@Override

327

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)

328

throws Throwable {

329

330

try {

331

// Try the base handler first

332

baseHandler.onFailure(action, failure, restStatusCode, indexer);

333

} catch (Throwable t) {

334

// If base handler fails, send to dead letter queue

335

LOG.warn("Sending failed request to dead letter queue: {}", deadLetterIndex);

336

337

Map<String, Object> deadLetterDoc = new HashMap<>();

338

deadLetterDoc.put("original_index", extractIndexFromRequest(action));

339

deadLetterDoc.put("failure_reason", failure.getMessage());

340

deadLetterDoc.put("failure_time", System.currentTimeMillis());

341

deadLetterDoc.put("rest_status_code", restStatusCode);

342

deadLetterDoc.put("original_request", action.toString());

343

344

IndexRequest deadLetterRequest = Requests.indexRequest()

345

.index(deadLetterIndex)

346

.type("_doc")

347

.source(deadLetterDoc);

348

349

indexer.add(deadLetterRequest);

350

}

351

}

352

}

353

354

// Usage

355

ElasticsearchSink<Event> deadLetterSink = new ElasticsearchSink.Builder<>(

356

httpHosts,

357

sinkFunction

358

)

359

.setFailureHandler(new DeadLetterQueueHandler(

360

"failed-requests",

361

new RetryRejectedExecutionFailureHandler()

362

))

363

.build();

364

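```

### Error Categories and Handling Strategies

The fragments below are excerpts from the body of `onFailure`. `action`, `failure`, `restStatusCode`, and `indexer` are its parameters.

#### Network and Connection Errors

```java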

// Common network-related failures:
//   -1  no REST status could be retrieved (e.g. the connection failed)
//   408 Request Timeout
//   503 Service Unavailable
if (restStatusCode == -1 || restStatusCode == 408 || restStatusCode == 503) {
    // Strategy: re-queue the request. This fragment applies no backoff and no retry limit; count
    // attempts (as RetryAllFailureHandler does) if a cluster that stays unreachable should
    // eventually fail the sink.
    indexer.add(action);
    return; // Done - don't let later checks also drop or fail this request
}

```

#### Client Errors (4xx)

```java

// Client errors - usually permanent (429 is the notable exception)
if (restStatusCode >= 400 && restStatusCode < 500) {
    switch (restStatusCode) {
        case 400: // Bad Request - malformed document
        case 404: // Not Found - index doesn't exist
        case 409: // Conflict - version conflict
            LOG.error("Client error ({}), dropping request", restStatusCode);
            return; // Drop request
        case 429: // Too Many Requests - the cluster is rejecting work (e.g. full queues); temporary
            indexer.add(action);
            return; // Retry
        default:
            throw failure; // Fail sink
    }
}

```

#### Server Errors (5xx)

```java

// Server errors - may be temporary
if (restStatusCode >= 500) {
    boolean temporaryServerIssue =
            restStatusCode == 502     // Bad Gateway
            || restStatusCode == 503  // Service Unavailable
            || restStatusCode == 504; // Gateway Timeout

    if (!temporaryServerIssue) {
        // Serious server errors - fail fast
        throw failure;
    }

    // Temporary server issues - retry
    indexer.add(action);
}

```

#### Elasticsearch-Specific Errors

```java

// Queue saturation - temporary, so re-add the request and stop handling this failure
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
    indexer.add(action); // Retry
    return; // Without this, execution falls through to the checks below (and anything after them)
}

// Parse errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
    return; // Drop malformed documents
}

// Security errors
if (ExceptionUtils.findThrowable(failure, ElasticsearchSecurityException.class).isPresent()) {
    throw failure; // Fail - likely a configuration issue
}

```