# Completable Operations

`Completable` is a reactive type that signals only completion or an error and never emits items. It suits fire-and-forget operations, side effects, and any operation where only successful completion matters rather than a return value.

## Capabilities

### Completable Creation

Factory methods for creating Completable instances.

```java { .api }
/**
 * Creates a Completable that immediately completes
 */
public static Completable complete();

/**
 * Creates a Completable from an Action
 */
public static Completable fromAction(Action action);

/**
 * Creates a Completable from a Runnable
 */
public static Completable fromRunnable(Runnable runnable);

/**
 * Creates a Completable from a Callable (ignoring the return value)
 */
public static Completable fromCallable(Callable<?> callable);

/**
 * Creates a Completable from a Future (ignoring the result)
 */
public static Completable fromFuture(Future<?> future);

/**
 * Creates a Completable using the provided CompletableOnSubscribe function
 */
public static Completable create(CompletableOnSubscribe source);

/**
 * Creates a Completable that completes after a delay
 */
public static Completable timer(long delay, TimeUnit unit);
public static Completable timer(long delay, TimeUnit unit, Scheduler scheduler);

/**
 * Creates a Completable that only calls onError
 */
public static Completable error(Throwable error);
public static Completable error(Callable<? extends Throwable> errorSupplier);

/**
 * Creates a Completable that never completes or errors
 */
public static Completable never();

/**
 * Defers Completable creation until subscription
 */
public static Completable defer(Callable<? extends CompletableSource> completableSupplier);

/**
 * Creates a Completable from other reactive types (ignoring their values)
 */
public static Completable fromObservable(ObservableSource<?> observable);
public static Completable fromPublisher(Publisher<?> publisher);
public static Completable fromSingle(SingleSource<?> single);
public static Completable fromMaybe(MaybeSource<?> maybe);
```
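The difference between `defer` and the eager factories is easiest to see with a supplier whose result changes between subscriptions. The following is a minimal sketch, assuming RxJava 2 (`io.reactivex`) is on the classpath; the class name `CreationSketch` and the `run()` helper are hypothetical and exist only to make the behavior observable.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CreationSketch {
    // Hypothetical helper: collects events in a list so the order is visible.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger attempts = new AtomicInteger();

        // defer invokes the supplier once per subscriber, so each
        // subscription can receive a different Completable.
        Completable deferred = Completable.defer(() -> {
            int n = attempts.incrementAndGet();
            return n == 1
                    ? Completable.error(new IllegalStateException("first attempt"))
                    : Completable.complete();
        });

        deferred.subscribe(() -> log.add("complete"), e -> log.add("error: " + e.getMessage()));
        deferred.subscribe(() -> log.add("complete"), e -> log.add("error: " + e.getMessage()));

        // fromCallable runs the callable on subscription and discards its return value.
        Completable.fromCallable(() -> log.add("callable ran"))
                .subscribe(() -> log.add("callable complete"));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because everything here runs synchronously on the calling thread, the first subscription fails, the second completes, and the callable runs only when it is subscribed to.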

### Transformation and Chaining

Chain Completables and transform them into other reactive types.

```java { .api }
/**
 * Runs this Completable, then subscribes to the next source once this one completes
 */
public final <T> Observable<T> andThen(ObservableSource<T> next);
public final <T> Flowable<T> andThen(Publisher<T> next);
public final <T> Single<T> andThen(SingleSource<T> next);
public final <T> Maybe<T> andThen(MaybeSource<T> next);
public final Completable andThen(CompletableSource next);

/**
 * Transforms this Completable with a transformer, or converts it to an arbitrary type
 */
public final <R> R as(CompletableConverter<? extends R> converter);
public final <U> U to(Function<? super Completable, U> converter);
public final Completable compose(CompletableTransformer transformer);
```
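`compose` is mainly useful for packaging a group of operators into a reusable unit. The sketch below assumes RxJava 2; the `logged` transformer and the `ComposeSketch` class are hypothetical names introduced for illustration.

```java
import io.reactivex.Completable;
import io.reactivex.CompletableTransformer;
import java.util.ArrayList;
import java.util.List;

public class ComposeSketch {
    // Hypothetical reusable transformer: records when a Completable starts and finishes.
    static CompletableTransformer logged(String name, List<String> log) {
        return upstream -> upstream
                .doOnSubscribe(d -> log.add(name + " started"))
                .doOnComplete(() -> log.add(name + " finished"));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Completable load = Completable.fromAction(() -> log.add("loading"))
                .compose(logged("load", log));
        Completable save = Completable.fromAction(() -> log.add("saving"))
                .compose(logged("save", log));

        // andThen subscribes to save only after load has completed.
        load.andThen(save).subscribe();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The same transformer instance could be applied to any number of chains, which keeps cross-cutting concerns such as logging out of the individual pipelines.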

### Combining Completables

Combine multiple Completables.

```java { .api }
/**
 * Merges multiple Completables (completes when all complete)
 */
public static Completable mergeArray(CompletableSource... sources);
public static Completable merge(Iterable<? extends CompletableSource> sources);
public static Completable merge(Publisher<? extends CompletableSource> sources);

/**
 * Merges with concurrency limit
 */
public static Completable merge(Publisher<? extends CompletableSource> sources, int maxConcurrency);

/**
 * Concatenates Completables sequentially
 */
public static Completable concatArray(CompletableSource... sources);
public static Completable concat(Iterable<? extends CompletableSource> sources);
public static Completable concat(Publisher<? extends CompletableSource> sources);

/**
 * Mirrors whichever source terminates first (by completing or failing) and disposes the rest
 */
public static Completable ambArray(CompletableSource... sources);
public static Completable amb(Iterable<? extends CompletableSource> sources);

/**
 * Concatenates this Completable with another
 */
public final Completable concatWith(CompletableSource other);

/**
 * Merges this Completable with another
 */
public final Completable mergeWith(CompletableSource other);
```
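The three combinators differ in when they subscribe and when they terminate. The following sketch, assuming RxJava 2, uses only synchronous sources so the outcome does not depend on timing; `CombineSketch` and `run()` are hypothetical names.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Completable a = Completable.fromAction(() -> log.add("a"));
        Completable b = Completable.fromAction(() -> log.add("b"));
        Completable c = Completable.fromAction(() -> log.add("c"));

        // concat: b is subscribed only after a has completed.
        Completable.concat(Arrays.asList(a, b))
                .subscribe(() -> log.add("concat done"));

        // amb: never() cannot terminate, so the second source wins.
        Completable.amb(Arrays.asList(Completable.never(), Completable.complete()))
                .subscribe(() -> log.add("amb done"));

        // merge: a failure in any source fails the merged Completable.
        Completable.merge(Arrays.asList(c, Completable.error(new RuntimeException("boom"))))
                .subscribe(
                        () -> log.add("merge done"),
                        e -> log.add("merge failed: " + e.getMessage()));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```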

### Error Handling

Handle errors in Completable operations.

```java { .api }
/**
 * Resumes with another Completable if an error occurs
 */
public final Completable onErrorResumeNext(Function<? super Throwable, ? extends CompletableSource> errorMapper);
public final Completable onErrorResumeNext(CompletableSource resumeCompletableSource);

/**
 * Converts errors to completion
 */
public final Completable onErrorComplete();
public final Completable onErrorComplete(Predicate<? super Throwable> predicate);

/**
 * Retry on error
 */
public final Completable retry();
public final Completable retry(long times);
public final Completable retry(BiPredicate<? super Integer, ? super Throwable> predicate);
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler);
```
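A common pattern is to pick a fallback based on the kind of error, and to swallow only errors that are known to be harmless. The sketch below assumes RxJava 2 and uses the `Function`-based `onErrorResumeNext` overload and the `Predicate`-based `onErrorComplete` overload; the class name and messages are hypothetical.

```java
import io.reactivex.Completable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ErrorHandlingSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Completable primary = Completable.error(new IOException("primary store down"));
        Completable fallback = Completable.fromAction(() -> log.add("wrote to fallback"));

        // Switch to the fallback for I/O failures; rethrow anything else.
        primary
                .onErrorResumeNext(e -> e instanceof IOException ? fallback : Completable.error(e))
                .subscribe(() -> log.add("done"), e -> log.add("failed"));

        // Only IllegalArgumentException is swallowed, so this error still propagates.
        Completable.error(new IllegalStateException("bad state"))
                .onErrorComplete(e -> e instanceof IllegalArgumentException)
                .subscribe(
                        () -> log.add("swallowed"),
                        e -> log.add("propagated: " + e.getMessage()));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```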

### Threading

Control execution context for Completables.

```java { .api }
/**
 * Specifies the Scheduler on which the Completable will operate
 */
public final Completable subscribeOn(Scheduler scheduler);

/**
 * Specifies the Scheduler on which observers will be notified
 */
public final Completable observeOn(Scheduler scheduler);
```
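To see which operator controls which part of the chain, it helps to record the thread name at each stage. This is a minimal sketch assuming RxJava 2 and its standard `Schedulers`; `ThreadingSketch` is a hypothetical name, and the exact thread names are an implementation detail of the library's default schedulers.

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;

public class ThreadingSketch {
    // Returns { thread that ran the action, thread that saw the completion }.
    static String[] run() {
        String[] threads = new String[2];

        Completable.fromAction(() -> threads[0] = Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())        // where the action runs
                .observeOn(Schedulers.single())      // where downstream callbacks run
                .doOnComplete(() -> threads[1] = Thread.currentThread().getName())
                .blockingAwait();                    // wait here only to keep the demo simple

        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("action ran on:      " + threads[0]);
        System.out.println("completion seen on: " + threads[1]);
    }
}
```

`subscribeOn` affects the upstream work regardless of where it appears in the chain, while `observeOn` affects only the operators placed after it.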

### Timing Operations

Add delays and timeouts to Completables.

```java { .api }
/**
 * Delays the completion signal
 */
public final Completable delay(long delay, TimeUnit unit);
public final Completable delay(long delay, TimeUnit unit, Scheduler scheduler);

/**
 * Adds a timeout to the Completable
 */
public final Completable timeout(long timeout, TimeUnit unit);
public final Completable timeout(long timeout, TimeUnit unit, Scheduler scheduler);
public final Completable timeout(long timeout, TimeUnit unit, CompletableSource other);
```
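The `timeout` overload that takes a `CompletableSource` switches to a fallback instead of signalling a `TimeoutException`. A minimal sketch, assuming RxJava 2; `TimeoutFallbackSketch` and the log messages are hypothetical.

```java
import io.reactivex.Completable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class TimeoutFallbackSketch {
    static List<String> run() {
        // Thread-safe list: the fallback runs on the timeout's scheduler thread.
        List<String> log = new CopyOnWriteArrayList<>();

        Completable fallback = Completable.fromAction(() -> log.add("fallback ran"));

        // never() cannot complete, so the timeout always fires
        // and the fallback takes over.
        Completable.never()
                .timeout(100, TimeUnit.MILLISECONDS, fallback)
                .blockingAwait();

        log.add("completed");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```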

### Repetition and Loops

Repeat Completable operations.

```java { .api }
/**
 * Repeats the Completable subscription indefinitely
 */
public final Completable repeat();

/**
 * Repeats the Completable subscription a specified number of times
 */
public final Completable repeat(long times);

/**
 * Repeats the Completable based on a condition
 */
public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler);

/**
 * Repeats until a condition becomes true
 */
public final Completable repeatUntil(BooleanSupplier stop);
```

### Conversion Operations

Convert Completable to other reactive types.

```java { .api }
/**
 * Converts Completable to Observable that emits no items but completes
 */
public final <T> Observable<T> toObservable();

/**
 * Converts Completable to Flowable that emits no items but completes
 */
public final <T> Flowable<T> toFlowable();

/**
 * Converts Completable to Single that emits the value returned by the supplier on completion
 */
public final <T> Single<T> toSingle(Callable<? extends T> completionValueSupplier);

/**
 * Converts Completable to Maybe that completes empty
 */
public final <T> Maybe<T> toMaybe();

/**
 * Converts Completable to Single that emits the provided value on completion
 */
public final <T> Single<T> toSingleDefault(T completionValue);
```

### Subscription and Consumption

Subscribe to a Completable and handle completion.

```java { .api }
/**
 * Subscribes with separate callbacks
 */
public final Disposable subscribe();
public final Disposable subscribe(Action onComplete);
public final Disposable subscribe(Action onComplete, Consumer<? super Throwable> onError);

/**
 * Subscribes with a CompletableObserver
 */
public final void subscribe(CompletableObserver observer);

/**
 * Blocking operations - use with caution
 */
public final void blockingAwait();
public final boolean blockingAwait(long timeout, TimeUnit unit);
```
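The timed `blockingAwait` overload reports whether the Completable finished within the limit, and the `Disposable` returned by `subscribe` can be used to cancel a subscription that is still in flight. A minimal sketch, assuming RxJava 2; `SubscriptionSketch` is a hypothetical name.

```java
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;

public class SubscriptionSketch {
    // Returns { fast finished in time, slow finished in time, subscription disposed }.
    static boolean[] run() {
        // Completes after 50 ms, well within the 2 s limit.
        boolean fast = Completable.timer(50, TimeUnit.MILLISECONDS)
                .blockingAwait(2, TimeUnit.SECONDS);

        // never() cannot complete, so the wait gives up after 100 ms.
        boolean slow = Completable.never()
                .blockingAwait(100, TimeUnit.MILLISECONDS);

        // Cancel a subscription that would otherwise stay open forever.
        Disposable subscription = Completable.never().subscribe();
        subscription.dispose();

        return new boolean[] { fast, slow, subscription.isDisposed() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("fast=" + r[0] + " slow=" + r[1] + " disposed=" + r[2]);
    }
}
```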

### Utility Operations

Additional utility operations for Completables.

```java { .api }
/**
 * Caches the result of the Completable
 */
public final Completable cache();

/**
 * Performs side-effects without affecting the Completable
 */
public final Completable doOnComplete(Action onComplete);
public final Completable doOnError(Consumer<? super Throwable> onError);
public final Completable doOnSubscribe(Consumer<? super Disposable> onSubscribe);
public final Completable doOnDispose(Action onDispose);
public final Completable doFinally(Action onFinally);

/**
 * Hides the identity of the Completable
 */
public final Completable hide();

/**
 * Lifts a CompletableOperator
 */
public final Completable lift(CompletableOperator lift);

/**
 * Materializes onComplete and onError events as notifications
 */
public final Single<Notification<Void>> materialize();
```
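The side-effect hooks fire in a fixed order around the underlying work, and `cache` ensures that work runs only once no matter how many observers subscribe. The sketch below assumes RxJava 2; `UtilitySketch` and the log labels are hypothetical.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class UtilitySketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger runs = new AtomicInteger();

        Completable work = Completable
                .fromAction(() -> log.add("run " + runs.incrementAndGet()))
                .doOnSubscribe(d -> log.add("subscribed"))
                .doOnComplete(() -> log.add("completed"))
                .doFinally(() -> log.add("finally"))
                .cache();

        work.subscribe(); // triggers the action and the hooks
        work.subscribe(); // replays the cached completion; nothing runs again

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Placing `cache` last means the hooks belong to the shared upstream subscription, so they also fire only once.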

## Usage Examples

**Basic Completable Operations:**

```java
import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;

// Simple completion operation
Completable saveData = Completable.fromAction(() -> {
    // Simulate saving data
    System.out.println("Saving data...");
    Thread.sleep(1000);
    System.out.println("Data saved!");
});

saveData.subscribe(new CompletableObserver() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("Started save operation");
    }

    @Override
    public void onComplete() {
        System.out.println("Save completed successfully");
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Save failed: " + e.getMessage());
    }
});

// Lambda-style subscription (each subscription runs the action again)
saveData.subscribe(
    () -> System.out.println("Success!"),
    error -> error.printStackTrace()
);
```

**Sequential Operations with andThen:**

```java
import io.reactivex.Completable;
import io.reactivex.Single;

Completable setup = Completable.fromAction(() -> System.out.println("Setting up..."));
Completable process = Completable.fromAction(() -> System.out.println("Processing..."));
Completable cleanup = Completable.fromAction(() -> System.out.println("Cleaning up..."));

// Chain operations sequentially
setup.andThen(process)
    .andThen(cleanup)
    .subscribe(
        () -> System.out.println("All operations completed"),
        error -> System.err.println("Operation failed: " + error)
    );

// Chain with other reactive types
Single<String> result = setup
    .andThen(Single.just("Operation result"));

result.subscribe(value -> System.out.println("Result: " + value));
```

**Parallel Operations with merge:**

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;

// Each task is moved to the io scheduler so the tasks can overlap
Completable task1 = Completable.fromAction(() -> {
    System.out.println("Task 1 starting");
    Thread.sleep(1000);
    System.out.println("Task 1 completed");
}).subscribeOn(Schedulers.io());

Completable task2 = Completable.fromAction(() -> {
    System.out.println("Task 2 starting");
    Thread.sleep(1500);
    System.out.println("Task 2 completed");
}).subscribeOn(Schedulers.io());

Completable task3 = Completable.fromAction(() -> {
    System.out.println("Task 3 starting");
    Thread.sleep(800);
    System.out.println("Task 3 completed");
}).subscribeOn(Schedulers.io());

// All tasks run in parallel, complete when all finish
Completable.merge(Arrays.asList(task1, task2, task3))
    .subscribe(
        () -> System.out.println("All tasks completed"),
        error -> System.err.println("One or more tasks failed")
    );
```

**Error Handling and Retry:**

```java
import io.reactivex.Completable;

Completable unreliableOperation = Completable.fromAction(() -> {
    if (Math.random() > 0.7) {
        throw new RuntimeException("Random failure");
    }
    System.out.println("Operation succeeded");
});

// Retry up to 3 times (resubscribes immediately, with no backoff)
unreliableOperation
    .retry(3)
    .subscribe(
        () -> System.out.println("Operation completed successfully"),
        error -> System.err.println("Operation failed after retries: " + error)
    );

// Convert error to completion
unreliableOperation
    .onErrorComplete()
    .subscribe(() -> System.out.println("Completed (success or error ignored)"));
```

**Delayed and Timer Operations:**

```java
import io.reactivex.Completable;
import java.util.concurrent.TimeUnit;

// The action runs on subscription; only the completion signal is delayed by 2 seconds
Completable delayedTask = Completable.fromAction(() -> System.out.println("Task executed"))
    .delay(2, TimeUnit.SECONDS);

delayedTask.subscribe(() -> System.out.println("Delayed task completed"));

// Timer that just completes after delay
Completable.timer(1, TimeUnit.SECONDS)
    .subscribe(() -> System.out.println("Timer completed"));

// Timeout handling
// Note: without subscribeOn, the action runs synchronously on the subscribing thread
Completable longRunningTask = Completable.fromAction(() -> {
    Thread.sleep(5000); // 5 seconds
    System.out.println("Long task completed");
});

longRunningTask
    .timeout(2, TimeUnit.SECONDS)
    .subscribe(
        () -> System.out.println("Task completed in time"),
        error -> System.err.println("Task timed out: " + error)
    );
```

**Repetition and Loops:**

```java
import io.reactivex.Completable;
import java.util.concurrent.atomic.AtomicInteger;

// Repeat operation 3 times
Completable heartbeat = Completable.fromAction(() -> {
    System.out.println("Heartbeat: " + System.currentTimeMillis());
});

heartbeat.repeat(3)
    .subscribe(
        () -> System.out.println("All heartbeats sent"),
        error -> error.printStackTrace()
    );

// Repeat with condition (the condition is checked after each completion)
AtomicInteger counter = new AtomicInteger(0);
Completable increment = Completable.fromAction(() -> {
    int current = counter.incrementAndGet();
    System.out.println("Counter: " + current);
});

increment.repeatUntil(() -> counter.get() >= 5)
    .subscribe(() -> System.out.println("Counter reached limit"));
```

**Converting to Other Types:**

```java
Completable operation = Completable.fromAction(() -> System.out.println("Operation done"));

// Convert to Single with result value
Single<String> result = operation.toSingleDefault("Success");
result.subscribe(value -> System.out.println("Result: " + value));

// Convert to Observable that emits no items
Observable<Void> observable = operation.toObservable();
observable.subscribe(
    item -> System.out.println("Item: " + item), // Never called
    error -> error.printStackTrace(),
    () -> System.out.println("Observable completed")
);

// Chain with Single
Single<Integer> calculation = operation.andThen(Single.just(42));
calculation.subscribe(value -> System.out.println("Calculated: " + value));
```

## Types

```java { .api }
/**
 * Observer interface for Completables
 */
public interface CompletableObserver {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable e);
}

/**
 * Functional interface for creating Completables
 */
public interface CompletableOnSubscribe {
    void subscribe(CompletableEmitter emitter) throws Exception;
}

/**
 * Emitter for CompletableOnSubscribe
 */
public interface CompletableEmitter {
    void onComplete();
    void onError(Throwable t);
    void setDisposable(Disposable d);
    void setCancellable(Cancellable c);
    boolean isDisposed();
}

/**
 * Base interface for Completable sources
 */
public interface CompletableSource {
    void subscribe(CompletableObserver observer);
}

/**
 * Transformer interface for Completables
 */
public interface CompletableTransformer {
    CompletableSource apply(Completable upstream);
}

/**
 * Converter interface for Completables
 */
public interface CompletableConverter<R> {
    R apply(Completable upstream);
}
```
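`CompletableOnSubscribe` and `CompletableEmitter` are typically used together through `Completable.create` to wrap a callback-based API. The following is a sketch under stated assumptions: it targets RxJava 2, and `LegacyClient`, `Callback`, and `send` are hypothetical stand-ins for whatever callback API is being wrapped.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class CreateSketch {
    // Hypothetical callback-style API to be wrapped.
    interface Callback {
        void onDone();
        void onFailure(Exception e);
    }

    static class LegacyClient {
        void send(String payload, Callback callback) {
            if (payload.isEmpty()) {
                callback.onFailure(new IllegalArgumentException("empty payload"));
            } else {
                callback.onDone();
            }
        }

        void cancel() {
            // Release any resources held by an in-flight request.
        }
    }

    // Bridges the callback API to a Completable.
    static Completable send(LegacyClient client, String payload) {
        return Completable.create(emitter -> {
            // Register cleanup before starting the work.
            emitter.setCancellable(client::cancel);
            client.send(payload, new Callback() {
                @Override
                public void onDone() {
                    emitter.onComplete();
                }

                @Override
                public void onFailure(Exception e) {
                    emitter.onError(e);
                }
            });
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        LegacyClient client = new LegacyClient();
        send(client, "hello").subscribe(() -> log.add("done"), e -> log.add("failed: " + e.getMessage()));
        send(client, "").subscribe(() -> log.add("done"), e -> log.add("failed: " + e.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Nothing happens until `subscribe` is called, so each subscription issues its own request through the wrapped client.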