Shared RxJS Operators for NgRx libraries providing concatLatestFrom, mapResponse, and tapResponse utilities
npx @tessl/cli install tessl/npm-ngrx--operators@20.0.0

@ngrx/operators provides a collection of specialized RxJS operators designed for NgRx libraries and Angular applications. It offers three essential operators for reactive programming: concatLatestFrom for combining latest values, mapResponse for transforming responses with error handling, and tapResponse for safe side-effect handling in ComponentStore effects.
npm install @ngrx/operators

import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";

For CommonJS:

const { concatLatestFrom, mapResponse, tapResponse } = require("@ngrx/operators");

import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";
import { of, Observable } from "rxjs";
// concatLatestFrom - combine source with latest from other observables
// The factory passed to concatLatestFrom returns the observable whose value
// is appended to each source value; the result is emitted as a tuple.
const source$ = of("action");
const store$ = of({ user: "Alice" });
source$.pipe(
concatLatestFrom(() => store$)
).subscribe(([action, storeValue]) => {
// The tuple is destructured as [source value, value from store$].
console.log(action, storeValue); // "action", { user: "Alice" }
});
// mapResponse - transform responses with error handling
// `next` maps an emitted value and `error` maps an error; whichever mapper
// runs, its return value is what the subscriber receives.
const api$ = of({ users: ["Alice", "Bob"] });
api$.pipe(
mapResponse({
next: (data) => ({ type: "SUCCESS", payload: data.users }),
error: (error) => ({ type: "ERROR", error })
})
).subscribe(action => console.log(action));
// tapResponse - handle responses with side effects
const service$ = of({ data: "result" });
service$.pipe(
tapResponse({
next: (data) => console.log("Success:", data),
error: (error) => console.error("Error:", error),
finalize: () => console.log("Request completed")
})
).subscribe();

@ngrx/operators is built with three key design principles:
Combines source values with the latest values from lazily evaluated observables, maintaining proper type safety and tuple ordering.
/**
 * Combines the source value and the last available value from a lazily evaluated Observable in a new array
 *
 * Overload for a factory that returns an array of Observables. The emitted
 * tuple starts with the source value, followed by one observed value per
 * Observable, in the same positions as in the array returned by the factory.
 *
 * @param observablesFactory Receives the source value and returns the Observables to read from.
 * @returns An operator from `V` to the tuple `[V, ...observed values]`.
 */
function concatLatestFrom<T extends Observable<unknown>[], V>(
observablesFactory: (value: V) => [...T]
): OperatorFunction<V, [V, ...{ [i in keyof T]: ObservedValueOf<T[i]> }]>;
/**
 * Overload for a factory that returns a single Observable. The emitted value
 * is the pair `[source value, observed value]`.
 *
 * @param observableFactory Receives the source value and returns the Observable to read from.
 * @returns An operator from `V` to `[V, ObservedValueOf<T>]`.
 */
function concatLatestFrom<T extends Observable<unknown>, V>(
observableFactory: (value: V) => T
): OperatorFunction<V, [V, ObservedValueOf<T>]>;
function concatLatestFrom<
T extends ObservableInput<unknown>[] | ObservableInput<unknown>,
V,
R = [
V,
...(T extends ObservableInput<unknown>[]
? { [i in keyof T]: ObservedValueOf<T[i]> }
: [ObservedValueOf<T>])
]
>(observablesFactory: (value: V) => T): OperatorFunction<V, R>;

Usage Examples:
import { concatLatestFrom } from "@ngrx/operators";
import { Actions, ofType } from "@ngrx/effects";
import { Store } from "@ngrx/store";
// In NgRx Effects - select active customer
// The selector does not depend on the action, so the factory ignores its argument.
this.actions$.pipe(
ofType(CustomerActions.load),
concatLatestFrom(() => this.store.select(selectActiveCustomer))
).subscribe(([action, customer]) => {
console.log(action, customer);
});
// Select based on action data
// The factory receives the action and uses action.id to build the selector.
this.actions$.pipe(
ofType(CustomerActions.loadById),
concatLatestFrom((action) => this.store.select(selectCustomer(action.id)))
).subscribe(([action, customer]) => {
console.log(action, customer);
});
// Multiple observables
this.actions$.pipe(
concatLatestFrom(() => [
this.store.select(selectUser),
this.store.select(selectSettings)
])
).subscribe(([action, user, settings]) => {
console.log(action, user, settings);
});

Maps both success and error responses, designed specifically for NgRx Effects where actions must be dispatched.
/**
 * A map operator with included error handling, similar to tapResponse but allows mapping the response
 *
 * @param observer Object holding a `next` mapper for emitted values and an
 *   `error` mapper for errors.
 * @returns A function that turns an `Observable<T>` into an
 *   `Observable<R1 | R2>`, i.e. a stream of whatever the two mappers return.
 */
function mapResponse<T, E, R1, R2>(
observer: MapResponseObserver<T, E, R1, R2>
): (source$: Observable<T>) => Observable<R1 | R2>;
interface MapResponseObserver<T, E, R1, R2> {
/** Transform successful responses */
next: (value: T) => R1;
/** Transform error responses */
error: (error: E) => R2;
}

Usage Examples:
import { mapResponse } from "@ngrx/operators";
import { createEffect, Actions, ofType } from "@ngrx/effects";
import { exhaustMap } from "rxjs/operators";
// NgRx functional effect with mapResponse.
// The `{ functional: true }` config is required here: it tells createEffect
// to keep the function instead of calling it immediately, so the `inject()`
// default parameters are resolved later, inside an injection context.
export const loadAllUsers = createEffect(
  (actions$ = inject(Actions), usersService = inject(UsersService)) => {
    return actions$.pipe(
      ofType(UsersPageActions.opened),
      exhaustMap(() => {
        return usersService.getAll().pipe(
          // Map both outcomes to actions so the effect always emits an action.
          mapResponse({
            next: (users) => UsersApiActions.usersLoadedSuccess({ users }),
            error: (error) => UsersApiActions.usersLoadedFailure({ error }),
          })
        );
      })
    );
  },
  { functional: true }
);
// Transform API responses
const apiCall$ = this.http.get<User[]>('/api/users');
apiCall$.pipe(
  mapResponse({
    next: (users) => ({ success: true, data: users, timestamp: Date.now() }),
    // `E` has no inference site in this call and falls back to `unknown`,
    // so the parameter is annotated to make `error.message` type-check.
    error: (error: Error) => ({ success: false, error: error.message, timestamp: Date.now() })
  })
).subscribe(result => {
  console.log(result); // Always gets a transformed result
});

Handles responses with side effects while keeping the original stream intact. Designed for ComponentStore effects with safe error handling.
/**
 * Handles the response in ComponentStore effects in a safe way, without additional boilerplate
 *
 * `E` is the type of the value passed to the `error` callback and defaults
 * to `unknown`.
 *
 * @param observer Object with `next` and `error` callbacks and optional
 *   `complete` and `finalize` callbacks.
 * @returns A function from `Observable<T>` to `Observable<T>`; the element
 *   type of the stream is not changed.
 */
function tapResponse<T, E = unknown>(
observer: TapResponseObserver<T, E>
): (source$: Observable<T>) => Observable<T>;
/**
 * Positional-callback overload. It has no parameter for a `finalize`
 * callback; only the observer-object form accepts one.
 *
 * @deprecated Instead of passing a sequence of callbacks, use an observer object
 * @param next Called with each emitted value.
 * @param error Called with the error.
 * @param complete Optional callback for stream completion.
 */
function tapResponse<T, E = unknown>(
next: (value: T) => void,
error: (error: E) => void,
complete?: () => void
): (source$: Observable<T>) => Observable<T>;
interface TapResponseObserver<T, E> {
/** Handle successful responses */
next: (value: T) => void;
/** Handle error responses */
error: (error: E) => void;
/** Handle stream completion (optional) */
complete?: () => void;
/** Called when the stream terminates, i.e. after complete or error, not after each next (optional) */
finalize?: () => void;
}

Usage Examples:
import { tapResponse } from "@ngrx/operators";
import { rxMethod } from "@ngrx/signals/rxjs-interop";
import { exhaustMap, tap } from "rxjs/operators";
// rxMethod (imported from @ngrx/signals/rxjs-interop) with tapResponse
readonly loadUsers = rxMethod<void>(
pipe(
// Raise the loading flag before the request starts.
tap(() => this.isLoading.set(true)),
exhaustMap(() =>
this.usersService.getAll().pipe(
tapResponse({
next: (users) => this.users.set(users),
// E defaults to unknown, so the error type is annotated explicitly.
error: (error: HttpErrorResponse) => this.logError(error.message),
// Clears the loading flag that the tap above set.
finalize: () => this.isLoading.set(false),
})
)
)
)
);
// Error handling in streams
const dataStream$ = this.service.getData();
dataStream$.pipe(
  tapResponse({
    next: (data) => {
      console.log("Received data:", data);
      this.cache.set(data);
    },
    // `E` defaults to `unknown`; annotate the parameter so that
    // `error.message` type-checks.
    error: (error: Error) => {
      console.error("Failed to load data:", error);
      this.showErrorMessage(error.message);
    },
    complete: () => console.log("Data loading completed"),
    finalize: () => this.cleanup()
  })
).subscribe(); // No error handler is passed here: failures go to the `error` callback above
// Deprecated callback style (still supported)
// The positional arguments are, in order: next, error, and the optional complete callback.
dataStream$.pipe(
tapResponse(
(data) => console.log("Success:", data),
(error) => console.error("Error:", error),
() => console.log("Complete")
)
).subscribe();
// RxJS type imports
// Local aliases for the RxJS types, written as inline import types
// so that no import statement is needed.
type Observable<T> = import('rxjs').Observable<T>;
type ObservableInput<T> = import('rxjs').ObservableInput<T>;
type OperatorFunction<T, R> = import('rxjs').OperatorFunction<T, R>;
type ObservedValueOf<T> = import('rxjs').ObservedValueOf<T>;
// mapResponse types
/** Pair of mappers accepted by mapResponse. */
interface MapResponseObserver<T, E, R1, R2> {
/** Maps an emitted value of type T to R1. */
next: (value: T) => R1;
/** Maps an error of type E to R2. */
error: (error: E) => R2;
}
// tapResponse types
/** Callbacks accepted by the observer-object form of tapResponse. */
interface TapResponseObserver<T, E> {
/** Called with each emitted value. */
next: (value: T) => void;
/** Called with the error, typed as E. */
error: (error: E) => void;
/** Optional; called when the stream completes. */
complete?: () => void;
/** Optional cleanup callback, run after the stream completes or errors. */
finalize?: () => void;
}