or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.mddisposables.mdflowable.mdindex.mdmaybe.mdobservable.mdschedulers.mdsingle.mdsubjects.md

observable.mddocs/

0

# Observable API

1

2

Observable is RxJava's implementation of the non-backpressured reactive stream pattern, designed for handling sequences of 0 to N items. It provides comprehensive operator support for transformation, filtering, combination, and error handling without built-in flow control.

3

4

## Capabilities

5

6

### Observable Creation

7

8

Factory methods for creating Observable instances from various sources.

9

10

```java { .api }

11

// Required imports:

12

// import java.util.concurrent.Callable;

13

// import java.util.concurrent.Future;

14

// import java.util.concurrent.TimeUnit;

15

// import java.util.function.Consumer;

16

// import java.util.function.BiConsumer;

17

// import java.util.function.Supplier;

18

// import org.reactivestreams.Publisher;

19

/**

20

* Creates an Observable that emits a single item

21

* @param item the item to emit

22

* @return Observable that emits the single item

23

*/

24

public static <T> Observable<T> just(T item);

25

26

/**

27

* Creates an Observable that emits two items

28

* @param item1 first item to emit

29

* @param item2 second item to emit

30

* @return Observable that emits the two items

31

*/

32

public static <T> Observable<T> just(T item1, T item2);

33

34

/**

35

* Creates an Observable that emits three items

36

* @param item1 first item to emit

37

* @param item2 second item to emit

38

* @param item3 third item to emit

39

* @return Observable that emits the three items

40

*/

41

public static <T> Observable<T> just(T item1, T item2, T item3);

42

43

/**

44

* Creates an Observable that emits up to ten items

45

* @param items the items to emit (up to 10 items supported)

46

* @return Observable that emits all provided items

47

*/

48

public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10);

49

50

/**

51

* Creates an Observable from an Iterable source

52

* @param source the Iterable to convert

53

* @return Observable that emits items from the Iterable

54

*/

55

public static <T> Observable<T> fromIterable(Iterable<? extends T> source);

56

57

/**

58

* Creates an Observable from an array

59

* @param array the array to convert

60

* @return Observable that emits items from the array

61

*/

62

public static <T> Observable<T> fromArray(T... array);

63

64

/**

65

* Creates an Observable using a custom emitter function

66

* @param source the ObservableOnSubscribe function

67

* @return Observable created from the custom emitter

68

*/

69

public static <T> Observable<T> create(ObservableOnSubscribe<T> source);

70

71

/**

72

* Creates an Observable that emits sequential integers

73

* @param start the starting value

74

* @param count the number of items to emit

75

* @return Observable emitting integers from start to start+count-1

76

*/

77

public static Observable<Integer> range(int start, int count);

78

79

/**

80

* Creates an Observable that emits at specified intervals

81

* @param period the emission interval

82

* @param unit the time unit

83

* @return Observable emitting sequential longs at intervals

84

*/

85

public static Observable<Long> interval(long period, TimeUnit unit);

86

87

/**

88

* Creates an Observable that emits after a delay

89

* @param delay the delay duration

90

* @param unit the time unit

91

* @return Observable that emits 0L after the delay

92

*/

93

public static Observable<Long> timer(long delay, TimeUnit unit);

94

95

/**

96

* Creates an empty Observable that only calls onComplete

97

* @return Observable that completes immediately

98

*/

99

public static <T> Observable<T> empty();

100

101

/**

102

* Creates an Observable that never emits anything

103

* @return Observable that never calls any observer methods

104

*/

105

public static <T> Observable<T> never();

106

107

/**

108

* Creates an Observable that only calls onError

109

* @param error the error to emit

110

* @return Observable that emits the error

111

*/

112

public static <T> Observable<T> error(Throwable error);

113

114

/**

115

* Creates an Observable from a Callable

116

* @param callable the Callable to invoke for each subscription

117

* @return Observable that emits the result of the Callable

118

*/

119

public static <T> Observable<T> fromCallable(Callable<? extends T> callable);

120

121

/**

122

* Creates an Observable from a CompletableSource

123

* @param completableSource the CompletableSource to convert

124

* @return Observable that completes when the CompletableSource completes

125

*/

126

public static <T> Observable<T> fromCompletable(CompletableSource completableSource);

127

128

/**

129

* Creates an Observable from a Future

130

* @param future the Future to convert

131

* @return Observable that emits the Future result

132

*/

133

public static <T> Observable<T> fromFuture(Future<? extends T> future);

134

135

/**

136

* Creates an Observable from a MaybeSource

137

* @param maybe the MaybeSource to convert

138

* @return Observable that emits the Maybe result or completes

139

*/

140

public static <T> Observable<T> fromMaybe(MaybeSource<T> maybe);

141

142

/**

143

* Creates an Observable from a Publisher (Reactive Streams)

144

* @param publisher the Publisher to convert

145

* @return Observable that emits items from the Publisher

146

*/

147

public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher);

148

149

/**

150

* Creates an Observable from a Runnable

151

* @param run the Runnable to execute

152

* @return Observable that completes after running the Runnable

153

*/

154

public static <T> Observable<T> fromRunnable(Runnable run);

155

156

/**

157

* Creates an Observable from a SingleSource

158

* @param source the SingleSource to convert

159

* @return Observable that emits the Single result

160

*/

161

public static <T> Observable<T> fromSingle(SingleSource<T> source);

162

163

/**

164

* Creates an Observable from a Supplier

165

* @param supplier the Supplier to invoke for each subscription

166

* @return Observable that emits the result of the Supplier

167

*/

168

public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier);

169

170

/**

171

* Creates an Observable that defers creation until subscription

172

* @param supplier function that returns an ObservableSource

173

* @return Observable that creates the actual source on subscription

174

*/

175

public static <T> Observable<T> defer(Supplier<? extends ObservableSource<? extends T>> supplier);

176

177

/**

178

* Creates an Observable that emits sequential long values in a range

179

* @param start the starting value

180

* @param count the number of items to emit

181

* @return Observable emitting longs from start to start+count-1

182

*/

183

public static Observable<Long> rangeLong(long start, long count);

184

185

/**

186

* Creates an Observable that emits at intervals starting after an initial delay

187

* @param initialDelay the initial delay before the first emission

188

* @param period the emission interval

189

* @param unit the time unit

190

* @return Observable emitting sequential longs at intervals

191

*/

192

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);

193

194

/**

195

* Creates an Observable that emits sequential longs over time in a range

196

* @param start the starting value

197

* @param count the number of items to emit

198

* @param initialDelay the initial delay before the first emission

199

* @param period the emission interval

200

* @param unit the time unit

201

* @return Observable emitting sequential longs in range over time

202

*/

203

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit);

204

205

/**

206

* Mirrors the first Observable to emit or complete from multiple sources

207

* @param sources the Iterable of ObservableSource instances

208

* @return Observable that mirrors the first source to emit

209

*/

210

public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);

211

212

/**

213

* Mirrors the first Observable to emit or complete from multiple sources

214

* @param sources the array of ObservableSource instances

215

* @return Observable that mirrors the first source to emit

216

*/

217

public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);

218

219

/**

220

* Generates values on demand using a generator function

221

* @param generator the generator function that emits values

222

* @return Observable that generates values on demand

223

*/

224

public static <T> Observable<T> generate(Consumer<Emitter<T>> generator);

225

226

/**

227

* Generates values on demand with state

228

* @param initialState the initial state supplier

229

* @param generator the generator function with state

230

* @return Observable that generates values with state

231

*/

232

public static <T, S> Observable<T> generate(Supplier<S> initialState, BiConsumer<S, Emitter<T>> generator);

233

234

/**

235

* Compares two Observable sequences for equality

236

* @param source1 the first Observable sequence

237

* @param source2 the second Observable sequence

238

* @return Single that emits true if sequences are equal

239

*/

240

public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);

241

242

/**

243

* Switches to new inner Observables as they arrive

244

* @param sources Observable of Observable sources

245

* @return Observable that switches to the latest inner Observable

246

*/

247

public static <T> Observable<T> switchOnNext(ObservableSource<? extends ObservableSource<? extends T>> sources);

248

249

/**

250

* Creates an Observable with resource management

251

* @param resourceSupplier supplier for the resource

252

* @param sourceSupplier function that creates the Observable from the resource

253

* @param resourceCleanup function to clean up the resource

254

* @return Observable with automatic resource management

255

*/

256

public static <T, D> Observable<T> using(

257

Supplier<? extends D> resourceSupplier,

258

Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,

259

Consumer<? super D> resourceCleanup

260

);

261

262

/**

263

* Wraps an ObservableSource to make it an Observable

264

* @param source the ObservableSource to wrap

265

* @return Observable wrapping the source

266

*/

267

public static <T> Observable<T> wrap(ObservableSource<T> source);

268

```

269

270

### Java 8+ Integration

271

272

Factory methods for integrating with Java 8+ features.

273

274

```java { .api }

275

// Required imports:

276

// import java.util.Optional;

277

// import java.util.concurrent.CompletionStage;

278

// import java.util.stream.Stream;

279

/**

280

* Creates an Observable from an Optional (Java 8+)

281

* @param optional the Optional to convert

282

* @return Observable that emits the Optional value or completes if empty

283

*/

284

public static <T> Observable<T> fromOptional(Optional<T> optional);

285

286

/**

287

* Creates an Observable from a CompletionStage (Java 8+)

288

* @param stage the CompletionStage to convert

289

* @return Observable that emits the CompletionStage result

290

*/

291

public static <T> Observable<T> fromCompletionStage(CompletionStage<T> stage);

292

293

/**

294

* Creates an Observable from a Stream (Java 8+)

295

* @param stream the Stream to convert

296

* @return Observable that emits the Stream elements

297

*/

298

public static <T> Observable<T> fromStream(Stream<T> stream);

299

```

300

301

### Transformation Operators

302

303

Transform emitted items using various mapping functions.

304

305

```java { .api }

306

/**

307

* Transform items using a mapping function

308

* @param mapper function to transform each item

309

* @return Observable with transformed items

310

*/

311

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);

312

313

/**

314

* Transform items to Observables and flatten the results

315

* @param mapper function returning Observable for each item

316

* @return Observable with flattened results

317

*/

318

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

319

320

/**

321

* Transform items to Observables and concatenate them in order

322

* @param mapper function returning Observable for each item

323

* @return Observable with concatenated results in order

324

*/

325

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

326

327

/**

328

* Transform items to Singles and merge the results

329

* @param mapper function returning Single for each item

330

* @return Observable with Single results

331

*/

332

public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper);

333

334

/**

335

* Emit only items that pass a predicate test

336

* @param predicate function to test each item

337

* @return Observable with filtered items

338

*/

339

public final Observable<T> filter(Predicate<? super T> predicate);

340

341

/**

342

* Emit only distinct items compared to previous emissions

343

* @return Observable with distinct consecutive items

344

*/

345

public final Observable<T> distinctUntilChanged();

346

347

/**

348

* Skip the first n items

349

* @param count number of items to skip

350

* @return Observable skipping the first count items

351

*/

352

public final Observable<T> skip(long count);

353

354

/**

355

* Take only the first n items

356

* @param count number of items to take

357

* @return Observable emitting only the first count items

358

*/

359

public final Observable<T> take(long count);

360

```

361

362

### Combination Operators

363

364

Combine multiple Observable sources.

365

366

```java { .api }

367

/**

368

* Merge multiple Observables into one

369

* @param sources array of Observable sources

370

* @return Observable merging all source emissions

371

*/

372

public static <T> Observable<T> merge(ObservableSource<? extends T>... sources);

373

374

/**

375

* Concatenate multiple Observables in sequence

376

* @param sources array of Observable sources

377

* @return Observable concatenating all sources in order

378

*/

379

public static <T> Observable<T> concat(ObservableSource<? extends T>... sources);

380

381

/**

382

* Combine the latest values from multiple sources

383

* @param source1 first Observable source

384

* @param source2 second Observable source

385

* @param combiner function to combine the latest values

386

* @return Observable emitting combined values

387

*/

388

public static <T1, T2, R> Observable<R> combineLatest(

389

ObservableSource<T1> source1,

390

ObservableSource<T2> source2,

391

BiFunction<? super T1, ? super T2, ? extends R> combiner

392

);

393

394

/**

395

* Zip multiple sources together

396

* @param source1 first Observable source

397

* @param source2 second Observable source

398

* @param zipper function to zip values together

399

* @return Observable emitting zipped values

400

*/

401

public static <T1, T2, R> Observable<R> zip(

402

ObservableSource<T1> source1,

403

ObservableSource<T2> source2,

404

BiFunction<? super T1, ? super T2, ? extends R> zipper

405

);

406

407

/**

408

* Start with additional items before the source emissions

409

* @param items items to emit first

410

* @return Observable starting with the specified items

411

*/

412

public final Observable<T> startWith(T... items);

413

414

/**

415

* Concatenate with another Observable

416

* @param other Observable to concatenate after this one

417

* @return Observable concatenating this and other

418

*/

419

public final Observable<T> concatWith(ObservableSource<? extends T> other);

420

```

421

422

### Subscription and Scheduling

423

424

Control when and how Observable emissions are observed.

425

426

```java { .api }

427

/**

428

* Subscribe with a simple onNext callback

429

* @param onNext function called for each emitted item

430

* @return Disposable for managing the subscription

431

*/

432

public final Disposable subscribe(Consumer<? super T> onNext);

433

434

/**

435

* Subscribe with onNext and onError callbacks

436

* @param onNext function called for each emitted item

437

* @param onError function called on error

438

* @return Disposable for managing the subscription

439

*/

440

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

441

442

/**

443

* Subscribe with full Observer interface

444

* @param observer the Observer to receive emissions

445

*/

446

public final void subscribe(Observer<? super T> observer);

447

448

/**

449

* Specify the Scheduler for subscription (source) operations

450

* @param scheduler the Scheduler to use for subscriptions

451

* @return Observable operating on the specified scheduler

452

*/

453

public final Observable<T> subscribeOn(Scheduler scheduler);

454

455

/**

456

* Specify the Scheduler for observation (downstream) operations

457

* @param scheduler the Scheduler to use for observations

458

* @return Observable observing on the specified scheduler

459

*/

460

public final Observable<T> observeOn(Scheduler scheduler);

461

```

462

463

### Error Handling

464

465

Manage errors in the Observable stream.

466

467

```java { .api }

468

/**

469

* Return a default item when an error occurs

470

* @param defaultItem the item to emit on error

471

* @return Observable that emits defaultItem on error

472

*/

473

public final Observable<T> onErrorReturn(T defaultItem);

474

475

/**

476

* Resume with another Observable when an error occurs

477

* @param resumeSource Observable to switch to on error

478

* @return Observable that switches to resumeSource on error

479

*/

480

public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> resumeSource);

481

482

/**

483

* Retry the subscription when an error occurs

484

* @param times maximum number of retry attempts

485

* @return Observable that retries up to the specified times

486

*/

487

public final Observable<T> retry(long times);

488

489

/**

490

* Perform side-effect action when an error occurs

491

* @param onError action to perform on error

492

* @return Observable that performs the action on error

493

*/

494

public final Observable<T> doOnError(Consumer<? super Throwable> onError);

495

```

496

497

### Side Effects

498

499

Perform side-effect actions without modifying the stream.

500

501

```java { .api }

502

/**

503

* Perform an action for each emitted item

504

* @param onNext action to perform for each item

505

* @return Observable that performs the action for each item

506

*/

507

public final Observable<T> doOnNext(Consumer<? super T> onNext);

508

509

/**

510

* Perform an action when the Observable completes

511

* @param onComplete action to perform on completion

512

* @return Observable that performs the action on completion

513

*/

514

public final Observable<T> doOnComplete(Action onComplete);

515

516

/**

517

* Perform an action when subscription occurs

518

* @param onSubscribe action to perform on subscription

519

* @return Observable that performs the action on subscription

520

*/

521

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);

522

523

/**

524

* Perform an action when disposal occurs

525

* @param onDispose action to perform on disposal

526

* @return Observable that performs the action on disposal

527

*/

528

public final Observable<T> doOnDispose(Action onDispose);

529

```

530

531

## Types

532

533

```java { .api }

534

/**

535

* Interface for creating custom Observable sources

536

*/

537

public interface ObservableOnSubscribe<T> {

538

void subscribe(ObservableEmitter<T> emitter) throws Throwable;

539

}

540

541

/**

542

* Emitter interface for custom Observable creation

543

*/

544

public interface ObservableEmitter<T> extends Emitter<T> {

545

void onNext(T value);

546

void onError(Throwable error);

547

void onComplete();

548

boolean isDisposed();

549

}

550

551

/**

552

* Base interface for Observable sources

553

*/

554

public interface ObservableSource<T> {

555

void subscribe(Observer<? super T> observer);

556

}

557

```

558

559

**Usage Examples:**

560

561

```java

562

import io.reactivex.rxjava3.core.Observable;

563

import io.reactivex.rxjava3.schedulers.Schedulers;

564

import java.util.concurrent.TimeUnit;

565

566

// Basic Observable creation and subscription

567

Observable.just("Hello", "World")

568

.subscribe(System.out::println);

569

570

// Complex transformation chain

571

Observable.range(1, 10)

572

.filter(x -> x % 2 == 0)

573

.map(x -> x * x)

574

.take(3)

575

.subscribe(System.out::println);

576

577

// Async operations with scheduling

578

Observable.fromCallable(() -> {

579

Thread.sleep(1000);

580

return "Computed result";

581

})

582

.subscribeOn(Schedulers.io())

583

.observeOn(Schedulers.single())

584

.subscribe(

585

result -> System.out.println("Result: " + result),

586

error -> System.err.println("Error: " + error)

587

);

588

589

// Error handling

590

Observable.just(1, 2, 0, 4)

591

.map(x -> 10 / x)

592

.onErrorReturn(-1)

593

.subscribe(System.out::println);

594

```