or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.md disposables.md error-handling.md flowable.md index.md maybe.md observable.md schedulers.md single.md subjects.md

docs/disposables.md

0

# Resource Management

1

2

The Disposable pattern is RxJava's mechanism for managing subscriptions and releasing the resources they hold. Disposing subscriptions that are no longer needed is crucial in reactive programming to avoid memory leaks and ensure a clean shutdown.

3

4

## Capabilities

5

6

### Core Disposable Interface

7

8

Base interface for all disposable resources.

9

10

```java { .api }

11

/**

12

* Represents a disposable resource that can be cancelled/disposed

13

*/

14

public interface Disposable {

15

/**

16

* Disposes the resource and cancels any ongoing work

17

*/

18

void dispose();

19

20

/**

21

* Returns true if this resource has been disposed

22

*/

23

boolean isDisposed();

24

}

25

```

26

27

### Disposables Utility Class

28

29

Factory methods and utilities for working with disposables.

30

31

```java { .api }

32

/**

33

* Utility class for creating and managing disposables

34

*/

35

public final class Disposables {

36

/**

37

* Returns a disposed disposable instance

38

*/

39

public static Disposable disposed();

40

41

/**

42

* Returns an empty disposable that does nothing when disposed

43

*/

44

public static Disposable empty();

45

46

/**

47

* Creates a disposable from an Action

48

*/

49

public static Disposable fromAction(Action action);

50

51

/**

52

* Creates a disposable from a Runnable

53

*/

54

public static Disposable fromRunnable(Runnable runnable);

55

56

/**

57

* Creates a disposable from a Future

58

*/

59

public static Disposable fromFuture(Future<?> future);

60

61

/**

62

* Creates a disposable from a Subscription (Reactive Streams)

63

*/

64

public static Disposable fromSubscription(Subscription subscription);

65

66

/**

67

* Creates a disposable from an AutoCloseable resource

68

*/

69

public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);

70

}

71

```

72

73

### CompositeDisposable

74

75

Container for multiple disposables that can be disposed together.

76

77

```java { .api }

78

/**

79

* Container that can hold multiple disposables and dispose them together

80

*/

81

public final class CompositeDisposable implements Disposable {

82

/**

83

* Creates an empty CompositeDisposable

84

*/

85

public CompositeDisposable();

86

87

/**

88

* Creates a CompositeDisposable with initial disposables

89

*/

90

public CompositeDisposable(Disposable... disposables);

91

92

/**

93

* Adds a disposable to this container

94

* Returns true if added, false if this container is already disposed

95

*/

96

public boolean add(Disposable disposable);

97

98

/**

99

* Adds multiple disposables to this container

100

* Returns true if all were added successfully

101

*/

102

public boolean addAll(Disposable... disposables);

103

104

/**

105

* Removes a disposable from this container and disposes it

106

* Returns true if removed, false if not found

107

*/

108

public boolean remove(Disposable disposable);

109

110

/**

111

* Removes a disposable from this container without disposing it

112

* Returns true if found and removed, false if not found

113

*/

114

public boolean delete(Disposable disposable);

115

116

/**

117

* Disposes all contained disposables and clears the container

118

*/

119

public void clear();

120

121

/**

122

* Returns the number of currently held disposables

123

*/

124

public int size();

125

126

/**

127

* Disposes all contained disposables

128

* Future additions will be immediately disposed

129

*/

130

public void dispose();

131

132

/**

133

* Returns true if this container has been disposed

134

*/

135

public boolean isDisposed();

136

}

137

```

138

139

### SerialDisposable

140

141

Holds a single disposable that can be swapped atomically.

142

143

```java { .api }

144

/**

145

* Container that holds a single disposable and allows atomic replacement

146

*/

147

public final class SerialDisposable implements Disposable {

148

/**

149

* Creates a new SerialDisposable

150

*/

151

public SerialDisposable();

152

153

/**

154

* Creates a SerialDisposable with an initial disposable

155

*/

156

public SerialDisposable(Disposable initialDisposable);

157

158

/**

159

* Atomically sets the disposable, disposing the previous one if present

160

* Returns true if set successfully, false if this container is disposed

161

*/

162

public boolean set(Disposable disposable);

163

164

/**

165

* Atomically replaces the disposable without disposing the previous one

166

* Returns the previous disposable

167

*/

168

public Disposable replace(Disposable disposable);

169

170

/**

171

* Returns the current disposable (may be null)

172

*/

173

public Disposable get();

174

175

/**

176

* Disposes the current disposable

177

*/

178

public void dispose();

179

180

/**

181

* Returns true if this container has been disposed

182

*/

183

public boolean isDisposed();

184

}

185

```

186

187

### Resource Observers

188

189

Observer implementations with built-in resource management.

190

191

```java { .api }

192

/**

193

* Observer with built-in resource management for Observable

194

*/

195

public abstract class ResourceObserver<T> implements Observer<T>, Disposable {

196

/**

197

* Adds a resource to be disposed when this observer is disposed

198

*/

199

public final void add(Disposable resource);

200

201

/**

202

* Disposes all managed resources

203

*/

204

public final void dispose();

205

206

/**

207

* Returns true if disposed

208

*/

209

public final boolean isDisposed();

210

211

// Abstract methods to implement

212

public abstract void onNext(T t);

213

public abstract void onError(Throwable e);

214

public abstract void onComplete();

215

}

216

217

/**

218

* Subscriber with built-in resource management for Flowable

219

*/

220

public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable {

221

/**

222

* Adds a resource to be disposed when this subscriber is disposed

223

*/

224

public final void add(Disposable resource);

225

226

/**

227

* Requests the specified number of items from upstream

228

*/

229

protected final void request(long n);

230

231

/**

232

* Disposes all managed resources and cancels upstream

233

*/

234

public final void dispose();

235

236

/**

237

* Returns true if disposed

238

*/

239

public final boolean isDisposed();

240

241

// Abstract methods to implement

242

public abstract void onNext(T t);

243

public abstract void onError(Throwable e);

244

public abstract void onComplete();

245

}

246

247

/**

248

* Observer with built-in resource management for Single

249

*/

250

public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable {

251

public final void add(Disposable resource);

252

public final void dispose();

253

public final boolean isDisposed();

254

255

public abstract void onSuccess(T t);

256

public abstract void onError(Throwable e);

257

}

258

259

/**

260

* Observer with built-in resource management for Maybe

261

*/

262

public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {

263

public final void add(Disposable resource);

264

public final void dispose();

265

public final boolean isDisposed();

266

267

public abstract void onSuccess(T t);

268

public abstract void onError(Throwable e);

269

public abstract void onComplete();

270

}

271

272

/**

273

* Observer with built-in resource management for Completable

274

*/

275

public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {

276

public final void add(Disposable resource);

277

public final void dispose();

278

public final boolean isDisposed();

279

280

public abstract void onComplete();

281

public abstract void onError(Throwable e);

282

}

283

```

284

285

## Usage Examples

286

287

**Basic Disposable Management:**

288

289

```java

290

import io.reactivex.Observable;

291

import io.reactivex.disposables.Disposable;

292

import io.reactivex.schedulers.Schedulers;

293

294

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)

295

.subscribeOn(Schedulers.io());

296

297

// Subscribe and keep the disposable

298

Disposable disposable = source.subscribe(

299

value -> System.out.println("Value: " + value),

300

error -> error.printStackTrace()

301

);

302

303

// Later, dispose to stop the stream and free resources

304

Thread.sleep(5000);

305

disposable.dispose();

306

307

// Check if disposed

308

System.out.println("Is disposed: " + disposable.isDisposed());

309

```

310

311

**CompositeDisposable for Multiple Subscriptions:**

312

313

```java

314

import io.reactivex.disposables.CompositeDisposable;

315

316

CompositeDisposable compositeDisposable = new CompositeDisposable();

317

318

// Add multiple subscriptions

319

Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);

320

Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);

321

Observable<String> http = Observable.fromCallable(() -> fetchFromNetwork())

322

.subscribeOn(Schedulers.io());

323

324

compositeDisposable.add(timer1.subscribe(v -> System.out.println("Timer 1: " + v)));

325

compositeDisposable.add(timer2.subscribe(v -> System.out.println("Timer 2: " + v)));

326

compositeDisposable.add(http.subscribe(result -> System.out.println("HTTP: " + result)));

327

328

System.out.println("Active subscriptions: " + compositeDisposable.size());

329

330

// Dispose all at once

331

Thread.sleep(10000);

332

compositeDisposable.dispose();

333

334

// All subscriptions are now disposed

335

System.out.println("All disposed: " + compositeDisposable.isDisposed());

336

```

337

338

**SerialDisposable for Sequential Operations:**

339

340

```java

341

import io.reactivex.disposables.SerialDisposable;

342

343

SerialDisposable serialDisposable = new SerialDisposable();

344

345

// Start first operation

346

Observable<String> operation1 = Observable.just("Operation 1")

347

.delay(2, TimeUnit.SECONDS);

348

serialDisposable.set(operation1.subscribe(System.out::println));

349

350

// Replace with second operation (first one gets disposed)

351

Thread.sleep(1000);

352

Observable<String> operation2 = Observable.just("Operation 2")

353

.delay(1, TimeUnit.SECONDS);

354

serialDisposable.set(operation2.subscribe(System.out::println));

355

356

// Only operation 2 will complete

357

Thread.sleep(3000);

358

serialDisposable.dispose();

359

```

360

361

**ResourceObserver for Complex Resource Management:**

362

363

```java

364

import io.reactivex.observers.ResourceObserver;

365

366

class CustomResourceObserver extends ResourceObserver<String> {

367

private FileWriter fileWriter;

368

369

@Override

370

protected void onStart() {

371

try {

372

fileWriter = new FileWriter("output.txt");

373

// Add file writer as a resource to be closed on disposal

374

add(Disposables.fromAutoCloseable(fileWriter));

375

} catch (IOException e) {

376

dispose(); // Dispose if setup fails

377

}

378

}

379

380

@Override

381

public void onNext(String value) {

382

try {

383

fileWriter.write(value + "\n");

384

fileWriter.flush();

385

} catch (IOException e) {

386

dispose();

387

}

388

}

389

390

@Override

391

public void onError(Throwable e) {

392

System.err.println("Error: " + e.getMessage());

393

// NOTE: ResourceObserver does not release resources automatically on error; call dispose() here to close them

394

}

395

396

@Override

397

public void onComplete() {

398

System.out.println("Writing completed");

399

// NOTE: ResourceObserver does not release resources automatically on completion; call dispose() here to close them

400

}

401

}

402

403

// Usage

404

Observable<String> data = Observable.just("Line 1", "Line 2", "Line 3");

405

CustomResourceObserver observer = new CustomResourceObserver();

406

data.subscribe(observer);

407

408

// Can dispose manually if needed

409

// observer.dispose();

410

```

411

412

**Custom Disposable Creation:**

413

414

```java

415

// Create disposable from Action

416

Disposable actionDisposable = Disposables.fromAction(() -> {

417

System.out.println("Cleaning up resources");

418

// Cleanup code here

419

});

420

421

// Create disposable from Runnable

422

Disposable runnableDisposable = Disposables.fromRunnable(() -> {

423

System.out.println("Shutdown procedure");

424

});

425

426

// Create disposable from Future

427

ExecutorService executor = Executors.newSingleThreadExecutor();

428

Future<?> future = executor.submit(() -> {

429

// Long running task

430

Thread.sleep(10000);

431

return "Done";

432

});

433

Disposable futureDisposable = Disposables.fromFuture(future);

434

435

// Dispose all

436

actionDisposable.dispose();

437

runnableDisposable.dispose();

438

futureDisposable.dispose(); // This will cancel the future

439

executor.shutdown();

440

```

441

442

**Lifecycle Management in Android/UI Applications:**

443

444

```java

445

public class MainActivity {

446

private final CompositeDisposable compositeDisposable = new CompositeDisposable();

447

448

public void onCreate() {

449

// Start various subscriptions

450

compositeDisposable.add(

451

userService.getCurrentUser()

452

.subscribeOn(Schedulers.io())

453

.observeOn(AndroidSchedulers.mainThread())

454

.subscribe(this::updateUI)

455

);

456

457

compositeDisposable.add(

458

locationService.getLocationUpdates()

459

.subscribe(this::updateLocation)

460

);

461

462

compositeDisposable.add(

463

messageService.getMessages()

464

.subscribe(this::showMessage)

465

);

466

}

467

468

public void onDestroy() {

469

// Clean up all subscriptions to prevent memory leaks

470

compositeDisposable.dispose();

471

}

472

473

private void updateUI(User user) { /* Update UI */ }

474

private void updateLocation(Location location) { /* Update location */ }

475

private void showMessage(String message) { /* Show message */ }

476

}

477

```

478

479

**Error Handling with Resource Management:**

480

481

```java

482

CompositeDisposable resources = new CompositeDisposable();

483

484

try {

485

// Add resources that might fail

486

resources.add(Observable.interval(1, TimeUnit.SECONDS)

487

.subscribe(

488

value -> {

489

if (value > 5) {

490

throw new RuntimeException("Simulated error");

491

}

492

System.out.println("Value: " + value);

493

},

494

error -> {

495

System.err.println("Stream error: " + error.getMessage());

496

// Don't forget to clean up other resources on error

497

resources.dispose();

498

}

499

));

500

501

resources.add(Observable.timer(10, TimeUnit.SECONDS)

502

.subscribe(ignored -> System.out.println("Timer completed")));

503

504

} catch (Exception e) {

505

// Ensure cleanup on any exception

506

resources.dispose();

507

throw e;

508

}

509

```

510

511

**Memory Leak Prevention:**

512

513

```java

514

public class DataProcessor {

515

private final CompositeDisposable subscriptions = new CompositeDisposable();

516

private final PublishSubject<String> subject = PublishSubject.create();

517

518

public void startProcessing() {

519

// Process data with proper cleanup

520

subscriptions.add(

521

subject

522

.buffer(5, TimeUnit.SECONDS)

523

.filter(buffer -> !buffer.isEmpty())

524

.flatMap(this::processBuffer)

525

.subscribe(

526

result -> System.out.println("Processed: " + result),

527

error -> System.err.println("Processing error: " + error)

528

)

529

);

530

}

531

532

public void addData(String data) {

533

if (!subscriptions.isDisposed()) {

534

subject.onNext(data);

535

}

536

}

537

538

public void shutdown() {

539

// Proper cleanup prevents memory leaks

540

subscriptions.dispose();

541

subject.onComplete();

542

}

543

544

private Observable<String> processBuffer(List<String> buffer) {

545

return Observable.fromCallable(() -> {

546

// Process buffer

547

return "Processed " + buffer.size() + " items";

548

}).subscribeOn(Schedulers.computation());

549

}

550

}

551

```

552

553

## Best Practices

554

555

**Resource Management Guidelines:**

556

557

1. **Always dispose**: Keep references to disposables and dispose them when done

558

2. **Use CompositeDisposable**: For managing multiple subscriptions together

559

3. **Lifecycle awareness**: Dispose in appropriate lifecycle methods (onDestroy, onPause, etc.)

560

4. **Error handling**: Ensure resources are disposed even when errors occur

561

5. **Resource observers**: Use ResourceObserver/ResourceSubscriber for complex resource management, and call dispose() in onError/onComplete because they do not release resources automatically

562

6. **SerialDisposable**: Use for sequential operations where you need to cancel previous work

563

7. **Memory leaks**: Always dispose long-running or infinite streams

564

8. **Thread safety**: Disposables are thread-safe and can be disposed from any thread

565

566

**Common Patterns:**

567

568

- Activity/Fragment lifecycle management with CompositeDisposable

569

- Network request cancellation with individual disposables

570

- File/database resource management with ResourceObserver

571

- Background task management with SerialDisposable

572

- Event bus subscriptions with proper cleanup

573

574

## Types

575

576

```java { .api }

577

/**

578

* Functional interface for cleanup actions

579

*/

580

public interface Action {

581

void run() throws Exception;

582

}

583

584

/**

585

* Interface for objects that can be cancelled

586

*/

587

public interface Cancellable {

588

void cancel() throws Exception;

589

}

590

591

/**

592

* Exception that aggregates one or more other exceptions (for example, when several disposables fail while a container is being disposed)

593

*/

594

public class CompositeException extends RuntimeException {

595

public List<Throwable> getExceptions();

596

}

597

```