# Resource Management

RxJava's Disposable pattern provides resource management for reactive streams, enabling proper cleanup of subscriptions, threads, and other resources to prevent memory leaks and resource exhaustion.

## Capabilities

### Basic Disposable Operations

Core interface for managing reactive stream resources.

```java { .api }
/**
 * Represents a disposable resource
 */
public interface Disposable {
    /**
     * Dispose of the resource and cancel any ongoing operations
     */
    void dispose();

    /**
     * Check if this resource has been disposed
     * @return true if disposed, false otherwise
     */
    boolean isDisposed();
}

// Static factory methods declared on the Disposable interface (RxJava 3)

/**
 * Create a new, not yet disposed Disposable with no cleanup action attached
 * @return Disposable that performs no action when disposed
 */
public static Disposable empty();

/**
 * Create a disposable from a Runnable action
 * @param action the action to run when disposed
 * @return Disposable that runs the action when disposed
 */
public static Disposable fromRunnable(Runnable action);

/**
 * Create a disposable from an Action
 * @param action the action to run when disposed
 * @return Disposable that runs the action when disposed
 */
public static Disposable fromAction(Action action);

/**
 * Create a disposable from an AutoCloseable resource
 * @param autoCloseable the resource to close when disposed
 * @return Disposable that closes the resource when disposed
 */
public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);

/**
 * Create a disposable from a Future
 * @param future the Future to cancel when disposed
 * @param mayInterruptIfRunning whether to interrupt if running
 * @return Disposable that cancels the Future when disposed
 */
public static Disposable fromFuture(Future<?> future, boolean mayInterruptIfRunning);
```
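The factory methods above can be exercised without any reactive stream. The following is a minimal sketch, assuming RxJava 3 is on the classpath; the class name `DisposableFactoryDemo` and its helper methods are hypothetical and exist only for illustration. It shows that a disposable created by `fromRunnable` runs its action at most once, and that `fromAutoCloseable` closes the wrapped resource on disposal.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
public class DisposableFactoryDemo {

    /** Returns how many times the cleanup action ran after two dispose() calls. */
    static int cleanupRunsAfterDoubleDispose() {
        AtomicInteger runs = new AtomicInteger();
        Disposable d = Disposable.fromRunnable(runs::incrementAndGet);
        d.dispose();
        d.dispose(); // the second call has no further effect
        return runs.get();
    }

    /** Wraps an AutoCloseable so that dispose() closes it. */
    static boolean closesWrappedResource() {
        boolean[] closed = {false};
        AutoCloseable resource = () -> closed[0] = true;
        Disposable d = Disposable.fromAutoCloseable(resource);
        d.dispose();
        return closed[0] && d.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("cleanup runs: " + cleanupRunsAfterDoubleDispose());
        System.out.println("resource closed: " + closesWrappedResource());
        // empty() starts out active, while disposed() is already disposed
        System.out.println("empty disposed: " + Disposable.empty().isDisposed());
        System.out.println("disposed disposed: " + Disposable.disposed().isDisposed());
    }
}
```

Because disposal is idempotent, it is safe to call `dispose()` from several cleanup paths without guarding against repeated calls.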

### Composite Disposable

Manage multiple disposables as a single unit.

```java { .api }
/**
 * Container for multiple disposables that can be disposed together
 */
public final class CompositeDisposable implements Disposable, DisposableContainer {

    /**
     * Create an empty CompositeDisposable
     */
    public CompositeDisposable();

    /**
     * Create a CompositeDisposable with initial disposables
     * @param disposables the initial disposables to add
     */
    public CompositeDisposable(Disposable... disposables);

    /**
     * Add a disposable to this composite; if the composite is already
     * disposed, the given disposable is disposed immediately
     * @param disposable the disposable to add
     * @return true if added successfully, false if already disposed
     */
    public boolean add(Disposable disposable);

    /**
     * Add multiple disposables to this composite
     * @param disposables the disposables to add
     * @return true if all were added successfully, false if already disposed
     */
    public boolean addAll(Disposable... disposables);

    /**
     * Remove and dispose a specific disposable
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    public boolean remove(Disposable disposable);

    /**
     * Remove a specific disposable without disposing it
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    public boolean delete(Disposable disposable);

    /**
     * Remove all disposables and dispose them; the composite itself
     * stays usable and accepts new disposables afterwards
     */
    public void clear();

    /**
     * Get the number of disposables in this composite
     * @return the count of disposables
     */
    public int size();

    /**
     * Dispose all contained disposables and prevent new additions
     */
    public void dispose();

    /**
     * Check if this composite has been disposed
     * @return true if disposed, false otherwise
     */
    public boolean isDisposed();
}
```
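The differences between `remove`, `delete`, `clear`, and `dispose` are easy to mix up, so here is a small sketch that checks each one in isolation. It assumes RxJava 3 on the classpath; `CompositeSemanticsDemo` and its method names are hypothetical helpers, not library API.

```java
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;

// Hypothetical demo class, not part of RxJava.
public class CompositeSemanticsDemo {

    /** remove() disposes the removed item; delete() only detaches it. */
    static boolean removeDisposesButDeleteDoesNot() {
        CompositeDisposable composite = new CompositeDisposable();
        Disposable removed = Disposable.empty();
        Disposable deleted = Disposable.empty();
        composite.addAll(removed, deleted);

        composite.remove(removed);
        composite.delete(deleted);

        return removed.isDisposed() && !deleted.isDisposed() && composite.size() == 0;
    }

    /** clear() disposes the contents but leaves the container usable. */
    static boolean reusableAfterClear() {
        CompositeDisposable composite = new CompositeDisposable();
        Disposable first = Disposable.empty();
        composite.add(first);

        composite.clear();
        boolean accepted = composite.add(Disposable.empty());

        return first.isDisposed() && !composite.isDisposed() && accepted;
    }

    /** After dispose(), add() returns false and disposes its argument. */
    static boolean rejectsAfterDispose() {
        CompositeDisposable composite = new CompositeDisposable();
        composite.dispose();

        Disposable late = Disposable.empty();
        boolean accepted = composite.add(late);

        return !accepted && late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(removeDisposesButDeleteDoesNot());
        System.out.println(reusableAfterClear());
        System.out.println(rejectsAfterDispose());
    }
}
```

In practice this means `clear()` suits components that may start new subscriptions later, while `dispose()` suits a final teardown.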

### Serial Disposable

Hold at most one disposable at a time and swap it for another, so that only a single subscription stays active.

```java { .api }
/**
 * Container that holds at most one disposable at a time
 */
public final class SerialDisposable implements Disposable {

    /**
     * Create an empty SerialDisposable
     */
    public SerialDisposable();

    /**
     * Create a SerialDisposable with an initial disposable
     * @param initialDisposable the initial disposable
     */
    public SerialDisposable(Disposable initialDisposable);

    /**
     * Set the next disposable and dispose the previous one, if any
     * @param next the new disposable (the current one is disposed)
     * @return true if set successfully, false if already disposed
     */
    public boolean set(Disposable next);

    /**
     * Set the next disposable without disposing the previous one
     * @param next the new disposable (the current one is not disposed)
     * @return true if set successfully, false if already disposed
     */
    public boolean replace(Disposable next);

    /**
     * Get the current disposable
     * @return the current disposable or null if none set
     */
    public Disposable get();

    /**
     * Dispose the current disposable and prevent new ones
     */
    public void dispose();

    /**
     * Check if this serial disposable has been disposed
     * @return true if disposed, false otherwise
     */
    public boolean isDisposed();
}
```
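The practical difference between `set` and `replace` is whether the previously held disposable is disposed. The sketch below checks that behavior directly, assuming RxJava 3 on the classpath; `SerialSemanticsDemo` and its methods are hypothetical names used only for this illustration.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.disposables.SerialDisposable;

// Hypothetical demo class, not part of RxJava.
public class SerialSemanticsDemo {

    /** set() swaps in the new disposable and disposes the old one. */
    static boolean previousDisposedAfterSet() {
        SerialDisposable serial = new SerialDisposable();
        Disposable first = Disposable.empty();
        serial.set(first);
        serial.set(Disposable.empty());
        return first.isDisposed();
    }

    /** replace() swaps in the new disposable and leaves the old one active. */
    static boolean previousDisposedAfterReplace() {
        SerialDisposable serial = new SerialDisposable();
        Disposable first = Disposable.empty();
        serial.set(first);
        serial.replace(Disposable.empty());
        return first.isDisposed();
    }

    /** Once the container is disposed, new disposables are rejected and disposed. */
    static boolean lateArrivalIsDisposed() {
        SerialDisposable serial = new SerialDisposable();
        serial.dispose();
        Disposable late = Disposable.empty();
        boolean accepted = serial.set(late);
        return !accepted && late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("after set: " + previousDisposedAfterSet());
        System.out.println("after replace: " + previousDisposedAfterReplace());
        System.out.println("late arrival disposed: " + lateArrivalIsDisposed());
    }
}
```

Use `set` when the old subscription should stop as soon as the new one takes over. Use `replace` only when something else is responsible for disposing the old one.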

### Disposable Container

Interface for managing collections of disposables.

```java { .api }
/**
 * Interface for containers that can hold disposables
 */
public interface DisposableContainer {
    /**
     * Add a disposable to this container
     * @param disposable the disposable to add
     * @return true if added successfully, false if container is disposed
     */
    boolean add(Disposable disposable);

    /**
     * Remove a disposable from this container and dispose it
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    boolean remove(Disposable disposable);

    /**
     * Remove a disposable without disposing it
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    boolean delete(Disposable disposable);
}
```

### Utility Disposables

Specialized disposable implementations for common scenarios.

```java { .api }
// Note: in RxJava 3 these classes are package-private implementation details.
// Obtain instances through the Disposable.fromRunnable, fromAction,
// fromFuture, and fromAutoCloseable factory methods.

/**
 * Disposable that wraps a Runnable action
 */
final class RunnableDisposable extends ReferenceDisposable<Runnable> {
    /**
     * Create a RunnableDisposable
     * @param run the Runnable to execute on disposal
     */
    RunnableDisposable(Runnable run);
}

/**
 * Disposable that wraps an Action
 */
final class ActionDisposable extends ReferenceDisposable<Action> {
    /**
     * Create an ActionDisposable
     * @param action the Action to execute on disposal
     */
    ActionDisposable(Action action);
}

/**
 * Disposable that cancels a Future
 */
final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {
    /**
     * Create a FutureDisposable
     * @param future the Future to cancel on disposal
     * @param allowInterrupt whether to allow interruption
     */
    FutureDisposable(Future<?> future, boolean allowInterrupt);
}

/**
 * Disposable that closes an AutoCloseable resource
 */
final class AutoCloseableDisposable extends ReferenceDisposable<AutoCloseable> {
    /**
     * Create an AutoCloseableDisposable
     * @param autoCloseable the resource to close on disposal
     */
    AutoCloseableDisposable(AutoCloseable autoCloseable);
}
```

## Types

```java { .api }
/**
 * Base class for reference-based disposables
 * (package-private in RxJava 3; shown here to document the disposal contract)
 */
abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disposable {
    /**
     * Create a ReferenceDisposable with a value
     * @param value the value to hold
     */
    ReferenceDisposable(T value);

    /**
     * Called when the disposable is disposed
     * @param value the value being disposed
     */
    protected abstract void onDisposed(T value);

    /**
     * Dispose of the held reference
     */
    public final void dispose();

    /**
     * Check if disposed
     * @return true if disposed, false otherwise
     */
    public final boolean isDisposed();
}
```
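Application code can get the same dispose-once behavior by implementing `Disposable` directly. The following is a minimal sketch under that assumption, with RxJava 3 on the classpath; `ConnectionHandle` and its `onClose` callback are hypothetical names standing in for a domain-specific resource.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom disposable for a domain-specific resource.
public class ConnectionHandle implements Disposable {

    private final AtomicBoolean disposed = new AtomicBoolean();
    private final Runnable onClose;

    public ConnectionHandle(Runnable onClose) {
        this.onClose = onClose;
    }

    @Override
    public void dispose() {
        // compareAndSet guarantees the cleanup runs once, even under concurrent calls
        if (disposed.compareAndSet(false, true)) {
            onClose.run();
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }

    /** Returns how often the cleanup ran after two dispose() calls. */
    static int closeCountAfterTwoDisposes() {
        AtomicInteger closes = new AtomicInteger();
        ConnectionHandle handle = new ConnectionHandle(closes::incrementAndGet);
        handle.dispose();
        handle.dispose();
        return closes.get();
    }

    public static void main(String[] args) {
        System.out.println("close count: " + closeCountAfterTwoDisposes());
    }
}
```

Since the class implements `Disposable`, instances can be added to a `CompositeDisposable` alongside ordinary subscriptions.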

**Usage Examples:**

```java
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.observers.DisposableObserver;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

// Basic subscription disposal
Disposable subscription = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

// Dispose after 5 seconds
Thread.sleep(5000);
subscription.dispose();

// CompositeDisposable for multiple subscriptions
CompositeDisposable compositeDisposable = new CompositeDisposable();

Disposable sub1 = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("Sub1: " + tick));

Disposable sub2 = Observable.interval(2, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("Sub2: " + tick));

// Add both subscriptions to composite
compositeDisposable.addAll(sub1, sub2);

// Dispose all at once
Thread.sleep(10000);
compositeDisposable.dispose();

// SerialDisposable for replacing subscriptions
SerialDisposable serialDisposable = new SerialDisposable();

// Start with one subscription
serialDisposable.set(Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("First: " + tick)));

// Swap in a different subscription after 5 seconds.
// set() disposes the first subscription; replace() would leave it running.
Thread.sleep(5000);
serialDisposable.set(Observable.interval(500, TimeUnit.MILLISECONDS)
    .subscribe(tick -> System.out.println("Second: " + tick)));

// Clean up
Thread.sleep(5000);
serialDisposable.dispose();

// Custom disposable from action
Disposable customDisposable = Disposable.fromAction(() -> {
    System.out.println("Custom cleanup executed");
});

customDisposable.dispose();

// Resource management pattern
CompositeDisposable resources = new CompositeDisposable();

try {
    // Add various resources
    resources.add(Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::println));

    resources.add(Single.timer(5, TimeUnit.SECONDS)
        .subscribe(tick -> System.out.println("Timer completed")));

    resources.add(Disposable.fromRunnable(() ->
        System.out.println("Cleanup task executed")));

    // Simulate work
    Thread.sleep(3000);

} finally {
    // Ensure all resources are cleaned up
    resources.dispose();
}

// Conditional disposal
CompositeDisposable conditionalResources = new CompositeDisposable();

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);

Disposable subscription1 = source.subscribe(tick -> {
    System.out.println("Tick: " + tick);
    if (tick >= 5) {
        conditionalResources.dispose(); // Stop all when condition met
    }
});

conditionalResources.add(subscription1);

// Preventing memory leaks in Android-style lifecycle
class ReactiveComponent {
    private final CompositeDisposable disposables = new CompositeDisposable();

    public void onCreate() {
        // Add subscriptions
        disposables.add(
            Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.single())
                .subscribe(this::updateUI)
        );
    }

    public void onDestroy() {
        // Clean up all subscriptions; clear() keeps the container reusable
        disposables.clear();
    }

    private void updateUI(Long tick) {
        System.out.println("UI update: " + tick);
    }
}

// Using DisposableObserver for manual resource management
DisposableObserver<Long> observer = new DisposableObserver<Long>() {
    @Override
    public void onNext(Long value) {
        System.out.println("Received: " + value);
        if (value >= 3) {
            dispose(); // Self-dispose when condition met
        }
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Error: " + e);
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
};

Observable.range(1, 10)
    .subscribe(observer);
```

## Best Practices

### Resource Management Guidelines

1. **Always dispose of subscriptions** to prevent memory leaks
2. **Use CompositeDisposable** for managing multiple subscriptions
3. **Clear CompositeDisposable** in cleanup methods (onDestroy, etc.); call `clear()` if the container will be reused and `dispose()` if it will not
4. **Use SerialDisposable** when you need to replace subscriptions
5. **Dispose in finally blocks** to ensure cleanup even on exceptions
6. **Check isDisposed()** before performing operations that might fail if disposed
7. **Use DisposableObserver/DisposableSubscriber** for fine-grained control
8. **Create custom disposables** for domain-specific resource cleanup
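
Guideline 6 matters most inside hand-written sources, where a producer loop should stop once the consumer has gone away. The sketch below illustrates this with `Observable.create`, assuming RxJava 3 on the classpath; `CooperativeCancellationDemo`, `countingSource`, and `producedBeforeCancel` are hypothetical names for this example.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
public class CooperativeCancellationDemo {

    /** Emits 1..limit, checking isDisposed() before producing each item. */
    static Observable<Integer> countingSource(int limit, AtomicInteger produced) {
        return Observable.create(emitter -> {
            for (int i = 1; i <= limit; i++) {
                if (emitter.isDisposed()) {
                    return; // downstream is gone, stop producing
                }
                produced.incrementAndGet();
                emitter.onNext(i);
            }
            emitter.onComplete();
        });
    }

    /** take(3) disposes the upstream after three items, which ends the loop early. */
    static int producedBeforeCancel() {
        AtomicInteger produced = new AtomicInteger();
        countingSource(1_000_000, produced)
            .take(3)
            .subscribe(item -> { });
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println("items produced: " + producedBeforeCancel());
    }
}
```

Without the `isDisposed()` check the loop would keep running to the limit even though nothing downstream is listening.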