or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.mddisposables.mderror-handling.mdflowable.mdindex.mdmaybe.mdobservable.mdschedulers.mdsingle.mdsubjects.md

observable.mddocs/

0

# Observable Streams

1

2

Cold observable sequences for 0-N items without backpressure support. Observable is the most commonly used reactive type in RxJava, ideal for UI events, HTTP requests, and general reactive programming patterns.

3

4

## Capabilities

5

6

### Observable Creation

7

8

Factory methods for creating Observable instances from various sources.

9

10

```java { .api }

11

/**

12

* Creates an Observable that emits the provided items then completes

13

*/

14

public static <T> Observable<T> just(T item);

15

public static <T> Observable<T> just(T item1, T item2);

16

public static <T> Observable<T> just(T item1, T item2, T item3);

17

// ... up to 10 items

18

19

/**

20

* Creates an Observable that emits all items from an array

21

*/

22

public static <T> Observable<T> fromArray(T... array);

23

24

/**

25

* Creates an Observable that emits all items from an Iterable

26

*/

27

public static <T> Observable<T> fromIterable(Iterable<? extends T> source);

28

29

/**

30

* Creates an Observable from a Callable that will be called for each observer

31

*/

32

public static <T> Observable<T> fromCallable(Callable<? extends T> callable);

33

34

/**

35

* Creates an Observable from a Future

36

*/

37

public static <T> Observable<T> fromFuture(Future<? extends T> future);

38

39

/**

40

* Creates an Observable using the provided ObservableOnSubscribe function

41

*/

42

public static <T> Observable<T> create(ObservableOnSubscribe<T> source);

43

44

/**

45

* Creates an Observable that emits sequential numbers every specified interval

46

*/

47

public static Observable<Long> interval(long period, TimeUnit unit);

48

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);

49

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler);

50

51

/**

52

* Creates an Observable that emits a range of sequential integers

53

*/

54

public static Observable<Integer> range(int start, int count);

55

56

/**

57

* Creates an Observable that emits a single 0L after a delay

58

*/

59

public static Observable<Long> timer(long delay, TimeUnit unit);

60

public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler);

61

62

/**

63

* Creates an empty Observable that only calls onComplete

64

*/

65

public static <T> Observable<T> empty();

66

67

/**

68

* Creates an Observable that never emits any items and never terminates

69

*/

70

public static <T> Observable<T> never();

71

72

/**

73

* Creates an Observable that only calls onError

74

*/

75

public static <T> Observable<T> error(Throwable exception);

76

public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier);

77

78

/**

79

* Defers the creation of the Observable until subscription

80

*/

81

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> observableSupplier);

82

```

83

84

### Transformation Operators

85

86

Transform items emitted by an Observable.

87

88

```java { .api }

89

/**

90

* Transforms items by applying a function to each item

91

*/

92

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);

93

94

/**

95

* Transforms items into Observables and flattens them into a single Observable

96

*/

97

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

98

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency);

99

100

/**

101

* Similar to flatMap but maintains the order of the original items

102

*/

103

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

104

105

/**

106

* Similar to flatMap but only subscribes to the most recent inner Observable

107

*/

108

public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

109

110

/**

111

* Casts each item to the specified type

112

*/

113

public final <U> Observable<U> cast(Class<U> clazz);

114

115

/**

116

* Applies a function to each item and emits the result

117

*/

118

public final <R> Observable<R> scan(BiFunction<R, ? super T, R> accumulator);

119

public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator);

120

121

/**

122

* Groups items by a key selector function

123

*/

124

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector);

125

126

/**

127

* Collects items into buffers

128

*/

129

public final Observable<List<T>> buffer(int count);

130

public final Observable<List<T>> buffer(long timespan, TimeUnit unit);

131

public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler);

132

133

/**

134

* Creates non-overlapping windows of items

135

*/

136

public final Observable<Observable<T>> window(int count);

137

public final Observable<Observable<T>> window(long timespan, TimeUnit unit);

138

```

139

140

### Filtering Operators

141

142

Filter items emitted by an Observable.

143

144

```java { .api }

145

/**

146

* Filters items based on a predicate

147

*/

148

public final Observable<T> filter(Predicate<? super T> predicate);

149

150

/**

151

* Emits only the first n items

152

*/

153

public final Observable<T> take(long count);

154

155

/**

156

* Emits items for a specified duration

157

*/

158

public final Observable<T> take(long time, TimeUnit unit);

159

160

/**

161

* Skips the first n items

162

*/

163

public final Observable<T> skip(long count);

164

165

/**

166

* Skips items for a specified duration

167

*/

168

public final Observable<T> skip(long time, TimeUnit unit);

169

170

/**

171

* Emits only distinct items

172

*/

173

public final Observable<T> distinct();

174

public final <K> Observable<T> distinct(Function<? super T, K> keySelector);

175

176

/**

177

* Emits only items that are different from the previous item

178

*/

179

public final Observable<T> distinctUntilChanged();

180

public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector);

181

182

/**

183

* Emits only the first item that matches a predicate

184

*/

185

public final Observable<T> takeWhile(Predicate<? super T> predicate);

186

187

/**

188

* Skips items while a predicate is true

189

*/

190

public final Observable<T> skipWhile(Predicate<? super T> predicate);

191

192

/**

193

* Emits only the first item, or throws NoSuchElementException if empty

194

*/

195

public final Observable<T> first(T defaultItem);

196

197

/**

198

* Emits only the last item

199

*/

200

public final Observable<T> last(T defaultItem);

201

202

/**

203

* Emits only the single item, or throws exception if more than one

204

*/

205

public final Observable<T> single(T defaultItem);

206

207

/**

208

* Ignores all items and only emits completion

209

*/

210

public final Completable ignoreElements();

211

```

212

213

### Combining Operators

214

215

Combine multiple Observables.

216

217

```java { .api }

218

/**

219

* Combines two Observables by emitting an item when either emits

220

*/

221

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);

222

public static <T> Observable<T> merge(ObservableSource<? extends ObservableSource<? extends T>> sources);

223

224

/**

225

* Concatenates Observables sequentially

226

*/

227

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);

228

public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources);

229

230

/**

231

* Combines latest values from multiple Observables

232

*/

233

public static <T1, T2, R> Observable<R> combineLatest(

234

ObservableSource<T1> source1,

235

ObservableSource<T2> source2,

236

BiFunction<? super T1, ? super T2, ? extends R> combiner

237

);

238

239

/**

240

* Zips items from multiple Observables together

241

*/

242

public static <T1, T2, R> Observable<R> zip(

243

ObservableSource<T1> source1,

244

ObservableSource<T2> source2,

245

BiFunction<? super T1, ? super T2, ? extends R> zipper

246

);

247

248

/**

249

* Returns the first Observable to emit or terminate

250

*/

251

public static <T> Observable<T> amb(ObservableSource<? extends T>... sources);

252

253

/**

254

* Prepends items to the beginning of an Observable

255

*/

256

public final Observable<T> startWith(T item);

257

public final Observable<T> startWith(T... items);

258

public final Observable<T> startWithArray(T... items);

259

260

/**

261

* Appends items to the end of an Observable

262

*/

263

public final Observable<T> concatWith(ObservableSource<? extends T> other);

264

```

265

266

### Threading Operators

267

268

Control threading and execution context.

269

270

```java { .api }

271

/**

272

* Specifies the Scheduler on which the source Observable will operate

273

*/

274

public final Observable<T> subscribeOn(Scheduler scheduler);

275

276

/**

277

* Specifies the Scheduler on which observers will be notified

278

*/

279

public final Observable<T> observeOn(Scheduler scheduler);

280

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError);

281

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);

282

```

283

284

### Side-Effect Operators

285

286

Perform side effects without modifying the stream.

287

288

```java { .api }

289

/**

290

* Invokes an action for each item emitted

291

*/

292

public final Observable<T> doOnNext(Consumer<? super T> onNext);

293

294

/**

295

* Invokes an action when an error is emitted

296

*/

297

public final Observable<T> doOnError(Consumer<? super Throwable> onError);

298

299

/**

300

* Invokes an action when the Observable completes normally

301

*/

302

public final Observable<T> doOnComplete(Action onComplete);

303

304

/**

305

* Invokes an action when a subscription occurs

306

*/

307

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);

308

309

/**

310

* Invokes an action when the Observable terminates (completes or errors)

311

*/

312

public final Observable<T> doOnTerminate(Action onTerminate);

313

314

/**

315

* Invokes an action after the Observable terminates

316

*/

317

public final Observable<T> doAfterTerminate(Action onFinally);

318

319

/**

320

* Invokes an action when the subscription is disposed

321

*/

322

public final Observable<T> doOnDispose(Action onDispose);

323

```

324

325

### Temporal Operators

326

327

Control timing and temporal behavior of streams.

328

329

```java { .api }

330

/**

331

* Only emit items if no other item was emitted within a time window

332

*/

333

public final Observable<T> debounce(long timeout, TimeUnit unit);

334

public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler);

335

336

/**

337

* Emit only the first item in each time window

338

*/

339

public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit);

340

public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler);

341

342

/**

343

* Emit only the last item in each time window

344

*/

345

public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit);

346

public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler);

347

348

/**

349

* Shift emissions forward in time by a specified delay

350

*/

351

public final Observable<T> delay(long delay, TimeUnit unit);

352

public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler);

353

354

/**

355

* Sample items periodically

356

*/

357

public final Observable<T> sample(long period, TimeUnit unit);

358

public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler);

359

```

360

361

### Utility Operators

362

363

Utility operations for various purposes.

364

365

```java { .api }

366

/**

367

* Mirror source Observable, but terminate with TimeoutException if no item is emitted within timeout

368

*/

369

public final Observable<T> timeout(long timeout, TimeUnit timeUnit);

370

public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler);

371

public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> fallback);

372

373

/**

374

* Cache emissions for replay to all future subscribers

375

*/

376

public final Observable<T> cache();

377

public final Observable<T> cache(int capacityHint);

378

379

/**

380

* Convert notifications into Notification objects

381

*/

382

public final Observable<Notification<T>> materialize();

383

384

/**

385

* Emit the specified value if source is empty

386

*/

387

public final Observable<T> defaultIfEmpty(T defaultItem);

388

389

/**

390

* Count the number of items emitted

391

*/

392

public final Single<Long> count();

393

394

/**

395

* Emit source items and a notification when source completes

396

*/

397

public final Observable<T> doFinally(Action onFinally);

398

```

399

400

### Subscription and Consumption

401

402

Subscribe to an Observable and consume emitted items.

403

404

```java { .api }

405

/**

406

* Subscribes with separate callbacks for each event type

407

*/

408

public final Disposable subscribe();

409

public final Disposable subscribe(Consumer<? super T> onNext);

410

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

411

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);

412

413

/**

414

* Subscribes with an Observer

415

*/

416

public final void subscribe(Observer<? super T> observer);

417

418

/**

419

* Blocking operations - use with caution

420

*/

421

public final T blockingFirst();

422

public final T blockingFirst(T defaultItem);

423

public final T blockingLast();

424

public final T blockingLast(T defaultItem);

425

public final T blockingSingle();

426

public final T blockingSingle(T defaultItem);

427

public final Iterable<T> blockingIterable();

428

public final void blockingSubscribe(Consumer<? super T> onNext);

429

```

430

431

## Usage Examples

432

433

**Basic Observable Creation and Subscription:**

434

435

```java

436

import io.reactivex.Observable;

437

import io.reactivex.disposables.Disposable;

438

439

// Simple Observable

440

Observable<String> source = Observable.just("Hello", "World");

441

442

Disposable disposable = source.subscribe(

443

item -> System.out.println("Item: " + item),

444

error -> System.err.println("Error: " + error),

445

() -> System.out.println("Complete")

446

);

447

448

// Remember to dispose when done

449

disposable.dispose();

450

```

451

452

**Transformation Chain:**

453

454

```java

455

Observable.fromArray(1, 2, 3, 4, 5)

456

.filter(x -> x % 2 == 0) // Keep even numbers

457

.map(x -> x * x) // Square them

458

.subscribe(result -> System.out.println("Result: " + result));

459

```

460

461

**Async Operations with Threading:**

462

463

```java

464

import io.reactivex.schedulers.Schedulers;

465

466

Observable<String> asyncSource = Observable.fromCallable(() -> {

467

// Simulate expensive operation

468

Thread.sleep(1000);

469

return "Async Result";

470

}).subscribeOn(Schedulers.io()) // Execute on IO thread

471

.observeOn(Schedulers.computation()); // Observe on computation thread

472

473

asyncSource.subscribe(

474

result -> System.out.println("Got: " + result),

475

error -> error.printStackTrace()

476

);

477

```

478

479

**Creating Custom Observable:**

480

481

```java

482

Observable<Integer> customObservable = Observable.create(emitter -> {

483

try {

484

for (int i = 1; i <= 5; i++) {

485

if (emitter.isDisposed()) {

486

return;

487

}

488

emitter.onNext(i);

489

}

490

emitter.onComplete();

491

} catch (Exception e) {

492

emitter.onError(e);

493

}

494

});

495

```

496

497

**Combining Multiple Observables:**

498

499

```java

500

Observable<String> obs1 = Observable.just("A", "B");

501

Observable<String> obs2 = Observable.just("1", "2");

502

503

Observable.zip(obs1, obs2, (s1, s2) -> s1 + s2)

504

.subscribe(combined -> System.out.println(combined)); // A1, B2

505

```

506

507

**Side Effects and Debugging:**

508

509

```java

510

Observable.fromArray(1, 2, 3, 4, 5)

511

.doOnNext(item -> System.out.println("Processing: " + item))

512

.filter(x -> x % 2 == 0)

513

.doOnNext(item -> System.out.println("After filter: " + item))

514

.map(x -> x * x)

515

.doOnComplete(() -> System.out.println("Stream completed"))

516

.subscribe(result -> System.out.println("Final result: " + result));

517

```

518

519

**Temporal Operations:**

520

521

```java

522

import java.util.concurrent.TimeUnit;

523

524

// Debounce - only emit if 300ms pass without another emission

525

Observable.interval(100, TimeUnit.MILLISECONDS)

526

.take(10)

527

.debounce(300, TimeUnit.MILLISECONDS)

528

.subscribe(item -> System.out.println("Debounced: " + item));

529

530

// Delay emissions by 1 second

531

Observable.just("Delayed", "Message")

532

.delay(1, TimeUnit.SECONDS)

533

.subscribe(item -> System.out.println("Got: " + item));

534

```

535

536

**Utility Operations:**

537

538

```java

539

// Timeout and fallback

540

Observable.timer(2, TimeUnit.SECONDS)

541

.timeout(1, TimeUnit.SECONDS, Observable.just("Fallback"))

542

.subscribe(

543

result -> System.out.println("Result: " + result),

544

error -> System.out.println("Timeout occurred")

545

);

546

547

// Cache for replay

548

Observable<String> cached = Observable.fromCallable(() -> {

549

System.out.println("Expensive operation executed");

550

return "Expensive Result";

551

}).cache();

552

553

// Multiple subscriptions will reuse cached result

554

cached.subscribe(result -> System.out.println("First: " + result));

555

cached.subscribe(result -> System.out.println("Second: " + result));

556

```