Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Modern fetch-based HTTP requests with full observable integration, streaming support, and comprehensive error handling for web APIs.
Create observables from fetch requests with full streaming and cancellation support.
/**
 * Create an observable from a fetch request with streaming and cancellation support.
 *
 * Two call shapes are declared:
 * - with a `selector` in the init object, the result emits whatever the selector produces;
 * - without one, the result emits the `Response` itself.
 *
 * (A single signature using `T extends never ? Response : T` cannot express this:
 * a conditional type over a bare type parameter never evaluates to `Response`.)
 *
 * @param input - Request URL or Request object
 * @param initWithSelector - Fetch init options with optional response selector
 * @returns Observable emitting Response or selected response data
 */
function fromFetch<T>(
  input: string | Request,
  initWithSelector: RequestInit & {
    selector: (response: Response) => ObservableInput<T>;
  }
): Observable<T>;
function fromFetch(
  input: string | Request,
  initWithSelector?: RequestInit
): Observable<Response>;

Usage Examples:
import { fromFetch } from "rxjs/fetch";
import { switchMap, catchError } from "rxjs/operators";
import { of } from "rxjs";
// Simple GET request
const users$ = fromFetch('/api/users').pipe(
  switchMap(response => {
    // Guard clause: reject non-2xx responses before touching the body.
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    return response.json();
  }),
  // Log the failure and substitute a fallback payload so the stream still emits.
  catchError(err => {
    console.error('Request failed:', err);
    return of({ users: [], error: 'Failed to load users' });
  })
);
users$.subscribe(data => console.log('Users:', data));
// POST request with JSON body
const postData = { name: 'Alice', email: 'alice@example.com' };
// Fetch options are built up front so the pipeline below stays short.
const createUserInit: RequestInit = {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(postData),
};
fromFetch('/api/users', createUserInit)
  .pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`Failed to create user: ${response.status}`);
      }
      return response.json();
    })
  )
  .subscribe({
    next: user => console.log('Created user:', user),
    error: err => console.error('Error:', err),
  });
// Request with automatic JSON parsing using selector
const parseJson = (response: Response) => response.json();
fromFetch('/api/data', { selector: parseJson }).subscribe({
  next: data => console.log('Data:', data),
  error: err => console.error('Error:', err),
});
// Request with custom headers and timeout
const secureHeaders = {
  'Authorization': 'Bearer ' + token,
  'Accept': 'application/json',
};
fromFetch('/api/secure-data', {
  method: 'GET',
  headers: secureHeaders,
  signal: AbortSignal.timeout(5000), // 5 second timeout
})
  .pipe(
    switchMap(response => {
      // 401 gets its own message; any other non-ok status falls through to the generic error.
      if (response.status === 401) {
        throw new Error('Unauthorized - token may be expired');
      }
      if (response.ok) {
        return response.json();
      }
      throw new Error(`HTTP ${response.status}`);
    })
  )
  .subscribe({
    next: data => console.log('Secure data:', data),
    error: err => console.error('Request error:', err),
  });

Streaming Response Bodies:
import { fromFetch } from "rxjs/fetch";
import { switchMap, tap } from "rxjs/operators";
import { Observable } from "rxjs";

// One decoder for the whole stream: with `{ stream: true }` it buffers a
// multi-byte character that is split across two chunks instead of emitting
// replacement characters for each half.
const decoder = new TextDecoder();

// Stream large response as text chunks
fromFetch('/api/large-dataset').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    // Get readable stream
    const reader = response.body?.getReader();
    if (!reader) {
      throw new Error('Response body not readable');
    }
    // Typed as Uint8Array so downstream operators can use `chunk.length`.
    return new Observable<Uint8Array>(subscriber => {
      const pump = (): void => {
        reader.read().then(({ done, value }) => {
          if (done) {
            subscriber.complete();
            return;
          }
          // Emit chunk as Uint8Array
          subscriber.next(value);
          pump();
        }).catch(err => subscriber.error(err));
      };
      pump();
      // Cleanup: cancel() returns a promise; swallow its rejection so an
      // unsubscribe can never surface as an unhandled rejection.
      return () => {
        void reader.cancel().catch(() => undefined);
      };
    });
  }),
  tap(chunk => console.log('Received chunk:', chunk.length, 'bytes'))
).subscribe({
  next: chunk => {
    // Process each chunk
    const text = decoder.decode(chunk, { stream: true });
    console.log('Chunk text:', text);
  },
  error: err => console.error('Stream error:', err),
  complete: () => {
    // Flush any bytes the decoder was still holding back.
    const tail = decoder.decode();
    if (tail) {
      console.log('Chunk text:', tail);
    }
    console.log('Stream complete');
  },
});

Request Cancellation with AbortController:
import { fromFetch } from "rxjs/fetch";
import { takeUntil, switchMap } from "rxjs/operators";
import { Subject, timer } from "rxjs";
const cancelSubject = new Subject<void>();
// Request that can be cancelled.
// fromFetch sets up its own AbortController internally and aborts the
// underlying fetch when the subscription is torn down, so no `signal` option
// is needed here (a signal from a controller that is never aborted does nothing).
fromFetch('/api/slow-endpoint').pipe(
  switchMap(response => response.json()),
  // Kept last in the pipe: placed before switchMap it would only complete the
  // outer stream, and an in-flight response.json() could still emit.
  takeUntil(cancelSubject) // Cancel when cancelSubject emits
).subscribe({
  next: data => console.log('Data:', data),
  error: err => console.error('Request error:', err),
  // takeUntil ends the stream with a completion, not with an AbortError.
  complete: () => console.log('Request finished or was cancelled'),
});
// Cancel the request after 3 seconds
timer(3000).subscribe(() => {
  console.log('Cancelling request...');
  cancelSubject.next();
});

Retry with Exponential Backoff:
import { fromFetch } from "rxjs/fetch";
import { retryWhen, delay, scan, switchMap } from "rxjs/operators";
import { throwError, timer } from "rxjs";
fromFetch('/api/unreliable-endpoint').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  }),
  retryWhen(errors =>
    errors.pipe(
      // Count failures. The accumulator starts at 0, so the value emitted
      // downstream is 1 after the first failure and 2 after the second.
      scan((retryCount, err) => {
        console.log(`Attempt ${retryCount + 1} failed:`, err.message);
        // Stop retrying after 3 attempts
        if (retryCount >= 2) {
          throw err;
        }
        return retryCount + 1;
      }, 0),
      // Exponential backoff: 1s, then 2s. retryCount is 1-based here, so the
      // exponent is retryCount - 1 (using retryCount directly waits 2s, 4s).
      switchMap(retryCount => timer(Math.pow(2, retryCount - 1) * 1000))
    )
  )
).subscribe(
  data => console.log('Success:', data),
  err => console.error('Final error after retries:', err)
);

File Upload with Progress:
import { fromFetch } from "rxjs/fetch";
import { switchMap } from "rxjs/operators";
/**
 * POST a single file to `url` as multipart form data under the field name
 * `file` and emit the parsed JSON reply.
 *
 * @param file - File to send
 * @param url - Upload endpoint
 * @returns Observable of the parsed JSON body; errors on a non-ok status
 */
function uploadFile(file: File, url: string) {
  const payload = new FormData();
  payload.append('file', file);

  // Note: Don't set Content-Type header for FormData
  // Browser will set it automatically with boundary
  const upload$ = fromFetch(url, { method: 'POST', body: payload });

  return upload$.pipe(
    switchMap(response => {
      if (response.ok) {
        return response.json();
      }
      throw new Error(`Upload failed: ${response.status}`);
    })
  );
}
// Usage
// querySelector returns null when no matching element exists, so use the
// generic overload plus optional chaining instead of asserting the type.
const fileInput = document.querySelector<HTMLInputElement>('input[type="file"]');
const file = fileInput?.files?.[0];
if (file) {
  uploadFile(file, '/api/upload').subscribe(
    result => console.log('Upload successful:', result),
    err => console.error('Upload error:', err)
  );
}

Parallel Requests with Error Handling:
import { fromFetch } from "rxjs/fetch";
import { forkJoin, of } from "rxjs";
import { switchMap, catchError } from "rxjs/operators";
// Fetch multiple resources in parallel
const endpoints = ['/api/users', '/api/posts', '/api/comments'];
const requests = endpoints.map(url => {
  const json$ = fromFetch(url).pipe(
    switchMap(response => {
      if (response.ok) {
        return response.json();
      }
      throw new Error(`Failed to fetch ${url}: ${response.status}`);
    })
  );
  // Each request recovers on its own, so one failure does not fail forkJoin.
  return json$.pipe(
    catchError(err => {
      console.error(`Error fetching ${url}:`, err);
      return of(null); // Return null for failed requests
    })
  );
});
forkJoin(requests).subscribe(results => {
  const [users, posts, comments] = results;
  console.log('Users:', users);
  console.log('Posts:', posts);
  console.log('Comments:', comments);
  // Handle cases where some requests failed (null values)
  if (users) {
    // Process users
  }
  if (posts) {
    // Process posts
  }
});

Request Deduplication:
import { fromFetch } from "rxjs/fetch";
import { shareReplay, switchMap } from "rxjs/operators";
// `Observable` and `timer` are both used below and must be imported.
import { BehaviorSubject, Observable, timer } from "rxjs";
// Cache and deduplicate identical requests
const requestCache = new Map<string, Observable<any>>();

/**
 * Fetch `url` as JSON, sharing one request between all callers that ask for
 * the same URL while its cache entry is alive.
 *
 * @param url - Resource to fetch; also used as the cache key
 * @param ttl - Milliseconds until the cache entry is removed (default 60000)
 * @returns The shared observable for this URL
 */
function cachedFetch(url: string, ttl: number = 60000) {
  // Single lookup instead of has() followed by a non-null-asserted get().
  const cached = requestCache.get(url);
  if (cached) {
    return cached;
  }
  const request$ = fromFetch(url).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response.json();
    }),
    shareReplay({ bufferSize: 1, refCount: true })
  );
  requestCache.set(url, request$);
  // Clear cache after TTL; only evict the entry this call created.
  timer(ttl).subscribe(() => {
    if (requestCache.get(url) === request$) {
      requestCache.delete(url);
    }
  });
  return request$;
}
// Multiple calls to same URL will share the same request
['Config 1:', 'Config 2:'].forEach(label => {
  cachedFetch('/api/config').subscribe(config => console.log(label, config));
});
// Only one HTTP request is made

Combining with WebSocket for Real-time Updates:
import { fromFetch } from "rxjs/fetch";
import { webSocket } from "rxjs/webSocket";
// `merge` is called below as a creation function, which is exported from
// "rxjs". The export of the same name in "rxjs/operators" is a pipeable
// operator and returns a function, not an Observable.
import { merge } from "rxjs";
import { switchMap } from "rxjs/operators";
// Initial data from REST API
const initialData$ = fromFetch('/api/data').pipe(
  switchMap(response => response.json())
);
// Real-time updates via WebSocket
const updates$ = webSocket('ws://localhost:8080/updates');
// Combine initial data with real-time updates
merge(initialData$, updates$).subscribe(
  data => console.log('Data update:', data)
);

Request/Response Middleware Pattern:
import { fromFetch } from "rxjs/fetch";
import { switchMap, tap, finalize } from "rxjs/operators";
// Request interceptor
/**
 * Return a copy of `request` with an `Authorization: Bearer <token>` header
 * added, taking the token from `getAuthToken()`.
 *
 * @param request - Fetch init options to extend (defaults to an empty object)
 * @returns A new init object; the input is not mutated
 */
function withAuth(request: RequestInit = {}): RequestInit {
  // `headers` may be a Headers instance, an array of [name, value] pairs, or a
  // plain record. Object spread only copies the record form, so normalise
  // through the Headers constructor to keep the caller's headers in all cases.
  const headers = new Headers(request.headers);
  headers.set('Authorization', `Bearer ${getAuthToken()}`);
  return { ...request, headers };
}
// Response interceptor
/**
 * Wrap `source` so every emitted value is logged and a final message is
 * logged when the stream is finalized.
 *
 * @param source - Observable to instrument
 * @returns Observable emitting the same values as `source`
 */
function withLogging<T>(source: Observable<T>): Observable<T> {
  const logValue = tap<T>(response => console.log('Response received:', response));
  const logEnd = finalize<T>(() => console.log('Request completed'));
  return source.pipe(logValue, logEnd);
}
// Usage with middleware
/**
 * Perform an authenticated request and emit the parsed JSON body.
 * On a 401 the token is refreshed via `refreshToken()` and the request is
 * retried once with fresh auth headers.
 *
 * @param url - Endpoint to call
 * @param options - Optional fetch init options; auth headers are added by `withAuth`
 * @returns Observable of the parsed JSON body; errors with `HTTP <status>` on a non-ok response
 */
function apiCall(url: string, options?: RequestInit) {
  // Shared by the first response and the post-refresh retry. Without it the
  // retry path would emit the raw Response, unchecked and unparsed.
  const parseOrThrow = (response: Response) => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  };

  return fromFetch(url, withAuth(options)).pipe(
    switchMap(response => {
      if (response.status === 401) {
        // Handle auth refresh. A second 401 fails in parseOrThrow, so this
        // cannot loop.
        return refreshToken().pipe(
          switchMap(() => fromFetch(url, withAuth(options))),
          switchMap(parseOrThrow)
        );
      }
      return parseOrThrow(response);
    })
  );
}
// Apply logging middleware
withLogging(apiCall('/api/protected-data')).subscribe(
  data => console.log('Protected data:', data),
  err => console.error('Error:', err)
);

/**
 * Options accepted as the second argument of a fetch request.
 * Every field is optional.
 */
interface RequestInit {
  method?: string;
  headers?: HeadersInit;
  body?: BodyInit | null;
  mode?: RequestMode;
  credentials?: RequestCredentials;
  cache?: RequestCache;
  redirect?: RequestRedirect;
  referrer?: string;
  referrerPolicy?: ReferrerPolicy;
  integrity?: string;
  keepalive?: boolean;
  signal?: AbortSignal | null;
  // Typed as `null` rather than `any`: the DOM typings allow only null here,
  // and `any` conflicts with that declaration.
  window?: null;
}
/**
 * Shape of the response object handed to selectors and operators in the
 * examples. All properties are read-only; each body-reading method returns
 * a Promise.
 */
interface Response {
// Response metadata
readonly headers: Headers;
readonly ok: boolean;
readonly redirected: boolean;
readonly status: number;
readonly statusText: string;
readonly type: ResponseType;
readonly url: string;
// Body as a stream of byte chunks; may be null
readonly body: ReadableStream<Uint8Array> | null;
readonly bodyUsed: boolean;
// Body readers, one per target format
arrayBuffer(): Promise<ArrayBuffer>;
blob(): Promise<Blob>;
formData(): Promise<FormData>;
json(): Promise<any>;
text(): Promise<string>;
// Returns another Response
clone(): Response;
}
// A value accepted where an observable-like input is expected: an Observable,
// a Promise, or an Iterable of T.
// NOTE(review): this looks like a simplified form of the library's own type,
// which presumably accepts more input kinds — confirm against the RxJS typings.
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
// A function that takes an Observable of T and returns an Observable of R.
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;