Reactive Extensions for modern JavaScript, providing comprehensive reactive programming with observable sequences.
—
Real-time bidirectional communication with WebSocket integration for reactive streams, enabling live data updates and interactive applications.
Create WebSocket observable for real-time communication.
/**
 * Creates a WebSocketSubject for bidirectional communication.
 *
 * @param urlConfigOrSource - Either the WebSocket URL as a string, or a
 *   WebSocketSubjectConfig object describing the connection.
 * @returns A WebSocketSubject<T> used both for sending and for receiving messages.
 */
function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;

Usage Examples:
import { webSocket } from "rxjs/webSocket";
// Simple WebSocket connection
const socket$ = webSocket('ws://localhost:8080');

// Send messages
socket$.next({ type: 'message', data: 'Hello Server' });

// Receive messages. An observer object is passed instead of positional
// callbacks: the multi-callback subscribe signature is deprecated in RxJS 7,
// and the observer form names each handler explicitly.
socket$.subscribe({
  next: message => console.log('Received:', message),
  error: err => console.error('WebSocket error:', err),
  complete: () => console.log('WebSocket connection closed')
});

// Close connection
socket$.complete();

Specialized subject for WebSocket communication.
/**
 * Subject that wraps a WebSocket for bidirectional communication.
 *
 * Values passed to `next` are sent to the server; values received from the
 * server are delivered to subscribers.
 */
class WebSocketSubject<T> extends AnonymousSubject<T> {
  /**
   * URL of the WebSocket endpoint
   */
  readonly url: string;

  /**
   * The underlying WebSocket instance, or `null`
   * (presumably while no connection is open — confirm against the implementation).
   */
  readonly socket: WebSocket | null;

  /**
   * Create a multiplexed observable for a specific kind of message.
   *
   * `R` defaults to `T`: no argument mentions `R`, so without a default a call
   * that omits the type argument would be typed as `Observable<unknown>`.
   *
   * @param subMsg - Function returning the subscription message
   * @param unsubMsg - Function returning the unsubscription message
   * @param messageFilter - Predicate selecting the messages relevant to this stream
   * @returns Observable of the messages accepted by `messageFilter`
   */
  multiplex<R = T>(
    subMsg: () => any,
    unsubMsg: () => any,
    messageFilter: (value: T) => boolean
  ): Observable<R>;

  /**
   * Manually close the WebSocket connection
   * @param code - Close code (optional)
   * @param reason - Close reason (optional)
   */
  close(code?: number, reason?: string): void;

  /**
   * Send a message through the WebSocket
   * @param value - Message to send
   */
  next(value: T): void;

  /**
   * Close the connection with an error
   * @param err - Error to emit
   */
  error(err: any): void;

  /**
   * Complete the connection
   */
  complete(): void;

  /**
   * Unsubscribe from the WebSocket
   */
  unsubscribe(): void;
}

Configuration for WebSocket connections.
/**
 * Configuration object for WebSocket connections.
 *
 * Only `url` is required; every other member is optional.
 */
interface WebSocketSubjectConfig<T> {
  /** WebSocket URL */
  url: string;
  /** WebSocket sub-protocol, or a list of them */
  protocol?: string | string[];
  /** Custom serializer applied to each outgoing message of type T */
  serializer?: (value: T) => any;
  /** Custom deserializer turning an incoming MessageEvent into a T */
  deserializer?: (e: MessageEvent) => T;
  /** Constructor used to create WebSocket instances; called with the URL and an optional protocol */
  WebSocketCtor?: { new(url: string, protocol?: string | string[]): WebSocket };
  /** Observer for open events (receives the open `Event`) */
  openObserver?: Observer<Event>;
  /** Observer for close events (receives the `CloseEvent`) */
  closeObserver?: Observer<CloseEvent>;
  /** Observer for connection closing */
  closingObserver?: Observer<void>;
  /**
   * Reconnect interval in milliseconds.
   * NOTE(review): not found in the published RxJS WebSocketSubjectConfig — confirm this option exists.
   */
  reconnectInterval?: number;
  /**
   * Maximum reconnection attempts.
   * NOTE(review): not found in the published RxJS WebSocketSubjectConfig — confirm this option exists.
   */
  reconnectAttempts?: number;
  /**
   * Maps an incoming MessageEvent to a T; same signature as `deserializer`.
   * NOTE(review): presumably the deprecated predecessor of `deserializer` — confirm.
   */
  resultSelector?: (e: MessageEvent) => T;
  /** Binary type for WebSocket */
  binaryType?: 'blob' | 'arraybuffer';
}
import { webSocket } from "rxjs/webSocket";
import { retryWhen, delay, tap, take } from "rxjs/operators";
/**
 * Opens a WebSocketSubject for `url` that logs when the connection opens and closes.
 *
 * @param url - WebSocket endpoint to connect to.
 * @param maxRetries - Accepted for callers' convenience but never read by this
 *   function; retrying is left to whoever subscribes to the returned subject.
 * @returns The configured WebSocketSubject.
 */
function createReconnectingWebSocket<T>(url: string, maxRetries: number = 5): WebSocketSubject<T> {
  // Lifecycle observers are named up front so the config literal stays short.
  const openObserver = {
    next: () => console.log('WebSocket connected')
  };
  const closeObserver = {
    next: () => console.log('WebSocket disconnected')
  };

  return webSocket<T>({ url, openObserver, closeObserver });
}
// Usage with retry logic
const socket$ = createReconnectingWebSocket<any>('ws://localhost:8080');
const messages$ = socket$.pipe(
  retryWhen(errors => {
    // The counter lives inside the notifier callback so that each
    // subscription to messages$ counts its own failures.
    let failures = 0;
    return errors.pipe(
      tap(err => {
        failures += 1;
        if (failures > 5) {
          // Maximum 5 retry attempts. Rethrowing errors the notifier, which
          // retryWhen forwards to the subscriber. Merely completing the
          // notifier (e.g. with take(5)) would end the stream silently and
          // the error handler below would never run.
          throw err;
        }
        console.log('Connection error, retrying...', err);
      }),
      delay(2000) // Wait 2 seconds before retry
    );
  })
);
messages$.subscribe({
  next: message => console.log('Message:', message),
  error: err => console.error('Final error:', err)
});
import { webSocket } from "rxjs/webSocket";
import { filter, map } from "rxjs/operators";
/** Message envelope exchanged over the socket in the multiplexing example. */
interface WebSocketMessage {
  /** Message kind; the example filters on 'chat', 'notification' and 'system'. */
  type: string;
  /** Optional channel name, e.g. 'chat'. */
  channel?: string;
  /** Message payload; left untyped. */
  data: any;
}
const socket$ = webSocket<WebSocketMessage>('ws://localhost:8080');

// Builds one multiplexed stream: sends a subscribe/unsubscribe message for
// `channel` and keeps only messages whose `type` equals `messageType`.
const openChannel = (channel: string, messageType: string) =>
  socket$.multiplex(
    () => ({ type: 'subscribe', channel }),
    () => ({ type: 'unsubscribe', channel }),
    message => message.type === messageType
  );

// Subscribe to different channels
const chatMessages$ = openChannel('chat', 'chat');
const notifications$ = openChannel('notifications', 'notification');
const systemEvents$ = openChannel('system', 'system');

// Handle different message types
chatMessages$.subscribe(chat => {
  const payload = chat.data;
  console.log('Chat message:', payload);
  updateChatUI(payload);
});
notifications$.subscribe(notification => {
  const payload = notification.data;
  console.log('Notification:', payload);
  showNotification(payload);
});
systemEvents$.subscribe(systemEvent => {
  const payload = systemEvent.data;
  console.log('System event:', payload);
  handleSystemEvent(payload);
});
// Send messages to specific channels
/**
 * Sends a chat message on the 'chat' channel, stamped with the current time.
 *
 * @param message - Text of the chat message.
 */
function sendChatMessage(message: string) {
  const outgoing = {
    type: 'chat',
    channel: 'chat',
    data: { message, timestamp: Date.now() }
  };
  socket$.next(outgoing);
}
import { webSocket } from "rxjs/webSocket";
/** Message shape used by the custom serializer/deserializer example. */
interface CustomMessage {
  /** Message identifier. */
  id: string;
  /** Numeric timestamp; the example fills it with `Date.now()` (epoch milliseconds). */
  timestamp: number;
  /** Message body; left untyped. */
  payload: any;
}
const socket$ = webSocket<CustomMessage>({
  url: 'ws://localhost:8080',
  // Custom serializer for outgoing messages: fills in `timestamp` and `id`
  // when the caller left them falsy, then encodes the message as JSON text.
  serializer: (msg: CustomMessage) => {
    return JSON.stringify({
      ...msg,
      timestamp: msg.timestamp || Date.now(),
      id: msg.id || generateId()
    });
  },
  // Custom deserializer for incoming messages
  deserializer: (event: MessageEvent): CustomMessage => {
    const data = JSON.parse(event.data);
    return {
      id: data.id,
      // CustomMessage.timestamp is a number, so convert to epoch milliseconds
      // rather than handing back a Date object. Going through Date accepts
      // both numeric and date-string timestamps from the server.
      timestamp: new Date(data.timestamp).getTime(),
      payload: data.payload
    };
  },
  // Handle binary data
  // NOTE(review): the deserializer above feeds event.data to JSON.parse, which
  // expects text — confirm the server never sends binary frames.
  binaryType: 'arraybuffer'
});
/**
 * Produces a short pseudo-random identifier of up to nine base-36 characters
 * (digits and lowercase letters).
 *
 * Built on Math.random(), so it is neither collision-proof nor suitable for
 * security-sensitive use.
 *
 * @returns The generated identifier.
 */
function generateId(): string {
  // toString(36) typically yields "0.<digits>"; drop the leading "0." and keep
  // at most nine characters. slice(2, 11) selects the same characters as the
  // deprecated substr(2, 9).
  return Math.random().toString(36).slice(2, 11);
}
// Send structured message
const joinRoomPayload = { action: 'join_room', room: 'general' };
socket$.next({
  id: 'msg-001',
  timestamp: Date.now(),
  payload: joinRoomPayload
});
import { webSocket } from "rxjs/webSocket";
import { BehaviorSubject, combineLatest, EMPTY } from "rxjs";
import { catchError, delay, map, retryWhen, startWith, take, tap } from "rxjs/operators";
/**
 * Wraps a WebSocketSubject and publishes its connection state.
 *
 * The state becomes 'connected' when the socket opens, 'error' when the stream
 * fails, 'connecting' when a retry starts, and 'disconnected' once retries are
 * exhausted or `disconnect()` is called.
 */
class WebSocketService {
  /** Consecutive failed attempts tolerated before the service gives up. */
  private static readonly maxReconnectAttempts = 10;
  /** Pause between a failure and the next reconnect attempt, in milliseconds. */
  private static readonly reconnectDelayMs = 2000;

  private connectionState$ = new BehaviorSubject<'connecting' | 'connected' | 'disconnected' | 'error'>('connecting');
  private reconnectAttempts$ = new BehaviorSubject<number>(0);

  private socket$ = webSocket<any>({
    url: 'ws://localhost:8080',
    // Report 'connected' when the socket actually opens. Waiting for the first
    // incoming message would leave the state stuck at 'connecting' whenever
    // the server expects the client to speak first.
    openObserver: {
      next: () => {
        this.connectionState$.next('connected');
        this.reconnectAttempts$.next(0);
      }
    }
  });

  // Public state observables
  readonly state$ = this.connectionState$.asObservable();
  readonly connected$ = this.state$.pipe(map(state => state === 'connected'));
  readonly reconnectCount$ = this.reconnectAttempts$.asObservable();

  // Combined connection info
  readonly connectionInfo$ = combineLatest([
    this.state$,
    this.reconnectCount$
  ]).pipe(
    map(([state, attempts]) => ({ state, attempts }))
  );

  constructor() {
    this.setupConnection();
  }

  /**
   * Subscribes to the socket and keeps it alive: retries after a delay on
   * failure, and gives up (state 'disconnected') after too many consecutive
   * failures.
   */
  private setupConnection() {
    this.socket$.pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(err => {
            const attempts = this.reconnectAttempts$.value + 1;
            if (attempts > WebSocketService.maxReconnectAttempts) {
              // Rethrowing errors the notifier; retryWhen then errors the outer
              // stream so catchError below runs. Completing the notifier
              // instead (e.g. take(10)) would end the stream silently.
              throw err;
            }
            this.connectionState$.next('error');
            this.reconnectAttempts$.next(attempts);
          }),
          delay(WebSocketService.reconnectDelayMs),
          // The delayed notification triggers the resubscribe.
          tap(() => this.connectionState$.next('connecting'))
        )
      ),
      catchError(err => {
        this.connectionState$.next('disconnected');
        console.error('WebSocket connection failed permanently:', err);
        return EMPTY;
      })
    ).subscribe();
  }

  /**
   * Sends a message if the socket is currently connected; otherwise logs a
   * warning and drops the message.
   *
   * @param message - Value to send through the socket.
   */
  send(message: any) {
    if (this.connectionState$.value === 'connected') {
      this.socket$.next(message);
    } else {
      console.warn('Cannot send message: WebSocket not connected');
    }
  }

  /** @returns The stream of incoming messages. */
  getMessages() {
    return this.socket$.asObservable();
  }

  /** Completes the socket subject and marks the service as disconnected. */
  disconnect() {
    this.socket$.complete();
    this.connectionState$.next('disconnected');
  }
}
// Usage
const wsService = new WebSocketService();

// Monitor connection state
wsService.connectionInfo$.subscribe(info => {
  console.log(`Connection state: ${info.state}, Attempts: ${info.attempts}`);
  updateConnectionIndicator(info.state);
});

// Handle messages
wsService.getMessages().subscribe(incoming => {
  console.log('Received message:', incoming);
  processMessage(incoming);
});

// Send messages when connected
wsService.connected$.subscribe(isConnected => {
  if (!isConnected) {
    return;
  }
  wsService.send({ type: 'hello', data: 'Connected successfully' });
});
import { webSocket } from "rxjs/webSocket";
// `filter` and `map` are imported alongside `scan` and `shareReplay` because
// RealTimeDataService uses all four operators.
import { filter, map, scan, shareReplay } from "rxjs/operators";

/** Change notification applied to the synchronized data set. */
interface DataUpdate {
  /** Kind of change to apply. */
  type: 'create' | 'update' | 'delete';
  /** Identifier of the affected item. */
  id: string;
  /** Item data; absent for 'delete'. */
  data?: any;
}
/**
 * Keeps a map of items synchronized with create/update/delete messages
 * received over a WebSocket, and sends such messages for local changes.
 */
class RealTimeDataService<T> {
  private socket$ = webSocket<DataUpdate>('ws://localhost:8080');

  // Maintain synchronized state: every incoming update produces a new Map, and
  // the latest Map is replayed to late subscribers.
  private data$ = this.socket$.pipe(
    scan(
      (state: Map<string, T>, update: DataUpdate) => RealTimeDataService.applyUpdate(state, update),
      new Map<string, T>()
    ),
    shareReplay(1)
  );

  /**
   * Applies one update to a copy of `state`, leaving `state` itself untouched.
   *
   * 'create' stores the data as given. 'update' merges the data into the
   * existing item, because `update()` sends a `Partial<T>`; replacing the item
   * outright would discard every field the update did not mention.
   */
  private static applyUpdate<V>(state: Map<string, V>, update: DataUpdate): Map<string, V> {
    const next = new Map(state);
    switch (update.type) {
      case 'create':
        next.set(update.id, update.data);
        break;
      case 'update':
        // Spreading `undefined` (unknown id) contributes nothing, so an update
        // for a missing item simply stores the partial data.
        next.set(update.id, { ...state.get(update.id), ...update.data });
        break;
      case 'delete':
        next.delete(update.id);
        break;
    }
    return next;
  }

  // Public API

  /** @returns Stream of all current items, as an array. */
  getAllData() {
    return this.data$.pipe(
      map(dataMap => Array.from(dataMap.values()))
    );
  }

  /**
   * @param id - Identifier of the item to follow.
   * @returns Stream of the item with that id; emits nothing while it is absent.
   */
  getItemById(id: string) {
    return this.data$.pipe(
      map(dataMap => dataMap.get(id)),
      // Type guard so subscribers receive T rather than T | undefined.
      filter((item): item is T => item !== undefined)
    );
  }

  /** Sends a 'create' message for `data` under a freshly generated id. */
  create(data: T) {
    this.socket$.next({
      type: 'create',
      id: generateId(),
      data
    });
  }

  /** Sends an 'update' message carrying the changed fields of item `id`. */
  update(id: string, data: Partial<T>) {
    this.socket$.next({
      type: 'update',
      id,
      data
    });
  }

  /** Sends a 'delete' message for item `id`. */
  delete(id: string) {
    this.socket$.next({
      type: 'delete',
      id
    });
  }
}
// Usage for real-time user list
/** User record kept in sync by the real-time data service example. */
interface User {
  /** User identifier, e.g. 'user-123'. */
  id: string;
  /** Display name. */
  name: string;
  /** Presence state. */
  status: 'online' | 'offline';
}
const userService = new RealTimeDataService<User>();

// Subscribe to real-time user updates
userService.getAllData().subscribe(currentUsers => {
  console.log('Current users:', currentUsers);
  updateUserList(currentUsers);
});

// Listen for specific user changes
userService.getItemById('user-123').subscribe(changedUser => {
  if (!changedUser) {
    return;
  }
  console.log('User 123 updated:', changedUser);
  updateUserProfile(changedUser);
});
import { webSocket } from "rxjs/webSocket";
import { EMPTY } from "rxjs";
import { catchError, retry, tap } from "rxjs/operators";
const socket$ = webSocket({
  url: 'ws://localhost:8080',
  openObserver: {
    next: () => console.log('WebSocket opened')
  },
  closeObserver: {
    next: (event: CloseEvent) => {
      console.log('WebSocket closed:', event.code, event.reason);
      // Handle different close codes
      if (event.code === 1006) {
        console.log('Abnormal closure, likely network issue');
      } else if (event.code === 1011) {
        console.log('Server terminated connection due to error');
      }
    }
  }
});
const messages$ = socket$.pipe(
  tap(message => console.log('Message received:', message)),
  // retry must sit upstream of catchError: catchError swaps the error for
  // EMPTY, so an operator placed after it never sees an error to retry.
  retry(3), // Retry connection up to 3 times
  catchError(err => {
    // Reached only after the retries above are exhausted.
    console.error('WebSocket stream error:', err);
    // Handle specific error types
    if (err instanceof CloseEvent) {
      console.log('Connection closed unexpectedly');
    }
    // Return empty or alternative stream
    return EMPTY;
  })
);
// Consume the stream with an observer object: one handler each for values,
// errors and completion.
messages$.subscribe({
  next: message => handleMessage(message),
  error: err => console.error('Subscription error:', err),
  complete: () => console.log('WebSocket stream completed')
});

/** Set of callbacks that receives notifications; only `next` is required. */
interface Observer<T> {
  /** Called with each value. */
  next: (value: T) => void;
  /** Optional handler called with an error. */
  error?: (err: any) => void;
  /** Optional handler called on completion. */
  complete?: () => void;
}
/**
 * An Observable that also exposes the observer methods `next`, `error` and
 * `complete`, so values can be pushed into it.
 *
 * NOTE(review): WebSocketSubject is declared as a class that `extends` this
 * type, which requires a class rather than an interface — confirm whether this
 * should be declared as a class.
 */
interface AnonymousSubject<T> extends Observable<T> {
  /** Pushes a value. */
  next(value: T): void;
  /** Pushes an error. */
  error(err: any): void;
  /** Signals completion. */
  complete(): void;
}