CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

docs/subjects.md

Subjects

Special observables that can act as both observer and observable, enabling multicasting patterns where multiple subscribers receive the same values.

Capabilities

Subject

Basic subject that multicasts values to multiple subscribers.

/**
 * Subject acts as both Observable and Observer, enabling multicasting:
 * values pushed in through next() are delivered to every current subscriber.
 */
class Subject<T> extends Observable<T> implements Observer<T> {
  /**
   * Whether the subject has been closed via unsubscribe().
   * NOTE(review): in RxJS this flag is set by unsubscribe(), not by complete() - confirm against the installed version.
   */
  readonly closed: boolean;

  /**
   * Whether the subject has terminated with an error (i.e. error() was called)
   */
  readonly hasError: boolean;

  /**
   * Whether the subject has stopped, i.e. error() or complete() has been called
   * and no further values will be delivered
   */
  readonly isStopped: boolean;

  /**
   * The observers currently subscribed to this subject (a list, not a count).
   * NOTE(review): RxJS 7 marks this as an internal detail and offers the `observed` getter instead - confirm.
   */
  readonly observers: Observer<T>[];

  /**
   * Emit a value to all subscribers
   * @param value - Value to emit
   */
  next(value: T): void;

  /**
   * Send an error notification to all subscribers and terminate the subject
   * (an errored subject does not also send a complete notification)
   * @param err - Error to emit
   */
  error(err: any): void;

  /**
   * Complete the subject, notifying all subscribers
   */
  complete(): void;

  /**
   * Close the subject and release all of its observers
   */
  unsubscribe(): void;

  /**
   * Create an Observable that relays this subject's notifications without
   * exposing the Observer methods (next/error/complete) to consumers
   * @returns Observable instance
   */
  asObservable(): Observable<T>;
}

Usage Examples:

import { Subject } from "rxjs";

const subject = new Subject<string>();

// Attach two observers before anything is emitted
subject.subscribe((value) => {
  console.log('Observer A:', value);
});
subject.subscribe((value) => {
  console.log('Observer B:', value);
});

// Each value pushed here is delivered to both A and B
for (const word of ['Hello', 'World']) {
  subject.next(word);
}
subject.complete();

// An observer that subscribes after completion is given no values,
// so this handler never logs anything
subject.subscribe((value) => {
  console.log('Observer C:', value);
});

BehaviorSubject

Subject that stores the current value and emits it to new subscribers immediately.

/**
 * Subject that always holds a current value. The value is seeded by the
 * constructor, replaced by each next(), and delivered immediately to every
 * new subscriber.
 */
class BehaviorSubject<T> extends Subject<T> {
  /**
   * Create BehaviorSubject with initial value
   * @param initialValue - Value stored until the first next() and emitted to subscribers that arrive before then
   */
  constructor(initialValue: T);

  /**
   * Current value held by the subject (property form of getValue())
   */
  readonly value: T;

  /**
   * Read the current value synchronously, without subscribing
   * @returns Current stored value
   * NOTE(review): RxJS throws here if the subject has errored or been unsubscribed - not visible in this declaration, confirm.
   */
  getValue(): T;
}

Usage Examples:

import { BehaviorSubject } from "rxjs";

// Seed the subject with 0 so there is always a value to hand out
const behaviorSubject = new BehaviorSubject<number>(0);

// A subscribes first and is handed the seed value (0) right away
behaviorSubject.subscribe((current) => {
  console.log('Subscriber A:', current);
});

// Push two updates; the stored value ends up as 2
[1, 2].forEach((update) => behaviorSubject.next(update));

// B subscribes late and starts from the latest value (2)
behaviorSubject.subscribe((current) => {
  console.log('Subscriber B:', current);
});

// The latest value can also be read synchronously, without subscribing
const viaProperty = behaviorSubject.value; // 2
const viaMethod = behaviorSubject.getValue(); // 2
console.log('Current value:', viaProperty);
console.log('Current value (method):', viaMethod);

ReplaySubject

Subject that stores recent values and replays them to new subscribers.

/**
 * Subject that buffers recent values and replays them to each new
 * subscriber before it starts receiving live values. The buffer can be
 * bounded by count, by age, or both.
 */
class ReplaySubject<T> extends Subject<T> {
  /**
   * Create ReplaySubject with buffer configuration
   * @param bufferSize - Maximum number of values to keep for replay (default: Infinity)
   * @param windowTime - Maximum age in ms of a value eligible for replay (default: Infinity)
   * @param timestampProvider - Object whose now() supplies the timestamps used to age values against windowTime
   */
  constructor(
    bufferSize?: number, 
    windowTime?: number, 
    timestampProvider?: TimestampProvider
  );
}

/**
 * Clock abstraction accepted by ReplaySubject as its optional
 * timestampProvider argument.
 */
interface TimestampProvider {
  /**
   * @returns The current time as a number
   * (presumably milliseconds, to match windowTime - confirm against RxJS docs)
   */
  now(): number;
}

Usage Examples:

import { ReplaySubject } from "rxjs";

// Keep only the three most recent values for late subscribers
const replaySubject = new ReplaySubject<number>(3);

// Four emissions against a buffer of three: the oldest value (1) is dropped
for (let value = 1; value <= 4; value++) {
  replaySubject.next(value);
}

// A late subscriber is replayed the buffered values: 2, 3, 4
replaySubject.subscribe((replayed) => {
  console.log('Subscriber A:', replayed);
});

// Time-based replay: no limit on count, but values expire after 1000 ms
const timeReplay = new ReplaySubject<string>(Infinity, 1000);
timeReplay.next('old');

setTimeout(() => {
  timeReplay.next('recent');
  // 'old' is now about 1100 ms old, outside the 1000 ms window,
  // so a subscriber arriving here is replayed only 'recent'
  timeReplay.subscribe((replayed) => {
    console.log('Time subscriber:', replayed);
  });
}, 1100);

AsyncSubject

Subject that emits only its last value, and only once it completes.

/**
 * Subject that withholds every value until complete() is called, then
 * delivers only the last value it received. Subscribers that arrive after
 * completion receive that same last value.
 */
class AsyncSubject<T> extends Subject<T> {
  /**
   * Create AsyncSubject (takes no configuration)
   */
  constructor();
}

Usage Examples:

import { AsyncSubject } from "rxjs";

const asyncSubject = new AsyncSubject<number>();

// A subscribes up front but hears nothing until the subject completes
asyncSubject.subscribe((last) => {
  console.log('Subscriber A:', last);
});

// Of these, only the final value (3) will ever be delivered
[1, 2, 3].forEach((candidate) => asyncSubject.next(candidate));

// B joins after the values were pushed but before completion
asyncSubject.subscribe((last) => {
  console.log('Subscriber B:', last);
});

// Completing releases the last value (3) to all subscribers
asyncSubject.complete();

// A subscriber arriving after completion still receives the last value
asyncSubject.subscribe((last) => {
  console.log('Subscriber C:', last); // 3
});

Advanced Patterns

Subject as Event Bus

import { Subject, filter } from "rxjs";

/**
 * Shape of every message sent over the event bus.
 */
interface AppEvent {
  /** Discriminator that subscribers filter on, e.g. 'user-login' */
  type: string;
  /**
   * Event-specific data. Typed `unknown` rather than `any` so that a
   * consumer must narrow it before reading fields from it.
   */
  payload: unknown;
}

// One shared Subject is the bus: whatever is pushed with next() reaches
// every subscriber, and each subscriber filters for the types it wants
const eventBus = new Subject<AppEvent>();

// Subscribe to specific event types
eventBus.pipe(
  filter(event => event.type === 'user-login')
).subscribe(event => {
  console.log('User logged in:', event.payload);
});

// The logout handler does not use the event itself, so it takes no parameter
eventBus.pipe(
  filter(event => event.type === 'user-logout')
).subscribe(() => {
  console.log('User logged out');
});

// Emit events
eventBus.next({ type: 'user-login', payload: { userId: 123 } });
eventBus.next({ type: 'user-logout', payload: null });

State Management with BehaviorSubject

import { BehaviorSubject, map } from "rxjs";

/**
 * Snapshot of the application state held by StateService.
 */
interface AppState {
  /** The current user, or null when none is set (the initial state) */
  user: { id: number; name: string } | null;
  /** Loading flag, toggled through StateService.setLoading */
  loading: boolean;
}

/**
 * Small state container built on a BehaviorSubject.
 *
 * The subject itself stays private so that only this class can push new
 * state; consumers observe it through `state` or the derived selectors.
 */
class StateService {
  private readonly state$ = new BehaviorSubject<AppState>({
    user: null,
    loading: false
  });

  // Expose read-only state
  readonly state = this.state$.asObservable();

  // Specific selectors
  readonly user$ = this.state$.pipe(map(state => state.user));
  readonly loading$ = this.state$.pipe(map(state => state.loading));

  /**
   * Replace the current user, leaving the rest of the state untouched.
   * @param user - The new user, or null to clear it (back to the initial state)
   */
  setUser(user: AppState['user']): void {
    this.patch({ user });
  }

  /**
   * Set the loading flag, leaving the rest of the state untouched.
   * @param loading - New value of the flag
   */
  setLoading(loading: boolean): void {
    this.patch({ loading });
  }

  /** Emit a new state made of the current one with `changes` applied on top. */
  private patch(changes: Partial<AppState>): void {
    this.state$.next({
      ...this.state$.value,
      ...changes
    });
  }
}

Types

/**
 * Consumer of notifications: a set of three callbacks, one per notification kind.
 */
interface Observer<T> {
  /** Called with each emitted value */
  next: (value: T) => void;
  /** Called with the error when the source fails */
  error: (err: any) => void;
  /** Called when the source completes */
  complete: () => void;
}

/**
 * Minimal subject contract: an Observer that can also be viewed as an Observable.
 * NOTE(review): RxJS 7 declares SubjectLike as `Observer<T> & Subscribable<T>`
 * with no asObservable() member - confirm this shape against the installed version.
 */
interface SubjectLike<T> extends Observer<T> {
  /** @returns An Observable view of this subject */
  asObservable(): Observable<T>;
}

/**
 * Clock abstraction; ReplaySubject accepts one as its optional
 * timestampProvider constructor argument.
 */
interface TimestampProvider {
  /**
   * @returns The current time as a number
   * (presumably milliseconds, to match ReplaySubject's windowTime - confirm against RxJS docs)
   */
  now(): number;
}

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json