# Error Handling

RxJava provides operators and global hooks for handling errors and recovering from them. Error handling is crucial for building robust reactive applications that handle failures gracefully and recover from error conditions.

## Capabilities

### Basic Error Handling Operators

Operators for handling errors, available on most reactive types (exact overloads vary by type).

```java { .api }
/**
 * Returns a reactive stream that emits a fallback value when the source emits an error
 */
public final T onErrorReturn(Function<? super Throwable, ? extends R> valueSupplier);
public final T onErrorReturnItem(R item);

/**
 * Returns a reactive stream that switches to another stream when the source emits an error
 */
public final T onErrorResumeNext(Function<? super Throwable, ? extends T> resumeFunction);
public final T onErrorResumeNext(T resumeStream);

/**
 * Re-subscribes to the source stream when an error occurs
 */
public final T retry();
public final T retry(long times);
public final T retry(BiPredicate<? super Integer, ? super Throwable> predicate);

/**
 * Re-subscribes based on a function that receives error notifications.
 * On Observable, the handler receives an Observable<Throwable> and returns an ObservableSource<?>.
 */
public final T retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler);

/**
 * Performs an action when an error occurs without affecting the stream
 */
public final T doOnError(Consumer<? super Throwable> onError);

/**
 * Wraps every signal (onNext, onError, onComplete) in a Notification delivered via onNext
 */
public final Observable<Notification<T>> materialize();
```

### Completable-Specific Error Handling

Additional error handling for Completable operations.

```java { .api }
/**
 * Converts errors to successful completion
 */
public final Completable onErrorComplete();
public final Completable onErrorComplete(Predicate<? super Throwable> predicate);
```

### Maybe-Specific Error Handling

Additional error handling for Maybe operations.

```java { .api }
/**
 * Converts errors to empty completion
 */
public final Maybe<T> onErrorComplete();
public final Maybe<T> onErrorComplete(Predicate<? super Throwable> predicate);
```

### Global Error Handling

Global hooks for undeliverable errors and general error handling.

```java { .api }
/**
 * Global plugin system for error handling
 */
public final class RxJavaPlugins {
    /**
     * Sets a global error handler for undeliverable exceptions
     */
    public static void setErrorHandler(Consumer<? super Throwable> handler);

    /**
     * Gets the current global error handler
     */
    public static Consumer<? super Throwable> getErrorHandler();

    /**
     * Called when an error cannot be delivered to observers
     */
    public static void onError(Throwable error);
}
```

### Exception Types

Common exception types used in RxJava.

```java { .api }
/**
 * Thrown when multiple exceptions occur
 */
public final class CompositeException extends RuntimeException {
    /**
     * Returns the list of exceptions aggregated by this CompositeException
     */
    public List<Throwable> getExceptions();

    /**
     * Returns the number of aggregated exceptions
     */
    public int size();
}

/**
 * Thrown when onError is called but no error handler is provided
 */
public final class OnErrorNotImplementedException extends RuntimeException {
    // Standard exception wrapper
}

/**
 * Thrown when an operator tries to emit but the downstream has not requested more items
 */
public final class MissingBackpressureException extends RuntimeException {
    // Backpressure-related exception
}

/**
 * Thrown when the Reactive Streams protocol is violated
 */
public final class ProtocolViolationException extends IllegalStateException {
    // Protocol violation exception
}

/**
 * Wrapper for exceptions that couldn't be delivered to downstream
 */
public final class UndeliverableException extends IllegalStateException {
    // Undeliverable exception wrapper
}
```

## Usage Examples

**Basic Error Recovery with onErrorReturn:**

```java
import io.reactivex.Observable;

Observable<String> riskyOperation = Observable.fromCallable(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Random failure");
    }
    return "Success";
});

// Provide fallback value on error
riskyOperation
    .onErrorReturn(throwable -> {
        System.out.println("Error occurred: " + throwable.getMessage());
        return "Fallback Value";
    })
    .subscribe(result -> System.out.println("Result: " + result));

// Simple fallback value (onErrorReturnItem takes the value directly)
riskyOperation
    .onErrorReturnItem("Default Value")
    .subscribe(result -> System.out.println("Result: " + result));
```

**Error Recovery with Alternative Stream:**

```java
Observable<String> primarySource = Observable.fromCallable(() -> {
    throw new RuntimeException("Primary source failed");
});

Observable<String> fallbackSource = Observable.just("Fallback", "Data");

// Switch to fallback stream on error
primarySource
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, using fallback: " + throwable.getMessage());
        return fallbackSource;
    })
    .subscribe(
        item -> System.out.println("Item: " + item),
        error -> System.err.println("Final error: " + error)
    );
```

**Retry Strategies:**

```java
Observable<String> unreliableService = Observable.fromCallable(() -> {
    if (Math.random() > 0.7) {
        return "Success";
    }
    throw new RuntimeException("Service temporarily unavailable");
});

// Simple retry (infinite)
unreliableService
    .retry()
    .take(1) // Take first success
    .subscribe(
        result -> System.out.println("Got result: " + result),
        error -> System.err.println("Never succeeded: " + error)
    );

// Retry limited times
unreliableService
    .retry(3)
    .subscribe(
        result -> System.out.println("Success after retries: " + result),
        error -> System.err.println("Failed after 3 retries: " + error)
    );

// Conditional retry
unreliableService
    .retry((retryCount, throwable) -> {
        System.out.println("Retry attempt " + retryCount + " for: " + throwable.getMessage());
        return retryCount < 5 && throwable instanceof RuntimeException;
    })
    .subscribe(
        result -> System.out.println("Conditional retry success: " + result),
        error -> System.err.println("Conditional retry failed: " + error)
    );
```

**Advanced Retry with Exponential Backoff:**

```java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

Observable<String> apiCall = Observable.fromCallable(() -> {
    // Simulate API that fails 80% of the time
    if (Math.random() > 0.2) {
        throw new RuntimeException("API Error");
    }
    return "API Response";
});

// Retry with exponential backoff (delays of 2, 4, 8, 16 seconds)
apiCall
    .retryWhen(errors ->
        errors
            .zipWith(Observable.range(1, 4), (throwable, attempt) -> {
                System.out.println("Attempt " + attempt + " failed: " + throwable.getMessage());
                return attempt;
            })
            .flatMap(attempt -> {
                long delay = (long) Math.pow(2, attempt); // Exponential backoff
                System.out.println("Retrying in " + delay + " seconds...");
                return Observable.timer(delay, TimeUnit.SECONDS);
            })
    )
    .subscribe(
        result -> System.out.println("API Success: " + result),
        // Caveat: once the range is exhausted the handler's stream completes, which
        // may end the sequence with onComplete rather than onError
        error -> System.err.println("API Failed after all retries: " + error)
    );
```
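
The delay in the example above grows as 2^attempt seconds with no upper bound. As a minimal sketch, a capped schedule could be computed in plain Java and the result passed to `Observable.timer`. The class `BackoffSchedule`, its method, and its parameters are hypothetical names chosen for illustration; they are not part of RxJava.

```java
/** Hypothetical helper (not part of RxJava): capped exponential backoff delays. */
final class BackoffSchedule {
    private BackoffSchedule() { }

    /**
     * Returns min(capSeconds, baseSeconds * 2^attempt) for a 1-based attempt number.
     */
    static long delaySeconds(int attempt, long baseSeconds, long capSeconds) {
        if (attempt < 1 || baseSeconds < 1 || capSeconds < 1) {
            throw new IllegalArgumentException("attempt, baseSeconds and capSeconds must be >= 1");
        }
        // Clamp the exponent so the power of two stays well inside the long range
        int shift = Math.min(attempt, 30);
        long factor = 1L << shift;
        // If base * factor would exceed the cap (or overflow), return the cap directly
        if (baseSeconds > capSeconds / factor) {
            return capSeconds;
        }
        return baseSeconds * factor;
    }

    public static void main(String[] args) {
        // With base 1 and cap 30 the first four delays match the example: 2, 4, 8, 16
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delaySeconds(attempt, 1, 30) + "s");
        }
    }
}
```

Capping keeps late retries from waiting arbitrarily long; the cap value itself is an assumption to tune per service.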

**Error Handling in Chains:**

```java
Observable.fromCallable(() -> "input")
    .map(input -> {
        if (input.equals("input")) {
            throw new IllegalArgumentException("Invalid input");
        }
        return input.toUpperCase();
    })
    .flatMap(processed -> Observable.fromCallable(() -> {
        if (processed.equals("ERROR")) {
            throw new RuntimeException("Processing failed");
        }
        return "Processed: " + processed;
    }))
    .onErrorResumeNext(throwable -> {
        if (throwable instanceof IllegalArgumentException) {
            return Observable.just("Input validation failed");
        } else if (throwable instanceof RuntimeException) {
            return Observable.just("Processing failed, using default");
        }
        return Observable.error(throwable); // Re-throw unknown errors
    })
    .subscribe(
        result -> System.out.println("Final result: " + result),
        error -> System.err.println("Unhandled error: " + error)
    );
```

**Side-Effect Error Logging:**

```java
Observable<Integer> source = Observable.range(1, 10)
    .map(i -> {
        if (i == 5) {
            throw new RuntimeException("Error at item " + i);
        }
        return i * i;
    });

source
    .doOnError(throwable -> {
        // Log error without affecting stream
        System.err.println("Logging error: " + throwable.getMessage());
        // Could also log to file, send to monitoring system, etc.
    })
    .onErrorReturnItem(-1) // Recover after logging
    .subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Final error: " + error) // Won't be called due to onErrorReturnItem
    );
```

**Error Materialization:**

```java
Observable<String> source = Observable.just("A", "B")
    .concatWith(Observable.error(new RuntimeException("Error")))
    .concatWith(Observable.just("C")); // This won't be reached

// Materialize errors as regular notifications
source
    .materialize()
    .subscribe(notification -> {
        if (notification.isOnNext()) {
            System.out.println("Value: " + notification.getValue());
        } else if (notification.isOnError()) {
            System.out.println("Error: " + notification.getError().getMessage());
        } else if (notification.isOnComplete()) {
            System.out.println("Completed");
        }
    });

// Dematerialize back to regular stream (optional)
source
    .materialize()
    .filter(notification -> !notification.isOnError()) // Skip errors
    .dematerialize(notification -> notification)
    .subscribe(
        value -> System.out.println("Filtered value: " + value),
        error -> System.err.println("This won't be called"),
        () -> System.out.println("Completed without errors")
    );
```

**Completable Error Handling:**

```java
Completable riskyOperation = Completable.fromAction(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Operation failed");
    }
    System.out.println("Operation succeeded");
});

// Convert error to completion
riskyOperation
    .onErrorComplete()
    .subscribe(
        () -> System.out.println("Completed (success or error converted)"),
        error -> System.err.println("This won't be called")
    );

// Conditional error to completion
riskyOperation
    .onErrorComplete(throwable -> throwable instanceof RuntimeException)
    .subscribe(
        () -> System.out.println("Completed (RuntimeException converted)"),
        error -> System.err.println("Non-RuntimeException: " + error)
    );

// Resume with another Completable
Completable fallbackOperation = Completable.fromAction(() ->
    System.out.println("Fallback operation executed"));

riskyOperation
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, running fallback: " + throwable.getMessage());
        return fallbackOperation;
    })
    .subscribe(
        () -> System.out.println("Some operation completed"),
        error -> System.err.println("Both operations failed: " + error)
    );
```

**Global Error Handling:**

```java
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

// Set global error handler for undeliverable exceptions
RxJavaPlugins.setErrorHandler(throwable -> {
    System.err.println("Undeliverable exception: " + throwable.getMessage());
    throwable.printStackTrace();

    // Could also:
    // - Log to crash reporting service
    // - Send to monitoring system
    // - Write to log file
    // - Trigger app restart in severe cases
});

// Example of undeliverable error
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error)
);

// Complete the subject
subject.onComplete();

// This error cannot be delivered (subject already terminated)
// It will be caught by the global error handler
subject.onError(new RuntimeException("Undeliverable error"));
```
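
The handler above prints the message of whatever throwable it receives. Assuming that throwable is a wrapper (such as `UndeliverableException`) carrying the original error as its cause, a small plain-Java helper can recover the underlying error for logging. `Causes` and `rootCause` are hypothetical names used for illustration, not part of RxJava.

```java
/** Hypothetical helper (not part of RxJava): finds the innermost cause of a throwable. */
final class Causes {
    private static final int MAX_HOPS = 32; // arbitrary guard against cyclic cause chains

    private Causes() { }

    /** Follows getCause() links until there is no further cause or the hop limit is reached. */
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        int hops = 0;
        while (current.getCause() != null && current.getCause() != current && hops < MAX_HOPS) {
            current = current.getCause();
            hops++;
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable original = new RuntimeException("Undeliverable error");
        Throwable wrapper = new IllegalStateException("wrapper", original);
        // Prints the message of the original error rather than the wrapper's
        System.out.println(rootCause(wrapper).getMessage());
    }
}
```

Inside a global handler, logging `rootCause(throwable)` alongside the wrapper would keep the original failure visible.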

**Complex Error Handling Scenario:**

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Multi-step process with different error handling strategies
Observable<String> complexProcess = Observable.fromCallable(() -> "input")
    // Step 1: Validation with retry
    .flatMap(input -> Observable.fromCallable(() -> validateInput(input))
        .retry(2)
        .onErrorResumeNext(throwable -> Observable.just("default-input")))

    // Step 2: Processing with timeout and fallback
    .flatMap(validInput -> processData(validInput)
        .timeout(5, TimeUnit.SECONDS)
        .onErrorReturn(throwable -> {
            if (throwable instanceof TimeoutException) {
                return "timeout-result";
            }
            return "error-result";
        }))

    // Step 3: Final transformation with error logging
    .map(result -> result.toUpperCase())
    .doOnError(throwable -> logError("Final step failed", throwable))
    .onErrorReturnItem("FINAL-FALLBACK");

complexProcess.subscribe(
    result -> System.out.println("Final result: " + result),
    error -> System.err.println("Unexpected error: " + error) // Should never be called
);

// Helper methods (declared in the enclosing class)
private static String validateInput(String input) {
    if (input == null || input.isEmpty()) {
        throw new IllegalArgumentException("Invalid input");
    }
    return input;
}

private static Observable<String> processData(String input) {
    return Observable.fromCallable(() -> {
        // Simulate processing that might fail or timeout
        Thread.sleep(3000);
        if (Math.random() > 0.7) {
            throw new RuntimeException("Processing failed");
        }
        return "processed-" + input;
    }).subscribeOn(Schedulers.io());
}

private static void logError(String context, Throwable throwable) {
    System.err.println(context + ": " + throwable.getMessage());
}
```

## Error Handling Best Practices

**Guidelines:**

1. **Always handle errors**: Never ignore errors in reactive streams
2. **Use appropriate operators**: Choose the right error handling operator for your use case
3. **Fail fast vs. resilience**: Balance between failing fast and being resilient
4. **Log errors**: Always log errors for debugging and monitoring
5. **Global handler**: Set up a global error handler for undeliverable exceptions
6. **Test error scenarios**: Write tests for error conditions
7. **Resource cleanup**: Ensure resources are cleaned up even when errors occur
8. **User experience**: Provide meaningful error messages to users

**Common Patterns:**

- **Retry with backoff**: For transient network errors
- **Fallback values**: For non-critical operations
- **Alternative streams**: For redundant data sources
- **Error conversion**: Convert errors to empty/default for optional operations
- **Circuit breaker**: Stop trying after consecutive failures
- **Timeout handling**: Set reasonable timeouts for operations
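
Of the patterns listed, the circuit breaker is the only one without an example elsewhere in this document. The following is a minimal plain-Java sketch of the idea, assuming a simple policy: open after a fixed number of consecutive failures, then allow a trial call once a cool-down has elapsed. `SimpleCircuitBreaker` and all of its members are hypothetical names, not an RxJava API.

```java
import java.util.concurrent.Callable;

/** Hypothetical minimal circuit breaker (not an RxJava API). */
final class SimpleCircuitBreaker {
    private final int failureThreshold;
    private final long coolDownMillis;
    private int consecutiveFailures;
    private long openedAtMillis = -1; // -1 means the circuit is closed

    SimpleCircuitBreaker(int failureThreshold, long coolDownMillis) {
        this.failureThreshold = failureThreshold;
        this.coolDownMillis = coolDownMillis;
    }

    /** True if a call may proceed: circuit closed, or cool-down elapsed (trial call). */
    synchronized boolean allowRequest(long nowMillis) {
        return openedAtMillis < 0 || nowMillis - openedAtMillis >= coolDownMillis;
    }

    /** A success closes the circuit and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        openedAtMillis = -1;
    }

    /** A failure opens (or re-opens) the circuit once the threshold is reached. */
    synchronized void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAtMillis = nowMillis;
        }
    }

    /** Runs the task unless the circuit is open; returns the fallback on rejection or failure. */
    <T> T call(Callable<T> task, T fallback) {
        if (!allowRequest(System.currentTimeMillis())) {
            return fallback;
        }
        try {
            T result = task.call();
            recordSuccess();
            return result;
        } catch (Exception e) {
            recordFailure(System.currentTimeMillis());
            return fallback;
        }
    }
}
```

In a reactive chain, a guard like this could wrap the work done inside `Observable.fromCallable`, with the fallback playing the role of `onErrorReturnItem`; that wiring is left out of the sketch.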

## Types

```java { .api }
/**
 * Predicate for conditional operations
 */
public interface Predicate<T> {
    boolean test(T t) throws Exception;
}

/**
 * BiPredicate for retry conditions
 */
public interface BiPredicate<T1, T2> {
    boolean test(T1 t1, T2 t2) throws Exception;
}

/**
 * Function for error mapping
 */
public interface Function<T, R> {
    R apply(T t) throws Exception;
}

/**
 * Consumer for side effects
 */
public interface Consumer<T> {
    void accept(T t) throws Exception;
}
```
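
Unlike the interfaces in `java.util.function`, the methods above declare `throws Exception`, so a lambda or method reference may throw a checked exception without wrapping it. The sketch below illustrates this with a local stand-in, `ThrowingFunction`, that mirrors the shape of `Function`; the stand-in and the helper names `parsePort` and `applyOrFallback` are hypothetical and exist only for this example.

```java
import java.io.IOException;

final class ThrowingLambdaDemo {
    /** Local stand-in mirroring the shape of the Function interface above. */
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    /** Parses a port number and signals bad input with a checked IOException. */
    static int parsePort(String text) throws IOException {
        try {
            int port = Integer.parseInt(text.trim());
            if (port < 1 || port > 65535) {
                throw new IOException("Port out of range: " + port);
            }
            return port;
        } catch (NumberFormatException e) {
            throw new IOException("Not a number: " + text, e);
        }
    }

    /** Applies the function and substitutes the fallback on any failure. */
    static <T, R> R applyOrFallback(ThrowingFunction<T, R> fn, T input, R fallback) {
        try {
            return fn.apply(input);
        } catch (Exception e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Compiles even though parsePort throws a checked exception
        ThrowingFunction<String, Integer> parser = ThrowingLambdaDemo::parsePort;
        System.out.println(applyOrFallback(parser, "8080", -1));
        System.out.println(applyOrFallback(parser, "oops", -1));
    }
}
```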