Shared RxJS Operators for NgRx libraries providing concatLatestFrom, mapResponse, and tapResponse utilities
npx @tessl/cli install tessl/npm-ngrx--operators@20.0.00
# @ngrx/operators
1
2
@ngrx/operators provides a collection of specialized RxJS operators designed for NgRx libraries and Angular applications. It offers three essential operators for reactive programming: concatLatestFrom for combining latest values, mapResponse for transforming responses with error handling, and tapResponse for safe side-effect handling in ComponentStore effects.
3
4
## Package Information
5
6
- **Package Name**: @ngrx/operators
7
- **Package Type**: npm
8
- **Language**: TypeScript
9
- **Installation**: `npm install @ngrx/operators`
10
11
## Core Imports
12
13
```typescript
14
import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";
15
```
16
17
For CommonJS:
18
19
```javascript
20
const { concatLatestFrom, mapResponse, tapResponse } = require("@ngrx/operators");
21
```
22
23
## Basic Usage
24
25
```typescript
26
import { concatLatestFrom, mapResponse, tapResponse } from "@ngrx/operators";
27
import { of, Observable } from "rxjs";
28
29
// concatLatestFrom - combine source with latest from other observables
30
const source$ = of("action");
31
const store$ = of({ user: "Alice" });
32
33
source$.pipe(
34
concatLatestFrom(() => store$)
35
).subscribe(([action, storeValue]) => {
36
console.log(action, storeValue); // "action", { user: "Alice" }
37
});
38
39
// mapResponse - transform responses with error handling
40
const api$ = of({ users: ["Alice", "Bob"] });
41
42
api$.pipe(
43
mapResponse({
44
next: (data) => ({ type: "SUCCESS", payload: data.users }),
45
error: (error) => ({ type: "ERROR", error })
46
})
47
).subscribe(action => console.log(action));
48
49
// tapResponse - handle responses with side effects
50
const service$ = of({ data: "result" });
51
52
service$.pipe(
53
tapResponse({
54
next: (data) => console.log("Success:", data),
55
error: (error) => console.error("Error:", error),
56
finalize: () => console.log("Request completed")
57
})
58
).subscribe();
59
```
60
61
## Architecture
62
63
@ngrx/operators is built with three key design principles:
64
65
- **Type Safety**: Full TypeScript integration with proper type inference and generic support
66
- **Error Handling**: Built-in error handling patterns that prevent stream termination
67
- **NgRx Integration**: Operators designed specifically for NgRx patterns like Effects and ComponentStore
68
69
## Capabilities
70
71
### Combine Latest Values
72
73
Combines source values with the latest values from lazily evaluated observables, maintaining proper type safety and tuple ordering.
74
75
```typescript { .api }
76
/**
77
* Combines the source value and the last available value from a lazily evaluated Observable in a new array
78
*/
79
function concatLatestFrom<T extends Observable<unknown>[], V>(
80
observablesFactory: (value: V) => [...T]
81
): OperatorFunction<V, [V, ...{ [i in keyof T]: ObservedValueOf<T[i]> }]>;
82
83
function concatLatestFrom<T extends Observable<unknown>, V>(
84
observableFactory: (value: V) => T
85
): OperatorFunction<V, [V, ObservedValueOf<T>]>;
86
87
function concatLatestFrom<
88
T extends ObservableInput<unknown>[] | ObservableInput<unknown>,
89
V,
90
R = [
91
V,
92
...(T extends ObservableInput<unknown>[]
93
? { [i in keyof T]: ObservedValueOf<T[i]> }
94
: [ObservedValueOf<T>])
95
]
96
>(observablesFactory: (value: V) => T): OperatorFunction<V, R>;
97
```
98
99
**Usage Examples:**
100
101
```typescript
102
import { concatLatestFrom } from "@ngrx/operators";
103
import { Actions, ofType } from "@ngrx/effects";
104
import { Store } from "@ngrx/store";
105
106
// In NgRx Effects - select active customer
107
this.actions$.pipe(
108
ofType(CustomerActions.load),
109
concatLatestFrom(() => this.store.select(selectActiveCustomer))
110
).subscribe(([action, customer]) => {
111
console.log(action, customer);
112
});
113
114
// Select based on action data
115
this.actions$.pipe(
116
ofType(CustomerActions.loadById),
117
concatLatestFrom((action) => this.store.select(selectCustomer(action.id)))
118
).subscribe(([action, customer]) => {
119
console.log(action, customer);
120
});
121
122
// Multiple observables
123
this.actions$.pipe(
124
concatLatestFrom(() => [
125
this.store.select(selectUser),
126
this.store.select(selectSettings)
127
])
128
).subscribe(([action, user, settings]) => {
129
console.log(action, user, settings);
130
});
131
```
132
133
### Response Mapping
134
135
Maps both success and error responses, designed specifically for NgRx Effects where actions must be dispatched.
136
137
```typescript { .api }
138
/**
139
* A map operator with included error handling, similar to tapResponse but allows mapping the response
140
*/
141
function mapResponse<T, E, R1, R2>(
142
observer: MapResponseObserver<T, E, R1, R2>
143
): (source$: Observable<T>) => Observable<R1 | R2>;
144
145
interface MapResponseObserver<T, E, R1, R2> {
146
/** Transform successful responses */
147
next: (value: T) => R1;
148
/** Transform error responses */
149
error: (error: E) => R2;
150
}
151
```
152
153
**Usage Examples:**
154
155
```typescript
156
import { mapResponse } from "@ngrx/operators";
157
import { createEffect, Actions, ofType } from "@ngrx/effects";
158
import { exhaustMap } from "rxjs/operators";
159
160
// NgRx Effect with mapResponse
161
export const loadAllUsers = createEffect((
162
actions$ = inject(Actions),
163
usersService = inject(UsersService)
164
) => {
165
return actions$.pipe(
166
ofType(UsersPageActions.opened),
167
exhaustMap(() => {
168
return usersService.getAll().pipe(
169
mapResponse({
170
next: (users) => UsersApiActions.usersLoadedSuccess({ users }),
171
error: (error) => UsersApiActions.usersLoadedFailure({ error }),
172
})
173
);
174
})
175
);
176
});
177
178
// Transform API responses
179
const apiCall$ = this.http.get<User[]>('/api/users');
180
181
apiCall$.pipe(
182
mapResponse({
183
next: (users) => ({ success: true, data: users, timestamp: Date.now() }),
184
error: (error) => ({ success: false, error: error.message, timestamp: Date.now() })
185
})
186
).subscribe(result => {
187
console.log(result); // Always gets a transformed result
188
});
189
```
190
191
### Response Tapping
192
193
Handles responses with side effects while keeping the original stream intact. Designed for ComponentStore effects with safe error handling.
194
195
```typescript { .api }
196
/**
197
* Handles the response in ComponentStore effects in a safe way, without additional boilerplate
198
*/
199
function tapResponse<T, E = unknown>(
200
observer: TapResponseObserver<T, E>
201
): (source$: Observable<T>) => Observable<T>;
202
203
/**
204
* @deprecated Instead of passing a sequence of callbacks, use an observer object
205
*/
206
function tapResponse<T, E = unknown>(
207
next: (value: T) => void,
208
error: (error: E) => void,
209
complete?: () => void
210
): (source$: Observable<T>) => Observable<T>;
211
212
interface TapResponseObserver<T, E> {
213
/** Handle successful responses */
214
next: (value: T) => void;
215
/** Handle error responses */
216
error: (error: E) => void;
217
/** Handle stream completion (optional) */
218
complete?: () => void;
219
/** Always called after next/error/complete (optional) */
220
finalize?: () => void;
221
}
222
```
223
224
**Usage Examples:**
225
226
```typescript
227
import { tapResponse } from "@ngrx/operators";
228
import { rxMethod } from "@ngrx/signals/rxjs-interop";
229
import { exhaustMap, tap } from "rxjs/operators";
230
231
// ComponentStore rxMethod
232
readonly loadUsers = rxMethod<void>(
233
pipe(
234
tap(() => this.isLoading.set(true)),
235
exhaustMap(() =>
236
this.usersService.getAll().pipe(
237
tapResponse({
238
next: (users) => this.users.set(users),
239
error: (error: HttpErrorResponse) => this.logError(error.message),
240
finalize: () => this.isLoading.set(false),
241
})
242
)
243
)
244
)
245
);
246
247
// Error handling in streams
248
const dataStream$ = this.service.getData();
249
250
dataStream$.pipe(
251
tapResponse({
252
next: (data) => {
253
console.log("Received data:", data);
254
this.cache.set(data);
255
},
256
error: (error) => {
257
console.error("Failed to load data:", error);
258
this.showErrorMessage(error.message);
259
},
260
complete: () => console.log("Data loading completed"),
261
finalize: () => this.cleanup()
262
})
263
).subscribe(); // Stream continues even if errors occur
264
265
// Deprecated callback style (still supported)
266
dataStream$.pipe(
267
tapResponse(
268
(data) => console.log("Success:", data),
269
(error) => console.error("Error:", error),
270
() => console.log("Complete")
271
)
272
).subscribe();
273
```
274
275
## Types
276
277
```typescript { .api }
278
// RxJS type imports
279
type Observable<T> = import('rxjs').Observable<T>;
280
type ObservableInput<T> = import('rxjs').ObservableInput<T>;
281
type OperatorFunction<T, R> = import('rxjs').OperatorFunction<T, R>;
282
type ObservedValueOf<T> = import('rxjs').ObservedValueOf<T>;
283
284
// mapResponse types
285
interface MapResponseObserver<T, E, R1, R2> {
286
next: (value: T) => R1;
287
error: (error: E) => R2;
288
}
289
290
// tapResponse types
291
interface TapResponseObserver<T, E> {
292
next: (value: T) => void;
293
error: (error: E) => void;
294
complete?: () => void;
295
finalize?: () => void;
296
}
297
```