or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-ngrx--operators

Shared RxJS Operators for NgRx libraries providing concatLatestFrom, mapResponse, and tapResponse utilities

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/@ngrx/operators@20.0.x

To install, run

npx @tessl/cli install tessl/npm-ngrx--operators@20.0.0

index.mddocs/

@ngrx/operators

@ngrx/operators provides a collection of specialized RxJS operators designed for NgRx libraries and Angular applications. It offers three essential operators for reactive programming: concatLatestFrom for combining latest values, mapResponse for transforming responses with error handling, and tapResponse for safe side-effect handling in ComponentStore effects.

Package Information

  • Package Name: @ngrx/operators
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @ngrx/operators

Core Imports

import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";

For CommonJS:

const { concatLatestFrom, mapResponse, tapResponse } = require("@ngrx/operators");

Basic Usage

import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";
import { of, Observable } from "rxjs";

// concatLatestFrom - combine source with latest from other observables
const source$ = of("action");
const store$ = of({ user: "Alice" });

source$.pipe(
  concatLatestFrom(() => store$)
).subscribe(([action, storeValue]) => {
  console.log(action, storeValue); // "action", { user: "Alice" }
});

// mapResponse - transform responses with error handling
const api$ = of({ users: ["Alice", "Bob"] });

api$.pipe(
  mapResponse({
    next: (data) => ({ type: "SUCCESS", payload: data.users }),
    error: (error) => ({ type: "ERROR", error })
  })
).subscribe(action => console.log(action));

// tapResponse - handle responses with side effects
const service$ = of({ data: "result" });

service$.pipe(
  tapResponse({
    next: (data) => console.log("Success:", data),
    error: (error) => console.error("Error:", error),
    finalize: () => console.log("Request completed")
  })
).subscribe();

Architecture

@ngrx/operators is built with three key design principles:

  • Type Safety: Full TypeScript integration with proper type inference and generic support
  • Error Handling: Built-in error handling patterns that prevent stream termination
  • NgRx Integration: Operators designed specifically for NgRx patterns like Effects and ComponentStore

Capabilities

Combine Latest Values

Combines source values with the latest values from lazily evaluated observables, maintaining proper type safety and tuple ordering.

/**
 * Combines the source value and the last available value from a lazily evaluated Observable in a new array
 */
function concatLatestFrom<T extends Observable<unknown>[], V>(
  observablesFactory: (value: V) => [...T]
): OperatorFunction<V, [V, ...{ [i in keyof T]: ObservedValueOf<T[i]> }]>;

function concatLatestFrom<T extends Observable<unknown>, V>(
  observableFactory: (value: V) => T
): OperatorFunction<V, [V, ObservedValueOf<T>]>;

function concatLatestFrom<
  T extends ObservableInput<unknown>[] | ObservableInput<unknown>,
  V,
  R = [
    V,
    ...(T extends ObservableInput<unknown>[]
      ? { [i in keyof T]: ObservedValueOf<T[i]> }
      : [ObservedValueOf<T>])
  ]
>(observablesFactory: (value: V) => T): OperatorFunction<V, R>;

Usage Examples:

import { concatLatestFrom } from "@ngrx/operators";
import { Actions, ofType } from "@ngrx/effects";
import { Store } from "@ngrx/store";

// In NgRx Effects - select active customer
this.actions$.pipe(
  ofType(CustomerActions.load),
  concatLatestFrom(() => this.store.select(selectActiveCustomer))
).subscribe(([action, customer]) => {
  console.log(action, customer);
});

// Select based on action data
this.actions$.pipe(
  ofType(CustomerActions.loadById),
  concatLatestFrom((action) => this.store.select(selectCustomer(action.id)))
).subscribe(([action, customer]) => {
  console.log(action, customer);
});

// Multiple observables
this.actions$.pipe(
  concatLatestFrom(() => [
    this.store.select(selectUser),
    this.store.select(selectSettings)
  ])
).subscribe(([action, user, settings]) => {
  console.log(action, user, settings);
});

Response Mapping

Maps both success and error responses, designed specifically for NgRx Effects where actions must be dispatched.

/**
 * A map operator with included error handling, similar to tapResponse but allows mapping the response
 */
function mapResponse<T, E, R1, R2>(
  observer: MapResponseObserver<T, E, R1, R2>
): (source$: Observable<T>) => Observable<R1 | R2>;

interface MapResponseObserver<T, E, R1, R2> {
  /** Transform successful responses */
  next: (value: T) => R1;
  /** Transform error responses */
  error: (error: E) => R2;
}

Usage Examples:

import { mapResponse } from "@ngrx/operators";
import { createEffect, Actions, ofType } from "@ngrx/effects";
import { exhaustMap } from "rxjs/operators";

// NgRx Effect with mapResponse
export const loadAllUsers = createEffect((
  actions$ = inject(Actions),
  usersService = inject(UsersService)
) => {
  return actions$.pipe(
    ofType(UsersPageActions.opened),
    exhaustMap(() => {
      return usersService.getAll().pipe(
        mapResponse({
          next: (users) => UsersApiActions.usersLoadedSuccess({ users }),
          error: (error) => UsersApiActions.usersLoadedFailure({ error }),
        })
      );
    })
  );
});

// Transform API responses
const apiCall$ = this.http.get<User[]>('/api/users');

apiCall$.pipe(
  mapResponse({
    next: (users) => ({ success: true, data: users, timestamp: Date.now() }),
    error: (error) => ({ success: false, error: error.message, timestamp: Date.now() })
  })
).subscribe(result => {
  console.log(result); // Always gets a transformed result
});

Response Tapping

Handles responses with side effects while keeping the original stream intact. Designed for ComponentStore effects with safe error handling.

/**
 * Handles the response in ComponentStore effects in a safe way, without additional boilerplate
 */
function tapResponse<T, E = unknown>(
  observer: TapResponseObserver<T, E>
): (source$: Observable<T>) => Observable<T>;

/**
 * @deprecated Instead of passing a sequence of callbacks, use an observer object
 */
function tapResponse<T, E = unknown>(
  next: (value: T) => void,
  error: (error: E) => void,
  complete?: () => void
): (source$: Observable<T>) => Observable<T>;

interface TapResponseObserver<T, E> {
  /** Handle successful responses */
  next: (value: T) => void;
  /** Handle error responses */
  error: (error: E) => void;
  /** Handle stream completion (optional) */
  complete?: () => void;
  /** Always called after next/error/complete (optional) */
  finalize?: () => void;
}

Usage Examples:

import { tapResponse } from "@ngrx/operators";
import { rxMethod } from "@ngrx/signals/rxjs-interop";
import { exhaustMap, tap } from "rxjs/operators";

// ComponentStore rxMethod
readonly loadUsers = rxMethod<void>(
  pipe(
    tap(() => this.isLoading.set(true)),
    exhaustMap(() =>
      this.usersService.getAll().pipe(
        tapResponse({
          next: (users) => this.users.set(users),
          error: (error: HttpErrorResponse) => this.logError(error.message),
          finalize: () => this.isLoading.set(false),
        })
      )
    )
  )
);

// Error handling in streams
const dataStream$ = this.service.getData();

dataStream$.pipe(
  tapResponse({
    next: (data) => {
      console.log("Received data:", data);
      this.cache.set(data);
    },
    error: (error) => {
      console.error("Failed to load data:", error);
      this.showErrorMessage(error.message);
    },
    complete: () => console.log("Data loading completed"),
    finalize: () => this.cleanup()
  })
).subscribe(); // Stream continues even if errors occur

// Deprecated callback style (still supported)
dataStream$.pipe(
  tapResponse(
    (data) => console.log("Success:", data),
    (error) => console.error("Error:", error),
    () => console.log("Complete")
  )
).subscribe();

Types

// RxJS type imports
type Observable<T> = import('rxjs').Observable<T>;
type ObservableInput<T> = import('rxjs').ObservableInput<T>;
type OperatorFunction<T, R> = import('rxjs').OperatorFunction<T, R>;
type ObservedValueOf<T> = import('rxjs').ObservedValueOf<T>;

// mapResponse types
interface MapResponseObserver<T, E, R1, R2> {
  next: (value: T) => R1;
  error: (error: E) => R2;
}

// tapResponse types
interface TapResponseObserver<T, E> {
  next: (value: T) => void;
  error: (error: E) => void;
  complete?: () => void;
  finalize?: () => void;
}