or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.md disposables.md flowable.md index.md maybe.md observable.md schedulers.md single.md subjects.md

subjects.mddocs/

0

# Subjects and Processors

1

2

Subjects and Processors are hot observables that act as both observer and observable, enabling multicasting of events to multiple subscribers. They bridge the gap between imperative and reactive programming by allowing manual event emission.

3

4

## Capabilities

5

6

### Observable Subjects

7

8

Hot observables for multicasting without backpressure support.

9

10

```java { .api }

11

/**

12

* Subject that emits to all current subscribers

13

*/

14

public final class PublishSubject<T> extends Subject<T> {

15

/**

16

* Create a new PublishSubject

17

* @return new PublishSubject instance

18

*/

19

public static <T> PublishSubject<T> create();

20

21

/**

22

* Emit an item to all subscribers

23

* @param value the item to emit

24

*/

25

public void onNext(T value);

26

27

/**

28

* Emit an error to all subscribers and terminate

29

* @param error the error to emit

30

*/

31

public void onError(Throwable error);

32

33

/**

34

* Complete all subscribers

35

*/

36

public void onComplete();

37

38

/**

39

* Check if this subject has subscribers

40

* @return true if has subscribers, false otherwise

41

*/

42

public boolean hasObservers();

43

44

/**

45

* Check if this subject has completed

46

* @return true if completed, false otherwise

47

*/

48

public boolean hasComplete();

49

50

/**

51

* Check if this subject has an error

52

* @return true if terminated with error, false otherwise

53

*/

54

public boolean hasThrowable();

55

56

/**

57

* Get the terminating error if any

58

* @return the error that terminated this subject, null if none

59

*/

60

public Throwable getThrowable();

61

}

62

63

/**

64

* Subject that replays items to new subscribers

65

*/

66

public final class ReplaySubject<T> extends Subject<T> {

67

/**

68

* Create an unbounded ReplaySubject

69

* @return new ReplaySubject that replays all items

70

*/

71

public static <T> ReplaySubject<T> create();

72

73

/**

74

* Create a size-bounded ReplaySubject

75

* @param maxSize maximum number of items to replay

76

* @return new ReplaySubject with size limit

77

*/

78

public static <T> ReplaySubject<T> createWithSize(int maxSize);

79

80

/**

81

* Create a time-bounded ReplaySubject

82

* @param maxAge maximum age of items to replay

83

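* @param unit time unit for maxAge
* @param scheduler the Scheduler that provides the current time used for age checks

84

* @return new ReplaySubject with time limit

85

*/

86

public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);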

87

88

/**

89

* Create a time and size bounded ReplaySubject

90

* @param maxAge maximum age of items to replay

91

* @param unit time unit for maxAge

92

* @param scheduler the Scheduler that provides the current time used for age checks
* @param maxSize maximum number of items to replay

93

* @return new ReplaySubject with both limits

94

*/

95

public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);

96

97

/**

98

* Get all cached values

99

* @return array of all cached values

100

*/

101

public Object[] getValues();

102

103

/**

104

* Get all cached values as specific type

105

* @param array array to fill with values

106

* @return array filled with cached values

107

*/

108

public T[] getValues(T[] array);

109

110

/**

111

* Get count of cached values

112

* @return number of cached values

113

*/

114

public int size();

115

}

116

117

/**

118

* Subject that emits the most recent item (or the default value, if any) to each new subscriber, followed by all subsequent items

119

*/

120

public final class BehaviorSubject<T> extends Subject<T> {

121

/**

122

* Create a BehaviorSubject without initial value

123

* @return new BehaviorSubject

124

*/

125

public static <T> BehaviorSubject<T> create();

126

127

/**

128

* Create a BehaviorSubject with initial value

129

* @param defaultValue the initial value

130

* @return new BehaviorSubject with initial value

131

*/

132

public static <T> BehaviorSubject<T> createDefault(T defaultValue);

133

134

/**

135

* Get the current value if any

136

* @return the current value or null if none

137

*/

138

public T getValue();

139

140

/**

141

* Check if this subject has a current value

142

* @return true if has current value, false otherwise

143

*/

144

public boolean hasValue();

145

}

146

147

/**

148

* Subject that emits only the last item it received, and only after onComplete is called

149

*/

150

public final class AsyncSubject<T> extends Subject<T> {

151

/**

152

* Create a new AsyncSubject

153

* @return new AsyncSubject instance

154

*/

155

public static <T> AsyncSubject<T> create();

156

157

/**

158

* Get the final value if completed successfully

159

* @return the final value or null if none or not completed

160

*/

161

public T getValue();

162

163

/**

164

* Check if this subject has a final value

165

* @return true if has final value, false otherwise

166

*/

167

public boolean hasValue();

168

}

169

```

170

171

### Flowable Processors

172

173

Hot flowables with backpressure support for Reactive Streams.

174

175

```java { .api }

176

/**

177

* Processor that emits to all current subscribers with backpressure

178

*/

179

public final class PublishProcessor<T> extends FlowableProcessor<T> {

180

/**

181

* Create a new PublishProcessor

182

* @return new PublishProcessor instance

183

*/

184

public static <T> PublishProcessor<T> create();

185

186

/**

187

* Emit an item to all subscribers

188

* @param value the item to emit

189

*/

190

public void onNext(T value);

191

192

/**

193

* Emit an error to all subscribers and terminate

194

* @param error the error to emit

195

*/

196

public void onError(Throwable error);

197

198

/**

199

* Complete all subscribers

200

*/

201

public void onComplete();

202

203

/**

204

* Offer an item with backpressure handling

205

* @param value the item to offer

206

* @return true if the item was emitted to all current subscribers, false if at least one subscriber was not ready to receive it (in which case nothing is emitted)

207

*/

208

public boolean offer(T value);

209

210

/**

211

* Check if this processor has subscribers

212

* @return true if has subscribers, false otherwise

213

*/

214

public boolean hasSubscribers();

215

}

216

217

/**

218

* Processor that replays items to new subscribers with backpressure

219

*/

220

public final class ReplayProcessor<T> extends FlowableProcessor<T> {

221

/**

222

* Create an unbounded ReplayProcessor

223

* @return new ReplayProcessor that replays all items

224

*/

225

public static <T> ReplayProcessor<T> create();

226

227

/**

228

* Create a size-bounded ReplayProcessor

229

* @param maxSize maximum number of items to replay

230

* @return new ReplayProcessor with size limit

231

*/

232

public static <T> ReplayProcessor<T> createWithSize(int maxSize);

233

234

/**

235

* Create a time-bounded ReplayProcessor

236

* @param maxAge maximum age of items to replay

237

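* @param unit time unit for maxAge
* @param scheduler the Scheduler that provides the current time used for age checks

238

* @return new ReplayProcessor with time limit

239

*/

240

public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);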

241

242

/**

243

* Get all cached values

244

* @return array of all cached values

245

*/

246

public Object[] getValues();

247

248

/**

249

* Get count of cached values

250

* @return number of cached values

251

*/

252

public int size();

253

}

254

255

/**

256

* Processor that emits the most recent item (or the default value, if any) to each new subscriber, followed by all subsequent items

257

*/

258

public final class BehaviorProcessor<T> extends FlowableProcessor<T> {

259

/**

260

* Create a BehaviorProcessor without initial value

261

* @return new BehaviorProcessor

262

*/

263

public static <T> BehaviorProcessor<T> create();

264

265

/**

266

* Create a BehaviorProcessor with initial value

267

* @param defaultValue the initial value

268

* @return new BehaviorProcessor with initial value

269

*/

270

public static <T> BehaviorProcessor<T> createDefault(T defaultValue);

271

272

/**

273

* Get the current value if any

274

* @return the current value or null if none

275

*/

276

public T getValue();

277

278

/**

279

* Check if this processor has a current value

280

* @return true if has current value, false otherwise

281

*/

282

public boolean hasValue();

283

}

284

285

/**

286

* Processor that emits only the last item it received, and only after onComplete is called

287

*/

288

public final class AsyncProcessor<T> extends FlowableProcessor<T> {

289

/**

290

* Create a new AsyncProcessor

291

* @return new AsyncProcessor instance

292

*/

293

public static <T> AsyncProcessor<T> create();

294

295

/**

296

* Get the final value if completed successfully

297

* @return the final value or null if none or not completed

298

*/

299

public T getValue();

300

301

/**

302

* Check if this processor has a final value

303

* @return true if has final value, false otherwise

304

*/

305

public boolean hasValue();

306

}

307

```

308

309

### Single and Maybe Subjects

310

311

Subjects for single-value reactive types.

312

313

```java { .api }

314

/**

315

* Subject for Single operations

316

*/

317

public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {

318

/**

319

* Create a new SingleSubject

320

* @return new SingleSubject instance

321

*/

322

public static <T> SingleSubject<T> create();

323

324

/**

325

* Emit a success value to all observers

326

* @param value the value to emit

327

*/

328

public void onSuccess(T value);

329

330

/**

331

* Emit an error to all observers

332

* @param error the error to emit

333

*/

334

public void onError(Throwable error);

335

336

/**

337

* Check if this subject has observers

338

* @return true if has observers, false otherwise

339

*/

340

public boolean hasObservers();

341

342

/**

343

* Get the success value if any

344

* @return the success value or null if none

345

*/

346

public T getValue();

347

348

/**

349

* Get the error if any

350

* @return the error or null if none

351

*/

352

public Throwable getThrowable();

353

}

354

355

/**

356

* Subject for Maybe operations

357

*/

358

public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {

359

/**

360

* Create a new MaybeSubject

361

* @return new MaybeSubject instance

362

*/

363

public static <T> MaybeSubject<T> create();

364

365

/**

366

* Emit a success value to all observers

367

* @param value the value to emit

368

*/

369

public void onSuccess(T value);

370

371

/**

372

* Emit an error to all observers

373

* @param error the error to emit

374

*/

375

public void onError(Throwable error);

376

377

/**

378

* Complete all observers without emitting a value

379

*/

380

public void onComplete();

381

382

/**

383

* Check if this subject has observers

384

* @return true if has observers, false otherwise

385

*/

386

public boolean hasObservers();

387

388

/**

389

* Get the success value if any

390

* @return the success value or null if none

391

*/

392

public T getValue();

393

}

394

395

/**

396

* Subject for Completable operations

397

*/

398

public final class CompletableSubject extends Completable implements CompletableObserver {

399

/**

400

* Create a new CompletableSubject

401

* @return new CompletableSubject instance

402

*/

403

public static CompletableSubject create();

404

405

/**

406

* Complete all observers

407

*/

408

public void onComplete();

409

410

/**

411

* Emit an error to all observers

412

* @param error the error to emit

413

*/

414

public void onError(Throwable error);

415

416

/**

417

* Check if this subject has observers

418

* @return true if has observers, false otherwise

419

*/

420

public boolean hasObservers();

421

422

/**

423

* Get the error if any

424

* @return the error or null if none

425

*/

426

public Throwable getThrowable();

427

}

428

```

429

430

## Types

431

432

```java { .api }

433

/**

434

* Base class for all Observable subjects

435

*/

436

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

437

/**

438

* Convert this subject to a serialized version for thread safety

439

* @return serialized version of this subject

440

*/

441

public final Subject<T> toSerialized();

442

443

/**

444

* Check if this subject has observers

445

* @return true if has observers, false otherwise

446

*/

447

public abstract boolean hasObservers();

448

449

/**

450

* Check if this subject has completed normally

451

* @return true if completed, false otherwise

452

*/

453

public abstract boolean hasComplete();

454

455

/**

456

* Check if this subject terminated with an error

457

* @return true if terminated with error, false otherwise

458

*/

459

public abstract boolean hasThrowable();

460

461

/**

462

* Get the terminating error if any

463

* @return the error or null if none

464

*/

465

public abstract Throwable getThrowable();

466

}

467

468

/**

469

* Base class for all Flowable processors

470

*/

471

public abstract class FlowableProcessor<T> extends Flowable<T> implements FlowableSubscriber<T>, Processor<T, T> {

472

/**

473

* Convert this processor to a serialized version for thread safety

474

* @return serialized version of this processor

475

*/

476

public final FlowableProcessor<T> toSerialized();

477

478

/**

479

* Check if this processor has subscribers

480

* @return true if has subscribers, false otherwise

481

*/

482

public abstract boolean hasSubscribers();

483

484

/**

485

* Check if this processor has completed normally

486

* @return true if completed, false otherwise

487

*/

488

public abstract boolean hasComplete();

489

490

/**

491

* Check if this processor terminated with an error

492

* @return true if terminated with error, false otherwise

493

*/

494

public abstract boolean hasThrowable();

495

496

/**

497

* Get the terminating error if any

498

* @return the error or null if none

499

*/

500

public abstract Throwable getThrowable();

501

}

502

```

503

504

**Usage Examples:**

505

506

```java

507

import io.reactivex.rxjava3.subjects.*;

508

import io.reactivex.rxjava3.processors.*;

509

import io.reactivex.rxjava3.core.*;

510

import java.util.concurrent.TimeUnit;

511

512

// PublishSubject - hot multicast

513

PublishSubject<String> publishSubject = PublishSubject.create();

514

515

// Subscribe multiple observers

516

publishSubject.subscribe(value -> System.out.println("Observer 1: " + value));

517

publishSubject.subscribe(value -> System.out.println("Observer 2: " + value));

518

519

// Emit items

520

publishSubject.onNext("Hello");

521

publishSubject.onNext("World");

522

523

// Late subscriber won't receive previous items

524

publishSubject.subscribe(value -> System.out.println("Late Observer: " + value));

525

publishSubject.onNext("Late Item");

526

527

publishSubject.onComplete();

528

529

// BehaviorSubject - remembers last value

530

BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("Initial");

531

532

behaviorSubject.subscribe(value -> System.out.println("Behavior 1: " + value));

533

behaviorSubject.onNext("Update 1");

534

535

// Late subscriber gets the last value

536

behaviorSubject.subscribe(value -> System.out.println("Behavior 2: " + value));

537

behaviorSubject.onNext("Update 2");

538

539

// ReplaySubject - replays all previous items

540

ReplaySubject<String> replaySubject = ReplaySubject.create();

541

542

replaySubject.onNext("Item 1");

543

replaySubject.onNext("Item 2");

544

545

// Late subscriber gets all previous items

546

replaySubject.subscribe(value -> System.out.println("Replay: " + value));

547

replaySubject.onNext("Item 3");

548

549

// AsyncSubject - only emits the last item when completed

550

AsyncSubject<String> asyncSubject = AsyncSubject.create();

551

552

asyncSubject.subscribe(value -> System.out.println("Async: " + value));

553

asyncSubject.onNext("First");

554

asyncSubject.onNext("Second");

555

asyncSubject.onNext("Last");

556

asyncSubject.onComplete(); // Only "Last" is emitted

557

558

// SingleSubject for single-value operations

559

SingleSubject<String> singleSubject = SingleSubject.create();

560

561

singleSubject.subscribe(System.out::println);

562

singleSubject.onSuccess("Single Value");

563

564

// MaybeSubject for optional values

565

MaybeSubject<String> maybeSubject = MaybeSubject.create();

566

567

maybeSubject.subscribe(

568

value -> System.out.println("Maybe success: " + value),

569

error -> System.err.println("Maybe error: " + error),

570

() -> System.out.println("Maybe complete (empty)")

571

);

572

573

// Can emit value or complete empty

574

if (Math.random() > 0.5) {

575

maybeSubject.onSuccess("Maybe Value");

576

} else {

577

maybeSubject.onComplete();

578

}

579

580

// CompletableSubject for completion-only operations

581

CompletableSubject completableSubject = CompletableSubject.create();

582

583

completableSubject.subscribe(

584

() -> System.out.println("Completable completed"),

585

error -> System.err.println("Completable error: " + error)

586

);

587

588

completableSubject.onComplete();

589

590

// Processors for backpressured streams

591

PublishProcessor<Integer> publishProcessor = PublishProcessor.create();

592

593

publishProcessor.subscribe(value -> System.out.println("Processor: " + value));

594

595

// Emit with backpressure handling

596

for (int i = 0; i < 5; i++) {

597

if (publishProcessor.offer(i)) {

598

System.out.println("Offered: " + i);

599

} else {

600

System.out.println("Backpressure: could not offer " + i);

601

}

602

}

603

604

publishProcessor.onComplete();

605

606

// Thread-safe serialized subjects

607

Subject<String> serializedSubject = PublishSubject.<String>create().toSerialized();

608

609

// Safe to call from multiple threads

610

new Thread(() -> serializedSubject.onNext("Thread 1")).start();

611

new Thread(() -> serializedSubject.onNext("Thread 2")).start();

612

613

// Bridge between imperative and reactive code

614

class EventBus {

615

private final PublishSubject<String> eventSubject = PublishSubject.create();

616

617

public Observable<String> getEvents() {

618

return eventSubject;

619

}

620

621

public void publishEvent(String event) {

622

eventSubject.onNext(event);

623

}

624

}

625

626

EventBus eventBus = new EventBus();

627

eventBus.getEvents().subscribe(event -> System.out.println("Event: " + event));

628

eventBus.publishEvent("User logged in");

629

eventBus.publishEvent("Data updated");

630

631

// State management with BehaviorSubject

632

class StateManager<T> {

633

private final BehaviorSubject<T> stateSubject;

634

635

public StateManager(T initialState) {

636

this.stateSubject = BehaviorSubject.createDefault(initialState);

637

}

638

639

public Observable<T> getState() {

640

return stateSubject.distinctUntilChanged();

641

}

642

643

public T getCurrentState() {

644

return stateSubject.getValue();

645

}

646

647

public void setState(T newState) {

648

stateSubject.onNext(newState);

649

}

650

}

651

652

StateManager<String> stateManager = new StateManager<>("Initial State");

653

stateManager.getState().subscribe(state -> System.out.println("State: " + state));

654

stateManager.setState("Updated State");

655

stateManager.setState("Final State");

656

```

657

658

## Subject Selection Guide

659

660

### When to Use Each Subject Type

661

662

- **`PublishSubject`**: Event bus, real-time notifications, hot observables

663

- **`BehaviorSubject`**: State management, current value access, configuration settings

664

- **`ReplaySubject`**: Caching, message replay, audit trails

665

- **`AsyncSubject`**: Final result computation, completion notifications

666

- **`SingleSubject`**: Single async operations, request/response patterns

667

- **`MaybeSubject`**: Optional async operations, cache lookups

668

- **`CompletableSubject`**: Fire-and-forget operations, cleanup tasks

669

670

### Best Practices

671

672

1. **Use `toSerialized()`** when `onNext`, `onError`, or `onComplete` may be called on a subject from multiple threads

673

2. **Complete subjects** to free resources and notify subscribers

674

3. **Handle errors appropriately** as they terminate the subject

675

4. **Prefer BehaviorSubject** for state that needs current value access

676

5. **Use ReplaySubject with bounds** (size and/or time) to prevent unbounded growth of the replay buffer

677

6. **Consider processors** for backpressured scenarios

678

7. **Dispose of subscriptions** to prevent memory leaks in long-lived subjects