CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

docs/websocket-operations.md

WebSocket Operations

Real-time bidirectional communication with WebSocket integration for reactive streams, enabling live data updates and interactive applications.

Capabilities

webSocket Function

Creates a WebSocketSubject for real-time, bidirectional communication.

/**
 * Creates a WebSocketSubject for bidirectional communication with a
 * WebSocket endpoint.
 *
 * @typeParam T - Type of the messages carried by the returned subject
 * @param urlConfigOrSource - Either the endpoint URL as a string, or a
 *   WebSocketSubjectConfig object describing the connection
 * @returns WebSocketSubject used both to send and to receive messages
 */
function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;

Usage Examples:

import { webSocket } from "rxjs/webSocket";

// Simple WebSocket connection
const socket$ = webSocket('ws://localhost:8080');

// Send messages
socket$.next({ type: 'message', data: 'Hello Server' });

// Receive messages. An observer object is passed instead of positional
// callbacks: the (next, error, complete) argument form is deprecated in RxJS 7.
socket$.subscribe({
  next: message => console.log('Received:', message),
  error: err => console.error('WebSocket error:', err),
  complete: () => console.log('WebSocket connection closed')
});

// Close connection
socket$.complete();

WebSocketSubject Class

Specialized subject for WebSocket communication.

/**
 * Subject that wraps a WebSocket for bidirectional communication: values
 * passed to next() are sent to the server, and subscribers receive the
 * messages coming back.
 *
 * NOTE(review): this is a documentation-level declaration (no method
 * bodies). Verify the member list against the rxjs/webSocket typings of the
 * targeted RxJS version before relying on it.
 */
class WebSocketSubject<T> extends AnonymousSubject<T> {
  /**
   * URL of the WebSocket endpoint
   */
  readonly url: string;
  
  /**
   * Underlying WebSocket instance, or null.
   * NOTE(review): presumably null while no connection is open — confirm.
   */
  readonly socket: WebSocket | null;
  
  /**
   * Create multiplexed observable for specific message types
   * @param subMsg - Function returning subscription message
   * @param unsubMsg - Function returning unsubscription message
   * @param messageFilter - Predicate to filter relevant messages
   * @returns Observable for specific message type
   *
   * NOTE(review): R is not tied to T or to any argument in this signature,
   * so callers must supply it explicitly — confirm against the rxjs typings.
   */
  multiplex<R>(
    subMsg: () => any,
    unsubMsg: () => any,
    messageFilter: (value: T) => boolean
  ): Observable<R>;
  
  /**
   * Manually close WebSocket connection
   * @param code - Close code (optional)
   * @param reason - Close reason (optional)
   */
  close(code?: number, reason?: string): void;
  
  /**
   * Send message through WebSocket
   * @param value - Message to send
   */
  next(value: T): void;
  
  /**
   * Close connection with error
   * @param err - Error to emit
   */
  error(err: any): void;
  
  /**
   * Complete the connection
   */
  complete(): void;
  
  /**
   * Unsubscribe from WebSocket
   */
  unsubscribe(): void;
}

WebSocketSubjectConfig Interface

Configuration for WebSocket connections.

/**
 * Configuration object for WebSocket connections.
 * Only `url` is required; every other field is optional.
 */
interface WebSocketSubjectConfig<T> {
  /** WebSocket URL */
  url: string;
  
  /** WebSocket sub-protocol, or list of sub-protocols */
  protocol?: string | string[];
  
  /** Custom serializer for outgoing messages */
  serializer?: (value: T) => any;
  
  /** Custom deserializer for incoming messages */
  deserializer?: (e: MessageEvent) => T;
  
  /** Constructor used to create WebSocket instances (a class, not a factory function) */
  WebSocketCtor?: { new(url: string, protocol?: string | string[]): WebSocket };
  
  /** Observer for open events */
  openObserver?: Observer<Event>;
  
  /** Observer for close events */
  closeObserver?: Observer<CloseEvent>;
  
  /** Observer for connection closing */
  closingObserver?: Observer<void>;
  
  /**
   * Reconnect interval in milliseconds.
   * NOTE(review): confirm this option exists in the targeted rxjs version.
   */
  reconnectInterval?: number;
  
  /**
   * Maximum reconnection attempts.
   * NOTE(review): confirm this option exists in the targeted rxjs version.
   */
  reconnectAttempts?: number;
  
  /**
   * Maps an incoming MessageEvent to a value of type T.
   * NOTE(review): same shape as `deserializer`; confirm which one is honored.
   */
  resultSelector?: (e: MessageEvent) => T;
  
  /** Binary type for WebSocket */
  binaryType?: 'blob' | 'arraybuffer';
}

Advanced WebSocket Patterns

Automatic Reconnection

// WebSocketSubject is imported as well: it is the declared return type below.
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { retryWhen, delay, tap, take } from "rxjs/operators";

/**
 * Creates a WebSocketSubject that logs when the connection opens and closes.
 *
 * The subject itself does not reconnect: reconnection is obtained by piping
 * the returned subject through a retry operator on the consumer side.
 *
 * @param url - WebSocket endpoint to connect to
 * @param maxRetries - Intended retry budget. It is not consulted by this
 *   factory; the consumer's retry pipeline must enforce it.
 * @returns The configured WebSocketSubject
 */
function createReconnectingWebSocket<T>(url: string, maxRetries: number = 5): WebSocketSubject<T> {
  return webSocket<T>({
    url,
    openObserver: {
      next: () => console.log('WebSocket connected')
    },
    closeObserver: {
      next: () => console.log('WebSocket disconnected')
    }
  });
}

// Usage with retry logic
const MAX_RETRIES = 5;
const RETRY_DELAY_MS = 2000;

const socket$ = createReconnectingWebSocket<any>('ws://localhost:8080', MAX_RETRIES);

const messages$ = socket$.pipe(
  retryWhen(errors => {
    // Counter is scoped to this subscription of the retried stream.
    let attempts = 0;
    return errors.pipe(
      tap(err => {
        attempts += 1;
        // Rethrow once the budget is spent. Merely completing the notifier
        // (e.g. with take()) would complete the stream silently and the
        // subscriber's error handler would never run.
        if (attempts > MAX_RETRIES) {
          throw err;
        }
        console.log('Connection error, retrying...', err);
      }),
      delay(RETRY_DELAY_MS) // Wait before resubscribing
    );
  })
);

messages$.subscribe({
  next: message => console.log('Message:', message),
  error: err => console.error('Final error:', err)
});

Message Multiplexing

import { webSocket } from "rxjs/webSocket";
import { filter, map } from "rxjs/operators";

/**
 * Envelope for every message exchanged over the multiplexed socket.
 */
interface WebSocketMessage {
  /** Message kind; the multiplex filters below compare against this field */
  type: string;
  /** Channel the message relates to (optional) */
  channel?: string;
  /** Message payload */
  data: any;
}

const socket$ = webSocket<WebSocketMessage>('ws://localhost:8080');

// Builds a multiplexed stream: subscribes to `channel` on the server and
// keeps only the messages whose type equals `messageType`.
const openChannel = (channel: string, messageType: string) =>
  socket$.multiplex(
    () => ({ type: 'subscribe', channel }),
    () => ({ type: 'unsubscribe', channel }),
    message => message.type === messageType
  );

// Subscribe to different channels
const chatMessages$ = openChannel('chat', 'chat');
const notifications$ = openChannel('notifications', 'notification');
const systemEvents$ = openChannel('system', 'system');

// Handle different message types
chatMessages$.subscribe(chatMsg => {
  const payload = chatMsg.data;
  console.log('Chat message:', payload);
  updateChatUI(payload);
});

notifications$.subscribe(notificationMsg => {
  const payload = notificationMsg.data;
  console.log('Notification:', payload);
  showNotification(payload);
});

systemEvents$.subscribe(systemMsg => {
  const payload = systemMsg.data;
  console.log('System event:', payload);
  handleSystemEvent(payload);
});

// Send messages to specific channels
/**
 * Sends a chat message on the 'chat' channel, stamped with the current time.
 * @param message - Text of the chat message
 */
function sendChatMessage(message: string) {
  const outgoing = {
    type: 'chat',
    channel: 'chat',
    data: { message, timestamp: Date.now() }
  };
  socket$.next(outgoing);
}

Custom Serialization

import { webSocket } from "rxjs/webSocket";

/**
 * Message shape used with the custom serializer/deserializer example.
 */
interface CustomMessage {
  /** Message identifier */
  id: string;
  /** Numeric timestamp (the examples fill it from Date.now()) */
  timestamp: number;
  /** Message body */
  payload: any;
}

const socket$ = webSocket<CustomMessage>({
  url: 'ws://localhost:8080',

  // Custom serializer for outgoing messages: fills in a missing
  // timestamp/id before encoding the message as JSON.
  serializer: (msg: CustomMessage) => {
    return JSON.stringify({
      ...msg,
      timestamp: msg.timestamp || Date.now(),
      id: msg.id || generateId()
    });
  },

  // Custom deserializer for incoming messages
  deserializer: (event: MessageEvent): CustomMessage => {
    const data = JSON.parse(event.data);
    return {
      id: data.id,
      // CustomMessage.timestamp is a number, so convert to epoch milliseconds
      // instead of handing back a Date object.
      timestamp: new Date(data.timestamp).getTime(),
      payload: data.payload
    };
  },

  // Binary frames are delivered as ArrayBuffer. Note that the deserializer
  // above calls JSON.parse on event.data, i.e. it expects text frames.
  binaryType: 'arraybuffer'
});

/**
 * Generates a pseudo-random identifier of exactly 9 base-36 characters.
 *
 * Not cryptographically secure (built on Math.random).
 *
 * @returns A 9-character string made of digits and lowercase letters
 */
function generateId(): string {
  // slice() replaces the deprecated substr(); padEnd() covers the case where
  // the base-36 expansion has fewer than 9 fractional digits.
  return Math.random().toString(36).slice(2, 11).padEnd(9, '0');
}

// Build the structured message first, then push it through the socket
const joinRoomMessage = {
  id: 'msg-001',
  timestamp: Date.now(),
  payload: { action: 'join_room', room: 'general' }
};
socket$.next(joinRoomMessage);

WebSocket State Management

import { webSocket } from "rxjs/webSocket";
import { BehaviorSubject, combineLatest, EMPTY } from "rxjs";
import { map, startWith, catchError, tap, retryWhen, delay } from "rxjs/operators";

/**
 * Wraps a WebSocketSubject and exposes its connection state as observables.
 *
 * State transitions:
 * - 'connecting'   initial state
 * - 'connected'    the socket reported an open event
 * - 'error'        the stream failed and a reconnect is pending
 * - 'disconnected' retries were exhausted, or disconnect() was called
 */
class WebSocketService {
  /** Maximum number of consecutive reconnect attempts before giving up. */
  private static readonly MAX_RECONNECT_ATTEMPTS = 10;
  /** Pause between a failure and the next reconnect attempt. */
  private static readonly RECONNECT_DELAY_MS = 2000;

  private connectionState$ = new BehaviorSubject<'connecting' | 'connected' | 'disconnected' | 'error'>('connecting');
  private reconnectAttempts$ = new BehaviorSubject<number>(0);

  // The open event, not the first inbound message, marks the connection as
  // established. Otherwise send() would stay blocked until the server spoke.
  private socket$ = webSocket<any>({
    url: 'ws://localhost:8080',
    openObserver: {
      next: () => {
        this.connectionState$.next('connected');
        this.reconnectAttempts$.next(0);
      }
    }
  });

  // Public state observables
  readonly state$ = this.connectionState$.asObservable();
  readonly connected$ = this.state$.pipe(map(state => state === 'connected'));
  readonly reconnectCount$ = this.reconnectAttempts$.asObservable();

  // Combined connection info
  readonly connectionInfo$ = combineLatest([
    this.state$,
    this.reconnectCount$
  ]).pipe(
    map(([state, attempts]) => ({ state, attempts }))
  );

  constructor() {
    this.setupConnection();
  }

  /**
   * Subscribes to the socket and keeps it alive, reconnecting after failures
   * until MAX_RECONNECT_ATTEMPTS consecutive attempts have failed.
   */
  private setupConnection() {
    this.socket$.pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(err => {
            const attempts = this.reconnectAttempts$.value + 1;
            // Rethrow when the budget is spent so that catchError below runs.
            // Completing the notifier (e.g. with take()) would end the stream
            // silently and leave the state stuck on 'error'.
            if (attempts > WebSocketService.MAX_RECONNECT_ATTEMPTS) {
              throw err;
            }
            this.connectionState$.next('error');
            this.reconnectAttempts$.next(attempts);
          }),
          delay(WebSocketService.RECONNECT_DELAY_MS)
        )
      ),
      catchError(err => {
        this.connectionState$.next('disconnected');
        console.error('WebSocket connection failed permanently:', err);
        return EMPTY;
      })
    ).subscribe();
  }

  /**
   * Sends a message if the socket is currently connected; otherwise logs a
   * warning and drops the message.
   * @param message - Value to send
   */
  send(message: any) {
    if (this.connectionState$.value === 'connected') {
      this.socket$.next(message);
    } else {
      console.warn('Cannot send message: WebSocket not connected');
    }
  }

  /**
   * Returns the stream of incoming messages.
   * This stream has no retry applied, so consumers should handle errors.
   */
  getMessages() {
    return this.socket$.asObservable();
  }

  /** Completes the socket subject and marks the service as disconnected. */
  disconnect() {
    this.socket$.complete();
    this.connectionState$.next('disconnected');
  }
}

// Usage
const wsService = new WebSocketService();

// Reflect every state/attempt change in the log and in the UI indicator
wsService.connectionInfo$.subscribe(info => {
  const { state, attempts } = info;
  console.log(`Connection state: ${state}, Attempts: ${attempts}`);
  updateConnectionIndicator(state);
});

// Log and process each incoming message
wsService.getMessages().subscribe(incoming => {
  console.log('Received message:', incoming);
  processMessage(incoming);
});

// Greet the server each time the connection becomes available
wsService.connected$.subscribe(isConnected => {
  if (!isConnected) {
    return;
  }
  wsService.send({ type: 'hello', data: 'Connected successfully' });
});

Real-time Data Synchronization

import { webSocket } from "rxjs/webSocket";
// map and filter are used by getAllData()/getItemById() below.
import { filter, map, scan, shareReplay } from "rxjs/operators";

/** Change notification exchanged over the socket. */
interface DataUpdate {
  /** Kind of change */
  type: 'create' | 'update' | 'delete';
  /** Identifier of the affected item */
  id: string;
  /** Item data; omitted for 'delete' */
  data?: any;
}

/**
 * Keeps an in-memory map of items in sync with the DataUpdate messages
 * received over a WebSocket, and sends local changes through the same socket.
 */
class RealTimeDataService<T> {
  private socket$ = webSocket<DataUpdate>('ws://localhost:8080');

  // Maintain synchronized state: each update produces a new Map, and the
  // latest Map is replayed to late subscribers.
  private data$ = this.socket$.pipe(
    scan((state: Map<string, T>, update: DataUpdate) => {
      const newState = new Map(state);

      switch (update.type) {
        case 'create':
        case 'update':
          // NOTE(review): 'update' replaces the stored item with update.data;
          // confirm the server sends full items rather than partial patches.
          newState.set(update.id, update.data);
          break;
        case 'delete':
          newState.delete(update.id);
          break;
      }

      return newState;
    }, new Map<string, T>()),
    shareReplay(1)
  );

  // Public API

  /** Emits the current items as an array each time the state changes. */
  getAllData() {
    return this.data$.pipe(
      map(dataMap => Array.from(dataMap.values()))
    );
  }

  /**
   * Emits the item stored under `id` each time the state changes, skipping
   * states in which the item is absent.
   * @param id - Identifier of the item to follow
   */
  getItemById(id: string) {
    return this.data$.pipe(
      map(dataMap => dataMap.get(id)),
      // Type predicate so the resulting stream is typed T, not T | undefined.
      filter((item): item is T => item !== undefined)
    );
  }

  /**
   * Sends a 'create' message for a new item under a generated id.
   * generateId() is the helper defined in the Custom Serialization example.
   */
  create(data: T) {
    this.socket$.next({
      type: 'create',
      id: generateId(),
      data
    });
  }

  /** Sends an 'update' message carrying the changed fields of item `id`. */
  update(id: string, data: Partial<T>) {
    this.socket$.next({
      type: 'update',
      id,
      data
    });
  }

  /** Sends a 'delete' message for item `id`. */
  delete(id: string) {
    this.socket$.next({
      type: 'delete',
      id
    });
  }
}

// Usage for real-time user list
/** User record used in the real-time user list example. */
interface User {
  /** User identifier */
  id: string;
  /** Display name */
  name: string;
  /** Presence status */
  status: 'online' | 'offline';
}

const userService = new RealTimeDataService<User>();

// Keep the rendered user list in step with the synchronized state
const allUsers$ = userService.getAllData();
allUsers$.subscribe(currentUsers => {
  console.log('Current users:', currentUsers);
  updateUserList(currentUsers);
});

// Follow a single user and refresh the profile view on each change
const trackedUser$ = userService.getItemById('user-123');
trackedUser$.subscribe(trackedUser => {
  if (!trackedUser) {
    return;
  }
  console.log('User 123 updated:', trackedUser);
  updateUserProfile(trackedUser);
});

Error Handling

import { webSocket } from "rxjs/webSocket";
import { EMPTY } from "rxjs";
import { catchError, retry, tap } from "rxjs/operators";

const socket$ = webSocket({
  url: 'ws://localhost:8080',

  openObserver: {
    next: () => console.log('WebSocket opened')
  },

  closeObserver: {
    next: (event: CloseEvent) => {
      console.log('WebSocket closed:', event.code, event.reason);

      // Handle different close codes
      if (event.code === 1006) {
        console.log('Abnormal closure, likely network issue');
      } else if (event.code === 1011) {
        console.log('Server terminated connection due to error');
      }
    }
  }
});

const messages$ = socket$.pipe(
  tap(message => console.log('Message received:', message)),
  // retry must sit before catchError: catchError replaces the error with
  // EMPTY, so a retry placed after it would never see an error to retry.
  retry(3), // Retry connection up to 3 times
  catchError(err => {
    console.error('WebSocket stream error:', err);

    // Handle specific error types. CloseEvent is a browser global, so it is
    // checked for existence before being used with instanceof.
    if (typeof CloseEvent !== 'undefined' && err instanceof CloseEvent) {
      console.log('Connection closed unexpectedly');
    }

    // Return empty or alternative stream
    return EMPTY;
  })
);

messages$.subscribe({
  next: message => handleMessage(message),
  error: err => console.error('Subscription error:', err),
  complete: () => console.log('WebSocket stream completed')
});

Types

/**
 * Set of callbacks that consume notifications; only `next` is required.
 */
interface Observer<T> {
  /** Called with each delivered value */
  next: (value: T) => void;
  /** Called with the error when the source fails (optional) */
  error?: (err: any) => void;
  /** Called when the source completes (optional) */
  complete?: () => void;
}

/**
 * Observable that also exposes the producer-side methods next/error/complete.
 */
interface AnonymousSubject<T> extends Observable<T> {
  /** Pushes a value */
  next(value: T): void;
  /** Signals an error */
  error(err: any): void;
  /** Signals completion */
  complete(): void;
}

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json