0
# Resource Management
1
2
Disposable pattern for managing subscriptions and preventing memory leaks. Proper resource management is crucial in reactive programming to avoid memory leaks and ensure clean shutdown.
3
4
## Capabilities
5
6
### Core Disposable Interface
7
8
Base interface for all disposable resources.
9
10
```java { .api }
11
/**
12
* Represents a disposable resource that can be cancelled/disposed
13
*/
14
public interface Disposable {
15
/**
16
* Disposes the resource and cancels any ongoing work
17
*/
18
void dispose();
19
20
/**
21
* Returns true if this resource has been disposed
22
*/
23
boolean isDisposed();
24
}
25
```
26
27
### Disposables Utility Class
28
29
Factory methods and utilities for working with disposables.
30
31
```java { .api }
32
/**
33
* Utility class for creating and managing disposables
34
*/
35
public final class Disposables {
36
/**
37
* Returns a disposed disposable instance
38
*/
39
public static Disposable disposed();
40
41
/**
42
* Returns an empty disposable that does nothing when disposed
43
*/
44
public static Disposable empty();
45
46
/**
47
* Creates a disposable from an Action
48
*/
49
public static Disposable fromAction(Action action);
50
51
/**
52
* Creates a disposable from a Runnable
53
*/
54
public static Disposable fromRunnable(Runnable runnable);
55
56
/**
57
* Creates a disposable from a Future
58
*/
59
public static Disposable fromFuture(Future<?> future);
60
61
/**
62
* Creates a disposable from a Subscription (Reactive Streams)
63
*/
64
public static Disposable fromSubscription(Subscription subscription);
65
66
/**
67
* Creates a disposable from an AutoCloseable resource
68
*/
69
public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);
70
}
71
```
72
73
### CompositeDisposable
74
75
Container for multiple disposables that can be disposed together.
76
77
```java { .api }
78
/**
79
* Container that can hold multiple disposables and dispose them together
80
*/
81
public final class CompositeDisposable implements Disposable {
82
/**
83
* Creates an empty CompositeDisposable
84
*/
85
public CompositeDisposable();
86
87
/**
88
* Creates a CompositeDisposable with initial disposables
89
*/
90
public CompositeDisposable(Disposable... disposables);
91
92
/**
93
* Adds a disposable to this container
94
* Returns true if added, false if this container is already disposed
95
*/
96
public boolean add(Disposable disposable);
97
98
/**
99
* Adds multiple disposables to this container
100
* Returns true if all were added successfully
101
*/
102
public boolean addAll(Disposable... disposables);
103
104
/**
105
* Removes a disposable from this container
106
* Returns true if removed, false if not found
107
*/
108
public boolean remove(Disposable disposable);
109
110
/**
111
* Removes and disposes a disposable from this container
112
* Returns true if found and disposed
113
*/
114
public boolean delete(Disposable disposable);
115
116
/**
117
* Disposes all contained disposables and clears the container
118
*/
119
public void clear();
120
121
/**
122
* Returns the number of currently held disposables
123
*/
124
public int size();
125
126
/**
127
* Disposes all contained disposables
128
* Future additions will be immediately disposed
129
*/
130
public void dispose();
131
132
/**
133
* Returns true if this container has been disposed
134
*/
135
public boolean isDisposed();
136
}
137
```
138
139
### SerialDisposable
140
141
Holds a single disposable that can be swapped atomically.
142
143
```java { .api }
144
/**
145
* Container that holds a single disposable and allows atomic replacement
146
*/
147
public final class SerialDisposable implements Disposable {
148
/**
149
* Creates a new SerialDisposable
150
*/
151
public SerialDisposable();
152
153
/**
154
* Creates a SerialDisposable with an initial disposable
155
*/
156
public SerialDisposable(Disposable initialDisposable);
157
158
/**
159
* Atomically sets the disposable, disposing the previous one if present
160
* Returns true if set successfully, false if this container is disposed
161
*/
162
public boolean set(Disposable disposable);
163
164
/**
165
* Atomically replaces the disposable without disposing the previous one
166
* Returns the previous disposable
167
*/
168
public Disposable replace(Disposable disposable);
169
170
/**
171
* Returns the current disposable (may be null)
172
*/
173
public Disposable get();
174
175
/**
176
* Disposes the current disposable
177
*/
178
public void dispose();
179
180
/**
181
* Returns true if this container has been disposed
182
*/
183
public boolean isDisposed();
184
}
185
```
186
187
### Resource Observers
188
189
Observer implementations with built-in resource management.
190
191
```java { .api }
192
/**
193
* Observer with built-in resource management for Observable
194
*/
195
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
196
/**
197
* Adds a resource to be disposed when this observer is disposed
198
*/
199
public final void add(Disposable resource);
200
201
/**
202
* Disposes all managed resources
203
*/
204
public final void dispose();
205
206
/**
207
* Returns true if disposed
208
*/
209
public final boolean isDisposed();
210
211
// Abstract methods to implement
212
public abstract void onNext(T t);
213
public abstract void onError(Throwable e);
214
public abstract void onComplete();
215
}
216
217
/**
218
* Subscriber with built-in resource management for Flowable
219
*/
220
public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable {
221
/**
222
* Adds a resource to be disposed when this subscriber is disposed
223
*/
224
public final void add(Disposable resource);
225
226
/**
227
* Requests the specified number of items from upstream
228
*/
229
protected final void request(long n);
230
231
/**
232
* Disposes all managed resources and cancels upstream
233
*/
234
public final void dispose();
235
236
/**
237
* Returns true if disposed
238
*/
239
public final boolean isDisposed();
240
241
// Abstract methods to implement
242
public abstract void onNext(T t);
243
public abstract void onError(Throwable e);
244
public abstract void onComplete();
245
}
246
247
/**
248
* Observer with built-in resource management for Single
249
*/
250
public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable {
251
public final void add(Disposable resource);
252
public final void dispose();
253
public final boolean isDisposed();
254
255
public abstract void onSuccess(T t);
256
public abstract void onError(Throwable e);
257
}
258
259
/**
260
* Observer with built-in resource management for Maybe
261
*/
262
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {
263
public final void add(Disposable resource);
264
public final void dispose();
265
public final boolean isDisposed();
266
267
public abstract void onSuccess(T t);
268
public abstract void onError(Throwable e);
269
public abstract void onComplete();
270
}
271
272
/**
273
* Observer with built-in resource management for Completable
274
*/
275
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
276
public final void add(Disposable resource);
277
public final void dispose();
278
public final boolean isDisposed();
279
280
public abstract void onComplete();
281
public abstract void onError(Throwable e);
282
}
283
```
284
285
## Usage Examples
286
287
**Basic Disposable Management:**
288
289
```java
290
import io.reactivex.Observable;
291
import io.reactivex.disposables.Disposable;
292
import io.reactivex.schedulers.Schedulers;
293
294
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
295
.subscribeOn(Schedulers.io());
296
297
// Subscribe and keep the disposable
298
Disposable disposable = source.subscribe(
299
value -> System.out.println("Value: " + value),
300
error -> error.printStackTrace()
301
);
302
303
// Later, dispose to stop the stream and free resources
304
Thread.sleep(5000);
305
disposable.dispose();
306
307
// Check if disposed
308
System.out.println("Is disposed: " + disposable.isDisposed());
309
```
310
311
**CompositeDisposable for Multiple Subscriptions:**
312
313
```java
314
import io.reactivex.disposables.CompositeDisposable;
315
316
CompositeDisposable compositeDisposable = new CompositeDisposable();
317
318
// Add multiple subscriptions
319
Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);
320
Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);
321
Observable<String> http = Observable.fromCallable(() -> fetchFromNetwork())
322
.subscribeOn(Schedulers.io());
323
324
compositeDisposable.add(timer1.subscribe(v -> System.out.println("Timer 1: " + v)));
325
compositeDisposable.add(timer2.subscribe(v -> System.out.println("Timer 2: " + v)));
326
compositeDisposable.add(http.subscribe(result -> System.out.println("HTTP: " + result)));
327
328
System.out.println("Active subscriptions: " + compositeDisposable.size());
329
330
// Dispose all at once
331
Thread.sleep(10000);
332
compositeDisposable.dispose();
333
334
// All subscriptions are now disposed
335
System.out.println("All disposed: " + compositeDisposable.isDisposed());
336
```
337
338
**SerialDisposable for Sequential Operations:**
339
340
```java
341
import io.reactivex.disposables.SerialDisposable;
342
343
SerialDisposable serialDisposable = new SerialDisposable();
344
345
// Start first operation
346
Observable<String> operation1 = Observable.just("Operation 1")
347
.delay(2, TimeUnit.SECONDS);
348
serialDisposable.set(operation1.subscribe(System.out::println));
349
350
// Replace with second operation (first one gets disposed)
351
Thread.sleep(1000);
352
Observable<String> operation2 = Observable.just("Operation 2")
353
.delay(1, TimeUnit.SECONDS);
354
serialDisposable.set(operation2.subscribe(System.out::println));
355
356
// Only operation 2 will complete
357
Thread.sleep(3000);
358
serialDisposable.dispose();
359
```
360
361
**ResourceObserver for Complex Resource Management:**
362
363
```java
364
import io.reactivex.observers.ResourceObserver;
365
366
class CustomResourceObserver extends ResourceObserver<String> {
367
private FileWriter fileWriter;
368
369
@Override
370
protected void onStart() {
371
try {
372
fileWriter = new FileWriter("output.txt");
373
// Add file writer as a resource to be closed on disposal
374
add(Disposables.fromAutoCloseable(fileWriter));
375
} catch (IOException e) {
376
dispose(); // Dispose if setup fails
377
}
378
}
379
380
@Override
381
public void onNext(String value) {
382
try {
383
fileWriter.write(value + "\n");
384
fileWriter.flush();
385
} catch (IOException e) {
386
dispose();
387
}
388
}
389
390
@Override
391
public void onError(Throwable e) {
392
System.err.println("Error: " + e.getMessage());
393
// Resources will be automatically disposed
394
}
395
396
@Override
397
public void onComplete() {
398
System.out.println("Writing completed");
399
// Resources will be automatically disposed
400
}
401
}
402
403
// Usage
404
Observable<String> data = Observable.just("Line 1", "Line 2", "Line 3");
405
CustomResourceObserver observer = new CustomResourceObserver();
406
data.subscribe(observer);
407
408
// Can dispose manually if needed
409
// observer.dispose();
410
```
411
412
**Custom Disposable Creation:**
413
414
```java
415
// Create disposable from Action
416
Disposable actionDisposable = Disposables.fromAction(() -> {
417
System.out.println("Cleaning up resources");
418
// Cleanup code here
419
});
420
421
// Create disposable from Runnable
422
Disposable runnableDisposable = Disposables.fromRunnable(() -> {
423
System.out.println("Shutdown procedure");
424
});
425
426
// Create disposable from Future
427
ExecutorService executor = Executors.newSingleThreadExecutor();
428
Future<?> future = executor.submit(() -> {
429
// Long running task
430
Thread.sleep(10000);
431
return "Done";
432
});
433
Disposable futureDisposable = Disposables.fromFuture(future);
434
435
// Dispose all
436
actionDisposable.dispose();
437
runnableDisposable.dispose();
438
futureDisposable.dispose(); // This will cancel the future
439
executor.shutdown();
440
```
441
442
**Lifecycle Management in Android/UI Applications:**
443
444
```java
445
public class MainActivity {
446
private final CompositeDisposable compositeDisposable = new CompositeDisposable();
447
448
public void onCreate() {
449
// Start various subscriptions
450
compositeDisposable.add(
451
userService.getCurrentUser()
452
.subscribeOn(Schedulers.io())
453
.observeOn(AndroidSchedulers.mainThread())
454
.subscribe(this::updateUI)
455
);
456
457
compositeDisposable.add(
458
locationService.getLocationUpdates()
459
.subscribe(this::updateLocation)
460
);
461
462
compositeDisposable.add(
463
messageService.getMessages()
464
.subscribe(this::showMessage)
465
);
466
}
467
468
public void onDestroy() {
469
// Clean up all subscriptions to prevent memory leaks
470
compositeDisposable.dispose();
471
}
472
473
private void updateUI(User user) { /* Update UI */ }
474
private void updateLocation(Location location) { /* Update location */ }
475
private void showMessage(String message) { /* Show message */ }
476
}
477
```
478
479
**Error Handling with Resource Management:**
480
481
```java
482
CompositeDisposable resources = new CompositeDisposable();
483
484
try {
485
// Add resources that might fail
486
resources.add(Observable.interval(1, TimeUnit.SECONDS)
487
.subscribe(
488
value -> {
489
if (value > 5) {
490
throw new RuntimeException("Simulated error");
491
}
492
System.out.println("Value: " + value);
493
},
494
error -> {
495
System.err.println("Stream error: " + error.getMessage());
496
// Don't forget to clean up other resources on error
497
resources.dispose();
498
}
499
));
500
501
resources.add(Observable.timer(10, TimeUnit.SECONDS)
502
.subscribe(ignored -> System.out.println("Timer completed")));
503
504
} catch (Exception e) {
505
// Ensure cleanup on any exception
506
resources.dispose();
507
throw e;
508
}
509
```
510
511
**Memory Leak Prevention:**
512
513
```java
514
public class DataProcessor {
515
private final CompositeDisposable subscriptions = new CompositeDisposable();
516
private final PublishSubject<String> subject = PublishSubject.create();
517
518
public void startProcessing() {
519
// Process data with proper cleanup
520
subscriptions.add(
521
subject
522
.buffer(5, TimeUnit.SECONDS)
523
.filter(buffer -> !buffer.isEmpty())
524
.flatMap(this::processBuffer)
525
.subscribe(
526
result -> System.out.println("Processed: " + result),
527
error -> System.err.println("Processing error: " + error)
528
)
529
);
530
}
531
532
public void addData(String data) {
533
if (!subscriptions.isDisposed()) {
534
subject.onNext(data);
535
}
536
}
537
538
public void shutdown() {
539
// Proper cleanup prevents memory leaks
540
subscriptions.dispose();
541
subject.onComplete();
542
}
543
544
private Observable<String> processBuffer(List<String> buffer) {
545
return Observable.fromCallable(() -> {
546
// Process buffer
547
return "Processed " + buffer.size() + " items";
548
}).subscribeOn(Schedulers.computation());
549
}
550
}
551
```
552
553
## Best Practices
554
555
**Resource Management Guidelines:**
556
557
1. **Always dispose**: Keep references to disposables and dispose them when done
558
2. **Use CompositeDisposable**: For managing multiple subscriptions together
559
3. **Lifecycle awareness**: Dispose in appropriate lifecycle methods (onDestroy, onPause, etc.)
560
4. **Error handling**: Ensure resources are disposed even when errors occur
561
5. **Resource observers**: Use ResourceObserver/ResourceSubscriber for complex resource management
562
6. **SerialDisposable**: Use for sequential operations where you need to cancel previous work
563
7. **Memory leaks**: Always dispose long-running or infinite streams
564
8. **Thread safety**: Disposables are thread-safe and can be disposed from any thread
565
566
**Common Patterns:**
567
568
- Activity/Fragment lifecycle management with CompositeDisposable
569
- Network request cancellation with individual disposables
570
- File/database resource management with ResourceObserver
571
- Background task management with SerialDisposable
572
- Event bus subscriptions with proper cleanup
573
574
## Types
575
576
```java { .api }
577
/**
578
* Functional interface for cleanup actions
579
*/
580
public interface Action {
581
void run() throws Exception;
582
}
583
584
/**
585
* Interface for objects that can be cancelled
586
*/
587
public interface Cancellable {
588
void cancel() throws Exception;
589
}
590
591
/**
592
* Exception thrown by dispose operations
593
*/
594
public class CompositeException extends RuntimeException {
595
public List<Throwable> getExceptions();
596
}
597
```