# Fetch Operations

Modern fetch-based HTTP requests with full observable integration, streaming support, and comprehensive error handling for web APIs.

## Capabilities

### fromFetch

Create observables from fetch requests with full streaming and cancellation support.

```typescript { .api }
/**
 * Create an observable from a fetch request and map the response through a selector.
 *
 * @param input - Request URL or Request object
 * @param initWithSelector - Fetch init options plus a `selector` that turns the
 *   Response into the values to emit (for example `response => response.json()`)
 * @returns Observable emitting the values produced by the selector
 */
function fromFetch<T>(
  input: string | Request,
  initWithSelector: RequestInit & {
    selector: (response: Response) => ObservableInput<T>;
  }
): Observable<T>;

/**
 * Create an observable from a fetch request with streaming and cancellation support.
 *
 * @param input - Request URL or Request object
 * @param init - Optional fetch init options
 * @returns Observable emitting the Response
 */
function fromFetch(
  input: string | Request,
  init?: RequestInit
): Observable<Response>;
```

**Usage Examples:**

```typescript
import { fromFetch } from "rxjs/fetch";
import { switchMap, catchError } from "rxjs/operators";
import { of } from "rxjs";

// Simple GET request
fromFetch('/api/users').pipe(
  switchMap(response => {
    if (response.ok) {
      return response.json();
    } else {
      // fetch only rejects on network failure, so HTTP error statuses
      // have to be turned into errors by hand
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
  }),
  catchError(err => {
    console.error('Request failed:', err);
    // Fall back to an empty result so subscribers still receive a value
    return of({ users: [], error: 'Failed to load users' });
  })
).subscribe(data => console.log('Users:', data));

// POST request with JSON body
const postData = { name: 'Alice', email: 'alice@example.com' };

fromFetch('/api/users', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify(postData)
}).pipe(
  switchMap(response => {
    if (response.ok) {
      return response.json();
    } else {
      throw new Error(`Failed to create user: ${response.status}`);
    }
  })
).subscribe(
  user => console.log('Created user:', user),
  err => console.error('Error:', err)
);

// Request with automatic JSON parsing using selector
fromFetch('/api/data', {
  selector: response => response.json()
}).subscribe(
  data => console.log('Data:', data),
  err => console.error('Error:', err)
);

// Request with custom headers and timeout
// `token` is the bearer token supplied by the application's auth flow
declare const token: string;

fromFetch('/api/secure-data', {
  method: 'GET',
  headers: {
    'Authorization': 'Bearer ' + token,
    'Accept': 'application/json'
  },
  signal: AbortSignal.timeout(5000) // 5 second timeout
}).pipe(
  switchMap(response => {
    if (response.status === 401) {
      throw new Error('Unauthorized - token may be expired');
    }
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  })
).subscribe(
  data => console.log('Secure data:', data),
  err => console.error('Request error:', err)
);
```

### Advanced Fetch Patterns

**Streaming Response Bodies:**

```typescript
import { fromFetch } from "rxjs/fetch";
import { switchMap, tap } from "rxjs/operators";
import { Observable } from "rxjs";

// One decoder for the whole stream: with `stream: true` it buffers a
// multi-byte character that is split across two chunks instead of
// emitting replacement characters
const decoder = new TextDecoder();

// Stream large response as text chunks
fromFetch('/api/large-dataset').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }

    // Get readable stream
    const reader = response.body?.getReader();
    if (!reader) {
      throw new Error('Response body not readable');
    }

    return new Observable<Uint8Array>(subscriber => {
      // Read one chunk, emit it, then schedule the next read
      const pump = () => {
        reader.read().then(({ done, value }) => {
          if (done) {
            subscriber.complete();
            return;
          }

          // Emit chunk as Uint8Array
          subscriber.next(value);
          pump();
        }).catch(err => subscriber.error(err));
      };

      pump();

      // Cleanup: cancel the reader when the subscriber unsubscribes
      return () => {
        void reader.cancel();
      };
    });
  }),
  tap(chunk => console.log('Received chunk:', chunk.length, 'bytes'))
).subscribe(
  chunk => {
    // Process each chunk
    const text = decoder.decode(chunk, { stream: true });
    console.log('Chunk text:', text);
  },
  err => console.error('Stream error:', err),
  () => console.log('Stream complete')
);
152
```
153
154
**Request Cancellation with AbortController:**
155
156
```typescript
157
import { fromFetch } from "rxjs/fetch";
import { takeUntil, switchMap } from "rxjs/operators";
import { Subject, timer } from "rxjs";

const cancelSubject = new Subject<void>();

// Keep a reference to the controller: a controller that is created inline
// and discarded can never be aborted
const controller = new AbortController();

// Request that can be cancelled
fromFetch('/api/slow-endpoint', {
  signal: controller.signal
}).pipe(
  switchMap(response => response.json()),
  // Keep takeUntil last so it also tears down the body parsing above
  takeUntil(cancelSubject) // Cancel when cancelSubject emits
).subscribe(
  data => console.log('Data:', data),
  err => {
    // Reached when controller.abort() is called; cancelling through
    // cancelSubject completes the stream without an error
    if (err.name === 'AbortError') {
      console.log('Request was cancelled');
    } else {
      console.error('Request error:', err);
    }
  }
);

// Cancel the request after 3 seconds
timer(3000).subscribe(() => {
  console.log('Cancelling request...');
  // Unsubscribes from fromFetch, which aborts the underlying fetch
  cancelSubject.next();
  // Alternative: controller.abort() surfaces an AbortError to the error handler
});
185
```
186
187
**Retry with Exponential Backoff:**
188
189
```typescript
190
import { fromFetch } from "rxjs/fetch";
import { retryWhen, scan, switchMap } from "rxjs/operators";
import { timer } from "rxjs";

fromFetch('/api/unreliable-endpoint').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  }),
  retryWhen(errors =>
    errors.pipe(
      // Count failures; rethrowing inside scan ends the retry loop with that error
      scan((failedAttempts, err) => {
        console.log(`Attempt ${failedAttempts + 1} failed:`, err.message);

        // Stop retrying after 3 attempts
        if (failedAttempts >= 2) {
          throw err;
        }
        return failedAttempts + 1;
      }, 0),
      // Exponential backoff: 1s after the first failure, 2s after the second
      switchMap(failedAttempts => timer(Math.pow(2, failedAttempts - 1) * 1000))
    )
  )
).subscribe(
  data => console.log('Success:', data),
  err => console.error('Final error after retries:', err)
);
220
```
221
222
**File Upload with Progress:**
223
224
```typescript
225
import { fromFetch } from "rxjs/fetch";
import { switchMap } from "rxjs/operators";

/**
 * Upload a single file as multipart form data.
 *
 * @param file - File to send under the `file` form field
 * @param url - Endpoint that receives the POST
 * @returns Observable emitting the parsed JSON response body; errors when the
 *   response status is not ok
 */
function uploadFile(file: File, url: string) {
  const formData = new FormData();
  formData.append('file', file);

  return fromFetch(url, {
    method: 'POST',
    body: formData,
    // Note: Don't set Content-Type header for FormData
    // Browser will set it automatically with boundary
  }).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`Upload failed: ${response.status}`);
      }
      return response.json();
    })
  );
}

// Usage
// querySelector returns null when no matching element exists, so keep the
// nullable type instead of asserting it away
const fileInput = document.querySelector<HTMLInputElement>('input[type="file"]');
const file = fileInput?.files?.[0];

if (file) {
  uploadFile(file, '/api/upload').subscribe(
    result => console.log('Upload successful:', result),
    err => console.error('Upload error:', err)
  );
}
257
```
258
259
**Parallel Requests with Error Handling:**
260
261
```typescript
262
import { fromFetch } from "rxjs/fetch";
import { forkJoin, of } from "rxjs";
import { switchMap, catchError } from "rxjs/operators";

// Fetch multiple resources in parallel
const requests = [
  '/api/users',
  '/api/posts',
  '/api/comments'
].map(url =>
  fromFetch(url).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`Failed to fetch ${url}: ${response.status}`);
      }
      return response.json();
    }),
    // Recover per request so one failure does not make forkJoin error out
    catchError(err => {
      console.error(`Error fetching ${url}:`, err);
      return of(null); // Return null for failed requests
    })
  )
);

forkJoin(requests).subscribe(
  ([users, posts, comments]) => {
    console.log('Users:', users);
    console.log('Posts:', posts);
    console.log('Comments:', comments);

    // Handle cases where some requests failed (null values)
    if (users) {
      // Process users
    }
    if (posts) {
      // Process posts
    }
  }
);
301
```
302
303
**Request Deduplication:**
304
305
```typescript
306
import { fromFetch } from "rxjs/fetch";
import { shareReplay, switchMap } from "rxjs/operators";
import { Observable, timer } from "rxjs";

// Cache and deduplicate identical requests
const requestCache = new Map<string, Observable<any>>();

/**
 * Fetch a URL as JSON, sharing one observable between callers of the same URL.
 *
 * @param url - URL to request; also used as the cache key
 * @param ttl - Milliseconds to keep the cache entry (default 60000)
 * @returns The cached observable for `url`, created on the first call
 */
function cachedFetch(url: string, ttl: number = 60000) {
  const cached = requestCache.get(url);
  if (cached) {
    return cached;
  }

  const request$ = fromFetch(url).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response.json();
    }),
    // Replay the latest value to every subscriber of this observable
    shareReplay({ bufferSize: 1, refCount: true })
  );

  requestCache.set(url, request$);

  // Clear cache after TTL
  timer(ttl).subscribe(() => {
    requestCache.delete(url);
  });

  return request$;
}

// Multiple calls to same URL will share the same request
cachedFetch('/api/config').subscribe(config => console.log('Config 1:', config));
cachedFetch('/api/config').subscribe(config => console.log('Config 2:', config));
// Only one HTTP request is made
342
```
343
344
## Integration with Other RxJS Features
345
346
**Combining with WebSocket for Real-time Updates:**
347
348
```typescript
349
import { fromFetch } from "rxjs/fetch";
import { webSocket } from "rxjs/webSocket";
// `merge` here is the creation function from "rxjs"; the export of the same
// name in "rxjs/operators" is a pipeable operator with a different signature
import { merge } from "rxjs";
import { switchMap } from "rxjs/operators";

// Initial data from REST API
const initialData$ = fromFetch('/api/data').pipe(
  switchMap(response => response.json())
);

// Real-time updates via WebSocket
const updates$ = webSocket('ws://localhost:8080/updates');

// Combine initial data with real-time updates
merge(initialData$, updates$).subscribe(
  data => console.log('Data update:', data)
);
365
```
366
367
**Request/Response Middleware Pattern:**
368
369
```typescript
370
import { fromFetch } from "rxjs/fetch";
import { switchMap, tap, finalize } from "rxjs/operators";
import { Observable } from "rxjs";

// Supplied by the application's auth layer
declare function getAuthToken(): string;
declare function refreshToken(): Observable<unknown>;

/**
 * Request interceptor: return a copy of `request` with a bearer
 * Authorization header added (replacing any existing one).
 *
 * @param request - Fetch init options to extend (defaults to `{}`)
 * @returns New init options; the input object is not mutated
 */
function withAuth(request: RequestInit = {}): RequestInit {
  // Normalise first: `headers` may be a Headers instance or an array of
  // pairs, and neither survives an object spread
  const headers = new Headers(request.headers);
  headers.set('Authorization', `Bearer ${getAuthToken()}`);

  return {
    ...request,
    headers
  };
}

/**
 * Response interceptor: log every emitted value and log once when the
 * stream ends for any reason.
 *
 * @param source - Observable to wrap
 * @returns Observable emitting the same values as `source`
 */
function withLogging<T>(source: Observable<T>): Observable<T> {
  return source.pipe(
    tap(response => console.log('Response received:', response)),
    finalize(() => console.log('Request completed'))
  );
}

// Reject non-2xx responses and parse the body as JSON
function parseJsonOrThrow(response: Response) {
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }
  return response.json();
}

/**
 * Authenticated JSON request that refreshes the token and retries once on 401.
 *
 * @param url - URL to request
 * @param options - Optional fetch init options; auth headers are added on top
 * @returns Observable emitting the parsed JSON body
 */
function apiCall(url: string, options?: RequestInit) {
  return fromFetch(url, withAuth(options)).pipe(
    switchMap(response => {
      if (response.status === 401) {
        // Handle auth refresh, then run the retried response through the
        // same status check and JSON parsing as the first attempt
        return refreshToken().pipe(
          switchMap(() => fromFetch(url, withAuth(options))),
          switchMap(parseJsonOrThrow)
        );
      }
      return parseJsonOrThrow(response);
    })
  );
}

// Apply logging middleware
withLogging(apiCall('/api/protected-data')).subscribe(
  data => console.log('Protected data:', data),
  err => console.error('Error:', err)
);
415
```
416
417
## Types
418
419
```typescript { .api }
420
/** Options accepted as the second argument of a fetch request. */
interface RequestInit {
  method?: string;
  headers?: HeadersInit;
  body?: BodyInit | null;
  mode?: RequestMode;
  credentials?: RequestCredentials;
  cache?: RequestCache;
  redirect?: RequestRedirect;
  referrer?: string;
  referrerPolicy?: ReferrerPolicy;
  integrity?: string;
  keepalive?: boolean;
  signal?: AbortSignal | null;
  window?: any;
}

/** Response object emitted by `fromFetch` when no selector is supplied. */
interface Response {
  readonly headers: Headers;
  readonly ok: boolean;
  readonly redirected: boolean;
  readonly status: number;
  readonly statusText: string;
  readonly type: ResponseType;
  readonly url: string;
  readonly body: ReadableStream<Uint8Array> | null;
  readonly bodyUsed: boolean;

  arrayBuffer(): Promise<ArrayBuffer>;
  blob(): Promise<Blob>;
  formData(): Promise<FormData>;
  json(): Promise<any>;
  text(): Promise<string>;
  clone(): Response;
}

// NOTE(review): this looks like a simplified form; the library's own
// ObservableInput may accept further input kinds - confirm against RxJS
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
457
```