CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

fetch-operations.mddocs/

Fetch Operations

Modern fetch-based HTTP requests with full observable integration, streaming support, and comprehensive error handling for web APIs.

Capabilities

fromFetch

Create observables from fetch requests with full streaming and cancellation support.

/**
 * Create observable from fetch request with streaming and cancellation support
 * @param input - Request URL or Request object
 * @param initWithSelector - Fetch init options with optional response selector
 * @returns Observable emitting Response or selected response data
 */
function fromFetch<T>(
  input: string | Request,
  initWithSelector?: RequestInit & {
    selector?: (response: Response) => ObservableInput<T>;
  }
): Observable<T extends never ? Response : T>;

Usage Examples:

import { fromFetch } from "rxjs/fetch";
import { switchMap, catchError } from "rxjs/operators";
import { of } from "rxjs";

// Simple GET request
fromFetch('/api/users').pipe(
  switchMap(response => {
    if (response.ok) {
      return response.json();
    } else {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
  }),
  catchError(err => {
    console.error('Request failed:', err);
    return of({ users: [], error: 'Failed to load users' });
  })
).subscribe(data => console.log('Users:', data));

// POST request with JSON body
const postData = { name: 'Alice', email: 'alice@example.com' };

fromFetch('/api/users', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify(postData)
}).pipe(
  switchMap(response => {
    if (response.ok) {
      return response.json();
    } else {
      throw new Error(`Failed to create user: ${response.status}`);
    }
  })
).subscribe(
  user => console.log('Created user:', user),
  err => console.error('Error:', err)
);

// Request with automatic JSON parsing using selector
fromFetch('/api/data', {
  selector: response => response.json()
}).subscribe(
  data => console.log('Data:', data),
  err => console.error('Error:', err)
);

// Request with custom headers and timeout
fromFetch('/api/secure-data', {
  method: 'GET',
  headers: {
    'Authorization': 'Bearer ' + token,
    'Accept': 'application/json'
  },
  signal: AbortSignal.timeout(5000) // 5 second timeout
}).pipe(
  switchMap(response => {
    if (response.status === 401) {
      throw new Error('Unauthorized - token may be expired');
    }
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  })
).subscribe(
  data => console.log('Secure data:', data),
  err => console.error('Request error:', err)
);

Advanced Fetch Patterns

Streaming Response Bodies:

import { fromFetch } from "rxjs/fetch";
import { switchMap, tap } from "rxjs/operators";

// Stream large response as text chunks
fromFetch('/api/large-dataset').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    
    // Get readable stream
    const reader = response.body?.getReader();
    if (!reader) {
      throw new Error('Response body not readable');
    }
    
    return new Observable(subscriber => {
      const pump = () => {
        reader.read().then(({ done, value }) => {
          if (done) {
            subscriber.complete();
            return;
          }
          
          // Emit chunk as Uint8Array
          subscriber.next(value);
          pump();
        }).catch(err => subscriber.error(err));
      };
      
      pump();
      
      // Cleanup
      return () => reader.cancel();
    });
  }),
  tap(chunk => console.log('Received chunk:', chunk.length, 'bytes'))
).subscribe(
  chunk => {
    // Process each chunk
    const text = new TextDecoder().decode(chunk);
    console.log('Chunk text:', text);
  },
  err => console.error('Stream error:', err),
  () => console.log('Stream complete')
);

Request Cancellation with AbortController:

import { fromFetch } from "rxjs/fetch";
import { takeUntil, switchMap } from "rxjs/operators";
import { Subject, timer } from "rxjs";

const cancelSubject = new Subject<void>();

// Request that can be cancelled
fromFetch('/api/slow-endpoint', {
  signal: new AbortController().signal
}).pipe(
  takeUntil(cancelSubject), // Cancel when cancelSubject emits
  switchMap(response => response.json())
).subscribe(
  data => console.log('Data:', data),
  err => {
    if (err.name === 'AbortError') {
      console.log('Request was cancelled');
    } else {
      console.error('Request error:', err);
    }
  }
);

// Cancel the request after 3 seconds
timer(3000).subscribe(() => {
  console.log('Cancelling request...');
  cancelSubject.next();
});

Retry with Exponential Backoff:

import { fromFetch } from "rxjs/fetch";
import { retryWhen, delay, scan, switchMap } from "rxjs/operators";
import { throwError, timer } from "rxjs";

fromFetch('/api/unreliable-endpoint').pipe(
  switchMap(response => {
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  }),
  retryWhen(errors => 
    errors.pipe(
      scan((retryCount, err) => {
        console.log(`Attempt ${retryCount + 1} failed:`, err.message);
        
        // Stop retrying after 3 attempts
        if (retryCount >= 2) {
          throw err;
        }
        return retryCount + 1;
      }, 0),
      // Exponential backoff: 1s, 2s, 4s
      switchMap(retryCount => timer(Math.pow(2, retryCount) * 1000))
    )
  )
).subscribe(
  data => console.log('Success:', data),
  err => console.error('Final error after retries:', err)
);

File Upload with Progress:

import { fromFetch } from "rxjs/fetch";
import { switchMap } from "rxjs/operators";

function uploadFile(file: File, url: string) {
  const formData = new FormData();
  formData.append('file', file);
  
  return fromFetch(url, {
    method: 'POST',
    body: formData,
    // Note: Don't set Content-Type header for FormData
    // Browser will set it automatically with boundary
  }).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`Upload failed: ${response.status}`);
      }
      return response.json();
    })
  );
}

// Usage
const fileInput = document.querySelector('input[type="file"]') as HTMLInputElement;
const file = fileInput.files?.[0];

if (file) {
  uploadFile(file, '/api/upload').subscribe(
    result => console.log('Upload successful:', result),
    err => console.error('Upload error:', err)
  );
}

Parallel Requests with Error Handling:

import { fromFetch } from "rxjs/fetch";
import { forkJoin, of } from "rxjs";
import { switchMap, catchError } from "rxjs/operators";

// Fetch multiple resources in parallel
const requests = [
  '/api/users',
  '/api/posts', 
  '/api/comments'
].map(url => 
  fromFetch(url).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`Failed to fetch ${url}: ${response.status}`);
      }
      return response.json();
    }),
    catchError(err => {
      console.error(`Error fetching ${url}:`, err);
      return of(null); // Return null for failed requests
    })
  )
);

forkJoin(requests).subscribe(
  ([users, posts, comments]) => {
    console.log('Users:', users);
    console.log('Posts:', posts); 
    console.log('Comments:', comments);
    
    // Handle cases where some requests failed (null values)
    if (users) {
      // Process users
    }
    if (posts) {
      // Process posts
    }
  }
);

Request Deduplication:

import { fromFetch } from "rxjs/fetch";
import { shareReplay, switchMap } from "rxjs/operators";
import { BehaviorSubject } from "rxjs";

// Cache and deduplicate identical requests
const requestCache = new Map<string, Observable<any>>();

function cachedFetch(url: string, ttl: number = 60000) {
  if (requestCache.has(url)) {
    return requestCache.get(url)!;
  }
  
  const request$ = fromFetch(url).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response.json();
    }),
    shareReplay({ bufferSize: 1, refCount: true })
  );
  
  requestCache.set(url, request$);
  
  // Clear cache after TTL
  timer(ttl).subscribe(() => {
    requestCache.delete(url);
  });
  
  return request$;
}

// Multiple calls to same URL will share the same request
cachedFetch('/api/config').subscribe(config => console.log('Config 1:', config));
cachedFetch('/api/config').subscribe(config => console.log('Config 2:', config));
// Only one HTTP request is made

Integration with Other RxJS Features

Combining with WebSocket for Real-time Updates:

import { fromFetch } from "rxjs/fetch";
import { webSocket } from "rxjs/webSocket";
import { merge, switchMap } from "rxjs/operators";

// Initial data from REST API
const initialData$ = fromFetch('/api/data').pipe(
  switchMap(response => response.json())
);

// Real-time updates via WebSocket
const updates$ = webSocket('ws://localhost:8080/updates');

// Combine initial data with real-time updates
merge(initialData$, updates$).subscribe(
  data => console.log('Data update:', data)
);

Request/Response Middleware Pattern:

import { fromFetch } from "rxjs/fetch";
import { switchMap, tap, finalize } from "rxjs/operators";

// Request interceptor
function withAuth(request: RequestInit = {}): RequestInit {
  return {
    ...request,
    headers: {
      ...request.headers,
      'Authorization': `Bearer ${getAuthToken()}`
    }
  };
}

// Response interceptor
function withLogging<T>(source: Observable<T>): Observable<T> {
  return source.pipe(
    tap(response => console.log('Response received:', response)),
    finalize(() => console.log('Request completed'))
  );
}

// Usage with middleware
function apiCall(url: string, options?: RequestInit) {
  return fromFetch(url, withAuth(options)).pipe(
    switchMap(response => {
      if (response.status === 401) {
        // Handle auth refresh
        return refreshToken().pipe(
          switchMap(() => fromFetch(url, withAuth(options)))
        );
      }
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response.json();
    })
  );
}

// Apply logging middleware
withLogging(apiCall('/api/protected-data')).subscribe(
  data => console.log('Protected data:', data),
  err => console.error('Error:', err)
);

Types

interface RequestInit {
  method?: string;
  headers?: HeadersInit;
  body?: BodyInit | null;
  mode?: RequestMode;
  credentials?: RequestCredentials;
  cache?: RequestCache;
  redirect?: RequestRedirect;
  referrer?: string;
  referrerPolicy?: ReferrerPolicy;
  integrity?: string;
  keepalive?: boolean;
  signal?: AbortSignal | null;
  window?: any;
}

interface Response {
  readonly headers: Headers;
  readonly ok: boolean;
  readonly redirected: boolean;
  readonly status: number;
  readonly statusText: string;
  readonly type: ResponseType;
  readonly url: string;
  readonly body: ReadableStream<Uint8Array> | null;
  readonly bodyUsed: boolean;
  
  arrayBuffer(): Promise<ArrayBuffer>;
  blob(): Promise<Blob>;
  formData(): Promise<FormData>;
  json(): Promise<any>;
  text(): Promise<string>;
  clone(): Response;
}

type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json