or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.md disposables.md error-handling.md flowable.md index.md maybe.md observable.md schedulers.md single.md subjects.md

subjects.mddocs/

0

# Subjects and Hot Observables

1

2

Subjects are hot observables that multicast to multiple observers and emit items regardless of whether anyone is subscribed. A Subject acts as both an Observable and an Observer, which makes it well suited to bridging reactive and non-reactive code.

3

4

## Capabilities

5

6

### Subject Base Class

7

8

All subjects extend the base Subject class.

9

10

```java { .api }

11

/**

12

* Base class for all subjects that act as both Observable and Observer

13

*/

14

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

15

/**

16

* Returns true if this subject has any observers

17

*/

18

public abstract boolean hasObservers();

19

20

/**

21

* Returns true if this subject has terminated with an error

22

*/

23

public abstract boolean hasThrowable();

24

25

/**

26

* Returns the terminal error if hasThrowable() returns true

27

*/

28

public abstract Throwable getThrowable();

29

30

/**

31

* Returns true if this subject has completed successfully

32

*/

33

public abstract boolean hasComplete();

34

35

/**

36

* Converts this subject to a serialized version (thread-safe)

37

*/

38

public final Subject<T> toSerialized();

39

}

40

```

41

42

### PublishSubject

43

44

Multicasts items to current observers only.

45

46

```java { .api }

47

/**

48

* Subject that multicasts items to currently subscribed observers

49

* Does not replay any items to new subscribers

50

*/

51

public final class PublishSubject<T> extends Subject<T> {

52

/**

53

* Creates a new PublishSubject

54

*/

55

public static <T> PublishSubject<T> create();

56

57

/**

58

* Returns the number of current observers

59

*/

60

public int observerCount();

61

62

// Inherits Observer methods

63

public void onSubscribe(Disposable d);

64

public void onNext(T t);

65

public void onError(Throwable t);

66

public void onComplete();

67

}

68

```

69

70

### BehaviorSubject

71

72

Replays the latest value to new subscribers.

73

74

```java { .api }

75

/**

76

* Subject that replays the latest emitted item to new subscribers

77

* Has a current value when created with a default, or once the first item has been emitted

78

*/

79

public final class BehaviorSubject<T> extends Subject<T> {

80

/**

81

* Creates a BehaviorSubject with an initial value

82

*/

83

public static <T> BehaviorSubject<T> createDefault(T defaultValue);

84

85

/**

86

* Creates a BehaviorSubject without an initial value

87

*/

88

public static <T> BehaviorSubject<T> create();

89

90

/**

91

* Returns the current value, or null if no value is available

92

*/

93

public T getValue();

94

95

/**

96

* Returns true if this subject has a current value

97

*/

98

public boolean hasValue();

99

100

/**

101

* Returns the number of current observers

102

*/

103

public int observerCount();

104

}

105

```

106

107

### ReplaySubject

108

109

Replays all or a subset of emitted items to new subscribers.

110

111

```java { .api }

112

/**

113

* Subject that replays emitted items to new subscribers

114

* Can buffer all items or limit by count/time

115

*/

116

public final class ReplaySubject<T> extends Subject<T> {

117

/**

118

* Creates a ReplaySubject that buffers all items

119

*/

120

public static <T> ReplaySubject<T> create();

121

122

/**

123

* Creates a ReplaySubject with a maximum buffer size

124

*/

125

public static <T> ReplaySubject<T> create(int bufferSize);

126

127

/**

128

* Creates a ReplaySubject that buffers items for a time window

129

*/

130

public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);

131

132

/**

133

* Creates a ReplaySubject with both size and time limits

134

*/

135

public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);

136

137

/**

138

* Returns the current number of buffered items

139

*/

140

public int size();

141

142

/**

143

* Returns the current buffered values as an array

144

*/

145

public Object[] getValues();

146

147

/**

148

* Returns the current buffered values as a typed array

149

*/

150

public T[] getValues(T[] array);

151

152

/**

153

* Returns the number of current observers

154

*/

155

public int observerCount();

156

}

157

```

158

159

### AsyncSubject

160

161

Replays only the final value to subscribers.

162

163

```java { .api }

164

/**

165

* Subject that only emits the last value when it completes

166

* Emits no value if it terminates with an error; observers receive only the error

167

*/

168

public final class AsyncSubject<T> extends Subject<T> {

169

/**

170

* Creates a new AsyncSubject

171

*/

172

public static <T> AsyncSubject<T> create();

173

174

/**

175

* Returns the final value if the subject has completed successfully with a value, or null otherwise

176

*/

177

public T getValue();

178

179

/**

180

* Returns true if this subject has a final value

181

*/

182

public boolean hasValue();

183

184

/**

185

* Returns the number of current observers

186

*/

187

public int observerCount();

188

}

189

```

190

191

### UnicastSubject

192

193

Single-observer subject with optional buffering.

194

195

```java { .api }

196

/**

197

* Subject that allows only one observer and buffers items until subscription

198

*/

199

public final class UnicastSubject<T> extends Subject<T> {

200

/**

201

* Creates a UnicastSubject with unlimited buffering

202

*/

203

public static <T> UnicastSubject<T> create();

204

205

/**

206

* Creates a UnicastSubject with a capacity hint and cleanup callback

207

*/

208

public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate);

209

210

/**

211

* Creates a UnicastSubject with cleanup callback and delay error flag

212

*/

213

public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError);

214

}

215

```

216

217

### Single, Maybe, and Completable Subjects

218

219

Subjects for other reactive types.

220

221

```java { .api }

222

/**

223

* Subject for Single reactive type

224

*/

225

public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {

226

public static <T> SingleSubject<T> create();

227

public boolean hasObservers();

228

public boolean hasValue();

229

public boolean hasThrowable();

230

public T getValue();

231

public Throwable getThrowable();

232

public void onSubscribe(Disposable d);

233

public void onSuccess(T value);

234

public void onError(Throwable e);

235

}

236

237

/**

238

* Subject for Maybe reactive type

239

*/

240

public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {

241

public static <T> MaybeSubject<T> create();

242

public boolean hasObservers();

243

public boolean hasValue();

244

public boolean hasComplete();

245

public boolean hasThrowable();

246

public T getValue();

247

public Throwable getThrowable();

248

public void onSubscribe(Disposable d);

249

public void onSuccess(T value);

250

public void onError(Throwable e);

251

public void onComplete();

252

}

253

254

/**

255

* Subject for Completable reactive type

256

*/

257

public final class CompletableSubject extends Completable implements CompletableObserver {

258

public static CompletableSubject create();

259

public boolean hasObservers();

260

public boolean hasComplete();

261

public boolean hasThrowable();

262

public Throwable getThrowable();

263

public void onSubscribe(Disposable d);

264

public void onComplete();

265

public void onError(Throwable e);

266

}

267

```

268

269

## Usage Examples

270

271

**PublishSubject - Live Event Broadcasting:**

272

273

```java

274

import io.reactivex.subjects.PublishSubject;

275

276

PublishSubject<String> eventBus = PublishSubject.create();

277

278

// First subscriber

279

eventBus.subscribe(event -> System.out.println("Subscriber 1: " + event));

280

281

// Emit events

282

eventBus.onNext("Event 1");

283

eventBus.onNext("Event 2");

284

285

// Second subscriber (won't receive previous events)

286

eventBus.subscribe(event -> System.out.println("Subscriber 2: " + event));

287

288

eventBus.onNext("Event 3"); // Both subscribers receive this

289

290

// Clean termination

291

eventBus.onComplete();

292

```

293

294

**BehaviorSubject - Current State Management:**

295

296

```java

297

import io.reactivex.subjects.BehaviorSubject;

298

299

// User state management

300

BehaviorSubject<String> userState = BehaviorSubject.createDefault("logged_out");

301

302

// UI component subscribes

303

userState.subscribe(state -> System.out.println("UI: User is " + state));

304

305

// State changes

306

userState.onNext("logging_in");

307

userState.onNext("logged_in");

308

309

// New component subscribes and immediately gets current state

310

userState.subscribe(state -> System.out.println("New Component: User is " + state));

311

312

// Check current state

313

if (userState.hasValue()) {

314

System.out.println("Current state: " + userState.getValue());

315

}

316

```

317

318

**ReplaySubject - Event History:**

319

320

```java

321

import io.reactivex.subjects.ReplaySubject;

322

323

// Replay last 3 events

324

ReplaySubject<String> history = ReplaySubject.create(3);

325

326

// Emit some events

327

history.onNext("Event 1");

328

history.onNext("Event 2");

329

history.onNext("Event 3");

330

history.onNext("Event 4");

331

history.onNext("Event 5");

332

333

// New subscriber gets last 3 events

334

history.subscribe(event -> System.out.println("Late subscriber: " + event));

335

// Output: Event 3, Event 4, Event 5

336

337

// Time-based replay

338

ReplaySubject<String> timeHistory = ReplaySubject.createWithTime(

339

2, TimeUnit.SECONDS, Schedulers.computation());

340

341

timeHistory.onNext("Old event");

342

Thread.sleep(3000);

343

timeHistory.onNext("Recent event");

344

345

// New subscriber only gets recent event

346

timeHistory.subscribe(event -> System.out.println("Time subscriber: " + event));

347

```

348

349

**AsyncSubject - Final Result:**

350

351

```java

352

import io.reactivex.subjects.AsyncSubject;

353

354

AsyncSubject<String> calculation = AsyncSubject.create();

355

356

// Subscribers only get the final result

357

calculation.subscribe(result -> System.out.println("Result 1: " + result));

358

359

// Emit intermediate values (not received by observers)

360

calculation.onNext("Step 1");

361

calculation.onNext("Step 2");

362

calculation.onNext("Final Result");

363

364

// Must complete for observers to receive the final value

365

calculation.onComplete();

366

367

// Late subscriber still gets the final result

368

calculation.subscribe(result -> System.out.println("Result 2: " + result));

369

```

370

371

**UnicastSubject - Single Observer with Buffering:**

372

373

```java

374

import io.reactivex.subjects.UnicastSubject;

375

376

UnicastSubject<Integer> unicast = UnicastSubject.create();

377

378

// Emit items before subscription (they get buffered)

379

unicast.onNext(1);

380

unicast.onNext(2);

381

unicast.onNext(3);

382

383

// First subscriber gets all buffered items

384

unicast.subscribe(value -> System.out.println("Unicast: " + value));

385

386

// Subsequent items delivered immediately

387

unicast.onNext(4);

388

unicast.onNext(5);

389

390

// Only one observer allowed - second subscription will error

391

// unicast.subscribe(value -> System.out.println("Second: " + value)); // IllegalStateException

392

```

393

394

**SingleSubject - Async Result:**

395

396

```java

397

import io.reactivex.subjects.SingleSubject;

398

399

SingleSubject<String> asyncResult = SingleSubject.create();

400

401

// Multiple subscribers can wait for the same result

402

asyncResult.subscribe(result -> System.out.println("Observer 1: " + result));

403

asyncResult.subscribe(result -> System.out.println("Observer 2: " + result));

404

405

// Simulate async operation

406

new Thread(() -> {

407

try {

408

Thread.sleep(2000);

409

asyncResult.onSuccess("Async operation completed");

410

} catch (Exception e) {

411

asyncResult.onError(e);

412

}

413

}).start();

414

```

415

416

**Thread Safety with Serialized Subjects:**

417

418

```java

419

PublishSubject<String> unsafeSubject = PublishSubject.create();

420

Subject<String> safeSubject = unsafeSubject.toSerialized();

421

422

// Multiple threads can safely emit to a serialized subject (note: items emitted before the subscribe call below are not delivered)

423

for (int i = 0; i < 10; i++) {

424

final int threadId = i;

425

new Thread(() -> {

426

for (int j = 0; j < 100; j++) {

427

safeSubject.onNext("Thread " + threadId + ", Item " + j);

428

}

429

}).start();

430

}

431

432

safeSubject.subscribe(item -> System.out.println("Received: " + item));

433

```

434

435

**Error Handling with Subjects:**

436

437

```java

438

PublishSubject<String> subject = PublishSubject.create();

439

440

subject.subscribe(

441

item -> System.out.println("Item: " + item),

442

error -> System.err.println("Error: " + error.getMessage()),

443

() -> System.out.println("Completed")

444

);

445

446

subject.onNext("Item 1");

447

subject.onNext("Item 2");

448

449

// Error terminates the subject

450

subject.onError(new RuntimeException("Something went wrong"));

451

452

// No more items can be emitted after error

453

// subject.onNext("Item 3"); // This would be ignored

454

```

455

456

**Bridging Callback APIs with Subjects:**

457

458

```java

459

// Bridge traditional callback API to reactive streams

460

public class WeatherService {

461

private final PublishSubject<Weather> weatherUpdates = PublishSubject.create();

462

463

public Observable<Weather> getWeatherUpdates() {

464

return weatherUpdates.hide(); // Hide subject implementation

465

}

466

467

// Called by external weather API

468

public void onWeatherUpdate(Weather weather) {

469

weatherUpdates.onNext(weather);

470

}

471

472

public void onWeatherError(Exception error) {

473

weatherUpdates.onError(error);

474

}

475

}

476

477

// Usage

478

WeatherService service = new WeatherService();

479

service.getWeatherUpdates()

480

.subscribe(weather -> System.out.println("Weather: " + weather));

481

```

482

483

## Subject Guidelines

484

485

**When to use each subject:**

486

487

- **PublishSubject**: Event buses, live data streams, notifications

488

- **BehaviorSubject**: State management, current values, configuration

489

- **ReplaySubject**: Event history, audit logs, caching recent data

490

- **AsyncSubject**: Final results, completion notifications

491

- **UnicastSubject**: Single consumer scenarios, buffering items until the consumer subscribes (use `UnicastProcessor` when backpressure is required)

492

493

**Best Practices:**

494

495

1. Always use `toSerialized()` when multiple threads emit to a subject

496

2. Prefer `hide()` to expose a subject to consumers as a plain Observable

497

3. Handle termination properly (onComplete/onError)

498

4. Be careful with memory leaks in ReplaySubject

499

5. Consider using Processors for backpressure-aware subjects

500

6. Don't emit to subjects after they've terminated

501

502

## Types

503

504

```java { .api }

505

/**

506

* Exception signaled through onError when a second observer subscribes to a UnicastSubject

507

*/

508

public class IllegalStateException extends RuntimeException {

509

// Standard exception

510

}

511

512

/**

513

* Base interfaces for all subjects

514

*/

515

public interface ObservableSource<T> {

516

void subscribe(Observer<? super T> observer);

517

}

518

519

public interface Observer<T> {

520

void onSubscribe(Disposable d);

521

void onNext(T t);

522

void onError(Throwable e);

523

void onComplete();

524

}

525

```