0
# WebSocket Operations
1
2
Real-time bidirectional communication with WebSocket integration for reactive streams, enabling live data updates and interactive applications.
3
4
## Capabilities
5
6
### webSocket Function
7
8
Create a `WebSocketSubject` — both an observable of incoming messages and an observer for outgoing ones — for real-time communication.
9
10
```typescript { .api }
11
/**
 * Create a WebSocket subject for bidirectional communication.
 *
 * Passing a string is shorthand for a configuration object that only sets `url`.
 *
 * @param urlConfigOrSource - WebSocket URL string or configuration object
 * @returns WebSocketSubject for sending and receiving messages
 */
function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;
17
```
18
19
**Usage Examples:**
20
21
```typescript
22
import { webSocket } from "rxjs/webSocket";
23
24
// Simple WebSocket connection
const socket$ = webSocket('ws://localhost:8080');

// Send messages
socket$.next({ type: 'message', data: 'Hello Server' });

// Receive messages (observer object form; positional callbacks are deprecated)
socket$.subscribe({
  next: message => console.log('Received:', message),
  error: err => console.error('WebSocket error:', err),
  complete: () => console.log('WebSocket connection closed')
});

// Close connection
socket$.complete();
39
```
40
41
### WebSocketSubject Class
42
43
Specialized subject for WebSocket communication.
44
45
```typescript { .api }
46
/**
 * Subject that wraps a WebSocket for bidirectional communication.
 *
 * NOTE(review): `url`, `socket` and `close()` are listed here but do not appear
 * to be part of the public rxjs `WebSocketSubject` typings — verify against the
 * installed rxjs version before relying on them.
 */
class WebSocketSubject<T> extends AnonymousSubject<T> {
  /**
   * URL of the WebSocket endpoint
   */
  readonly url: string;

  /**
   * Underlying WebSocket instance, or `null` when there is none
   */
  readonly socket: WebSocket | null;

  /**
   * Create multiplexed observable for specific message types
   * @param subMsg - Function returning subscription message
   * @param unsubMsg - Function returning unsubscription message
   * @param messageFilter - Predicate to filter relevant messages
   * @returns Observable for specific message type
   */
  multiplex<R>(
    subMsg: () => any,
    unsubMsg: () => any,
    messageFilter: (value: T) => boolean
  ): Observable<R>;

  /**
   * Manually close WebSocket connection
   * @param code - Close code (optional)
   * @param reason - Close reason (optional)
   */
  close(code?: number, reason?: string): void;

  /**
   * Send message through WebSocket
   * @param value - Message to send
   */
  next(value: T): void;

  /**
   * Close connection with error
   * @param err - Error to emit
   */
  error(err: any): void;

  /**
   * Complete the connection
   */
  complete(): void;

  /**
   * Unsubscribe from WebSocket
   */
  unsubscribe(): void;
}
102
```
103
104
### WebSocketSubjectConfig Interface
105
106
Configuration for WebSocket connections.
107
108
```typescript { .api }
109
/**
 * Configuration object for WebSocket connections
 */
interface WebSocketSubjectConfig<T> {
  /** WebSocket URL */
  url: string;

  /** WebSocket sub-protocol(s) */
  protocol?: string | string[];

  /** Custom serializer for outgoing messages */
  serializer?: (value: T) => any;

  /** Custom deserializer for incoming messages */
  deserializer?: (e: MessageEvent) => T;

  /** WebSocket constructor to use when creating the socket */
  WebSocketCtor?: { new(url: string, protocol?: string | string[]): WebSocket };

  /** Observer for open events */
  openObserver?: Observer<Event>;

  /** Observer for close events */
  closeObserver?: Observer<CloseEvent>;

  /** Observer for connection closing */
  closingObserver?: Observer<void>;

  /**
   * Reconnect interval in milliseconds.
   * NOTE(review): not a documented rxjs `WebSocketSubjectConfig` option as far as
   * I can tell — verify; reconnection is normally built with retry operators.
   */
  reconnectInterval?: number;

  /**
   * Maximum reconnection attempts.
   * NOTE(review): same caveat as `reconnectInterval` — verify it exists.
   */
  reconnectAttempts?: number;

  /**
   * Maps an incoming MessageEvent to the emitted value.
   * NOTE(review): presumably superseded by `deserializer` — confirm.
   */
  resultSelector?: (e: MessageEvent) => T;

  /** Binary type for WebSocket */
  binaryType?: 'blob' | 'arraybuffer';
}
149
```
150
151
## Advanced WebSocket Patterns
152
153
### Automatic Reconnection
154
155
```typescript
156
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { retryWhen, delay, tap, take } from "rxjs/operators";

/**
 * Create a WebSocket subject that logs when the connection opens and closes.
 *
 * The subject itself does not reconnect; apply a retry operator to the
 * returned subject to get reconnection.
 *
 * @param url - WebSocket endpoint URL
 * @param maxRetries - NOTE(review): currently unused by this function; the
 *   retry limit has to be enforced by the caller's retry operator
 * @returns WebSocketSubject configured with open/close logging
 */
function createReconnectingWebSocket<T>(url: string, maxRetries: number = 5): WebSocketSubject<T> {
  return webSocket<T>({
    url,
    openObserver: {
      next: () => console.log('WebSocket connected')
    },
    closeObserver: {
      next: () => console.log('WebSocket disconnected')
    }
  });
}
170
171
// Usage with retry logic
const socket$ = createReconnectingWebSocket<any>('ws://localhost:8080');

const messages$ = socket$.pipe(
  retryWhen(errors =>
    errors.pipe(
      tap(err => console.log('Connection error, retrying...', err)),
      delay(2000), // Wait 2 seconds before retry
      // Stop after the 5th error. When this notifier completes, the stream
      // COMPLETES rather than errors, so handle exhaustion in `complete`.
      take(5)
    )
  )
);

messages$.subscribe({
  next: message => console.log('Message:', message),
  error: err => console.error('Final error:', err),
  complete: () => console.log('Connection closed or retries exhausted')
});
188
```
189
190
### Message Multiplexing
191
192
```typescript
193
import { webSocket } from "rxjs/webSocket";
194
import { filter, map } from "rxjs/operators";
195
196
/** Envelope used for every message exchanged in this example. */
interface WebSocketMessage {
  /** Message kind, e.g. 'chat', 'subscribe', 'unsubscribe' */
  type: string;
  /** Channel the message belongs to, when applicable */
  channel?: string;
  /** Message payload */
  data: any;
}
201
202
const socket$ = webSocket<WebSocketMessage>('ws://localhost:8080');

// Subscribe to different channels.
// Each stream sends a 'subscribe' message for its channel when subscribed,
// an 'unsubscribe' message when torn down, and only passes messages whose
// `type` equals `messageType`.
const openChannel = (channel: string, messageType: string) =>
  socket$.multiplex(
    () => ({ type: 'subscribe', channel }),
    () => ({ type: 'unsubscribe', channel }),
    message => message.type === messageType
  );

const chatMessages$ = openChannel('chat', 'chat');
const notifications$ = openChannel('notifications', 'notification');
const systemEvents$ = openChannel('system', 'system');

// Handle different message types
chatMessages$.subscribe(({ data }) => {
  console.log('Chat message:', data);
  updateChatUI(data);
});

notifications$.subscribe(({ data }) => {
  console.log('Notification:', data);
  showNotification(data);
});

systemEvents$.subscribe(({ data }) => {
  console.log('System event:', data);
  handleSystemEvent(data);
});
238
239
// Send messages to specific channels
240
/**
 * Send a chat message on the 'chat' channel, stamped with the current time.
 * @param message - Text of the chat message
 */
function sendChatMessage(message: string) {
  const outgoing: WebSocketMessage = {
    type: 'chat',
    channel: 'chat',
    data: { message, timestamp: Date.now() }
  };
  socket$.next(outgoing);
}
247
```
248
249
### Custom Serialization
250
251
```typescript
252
import { webSocket } from "rxjs/webSocket";
253
254
/** Message shape used with the custom serializer/deserializer below. */
interface CustomMessage {
  /** Message identifier */
  id: string;
  /** Numeric timestamp (the example fills it with `Date.now()`) */
  timestamp: number;
  /** Application-defined content */
  payload: any;
}
259
260
const socket$ = webSocket<CustomMessage>({
  url: 'ws://localhost:8080',

  // Custom serializer for outgoing messages:
  // fills in a missing timestamp/id, then encodes the message as JSON text
  serializer: (msg: CustomMessage) => {
    return JSON.stringify({
      ...msg,
      timestamp: msg.timestamp || Date.now(),
      id: msg.id || generateId()
    });
  },

  // Custom deserializer for incoming messages
  deserializer: (event: MessageEvent): CustomMessage => {
    const data = JSON.parse(event.data);
    return {
      id: data.id,
      // CustomMessage.timestamp is a number, so normalize to epoch milliseconds
      // (accepts both numeric and date-string timestamps from the server)
      timestamp: new Date(data.timestamp).getTime(),
      payload: data.payload
    };
  },

  // Handle binary data.
  // NOTE(review): the deserializer above assumes JSON text frames; binary
  // frames would need separate handling — confirm what the server sends.
  binaryType: 'arraybuffer'
});
285
286
/**
 * Generate a random 9-character id made of base-36 characters (0-9, a-z).
 *
 * Not cryptographically secure and not guaranteed unique; suitable for
 * example/demo message ids only.
 *
 * @returns A 9-character lowercase alphanumeric string
 */
function generateId(): string {
  // `slice` replaces the deprecated `substr`. The base-36 fraction can be
  // shorter than 9 characters (even empty), so pad to a fixed length.
  return Math.random().toString(36).slice(2, 11).padEnd(9, '0');
}
289
290
// Send structured message (runs through the custom serializer configured above)
const joinRoomMessage: CustomMessage = {
  id: 'msg-001',
  timestamp: Date.now(),
  payload: { action: 'join_room', room: 'general' }
};
socket$.next(joinRoomMessage);
296
```
297
298
### WebSocket State Management
299
300
```typescript
301
import { webSocket } from "rxjs/webSocket";
import { BehaviorSubject, EMPTY, combineLatest } from "rxjs";
import { map, startWith, catchError, tap, retryWhen, delay } from "rxjs/operators";

/**
 * Wraps a WebSocket subject and exposes its connection state and the number
 * of consecutive reconnect attempts as observables.
 */
class WebSocketService {
  /** Consecutive failed attempts allowed before giving up */
  private static readonly MAX_RECONNECT_ATTEMPTS = 10;
  /** Pause between reconnect attempts, in milliseconds */
  private static readonly RECONNECT_DELAY_MS = 2000;

  private connectionState$ = new BehaviorSubject<'connecting' | 'connected' | 'disconnected' | 'error'>('connecting');
  private reconnectAttempts$ = new BehaviorSubject<number>(0);

  private socket$ = webSocket<any>({
    url: 'ws://localhost:8080',
    // Mark the connection as established when the socket opens, not when the
    // first message arrives; otherwise send() is blocked until the server speaks.
    openObserver: {
      next: () => {
        this.connectionState$.next('connected');
        this.reconnectAttempts$.next(0);
      }
    }
  });

  // Public state observables
  readonly state$ = this.connectionState$.asObservable();
  readonly connected$ = this.state$.pipe(map(state => state === 'connected'));
  readonly reconnectCount$ = this.reconnectAttempts$.asObservable();

  // Combined connection info
  readonly connectionInfo$ = combineLatest([
    this.state$,
    this.reconnectCount$
  ]).pipe(
    map(([state, attempts]) => ({ state, attempts }))
  );

  constructor() {
    this.setupConnection();
  }

  /** Open the connection and keep it alive with a bounded number of retries. */
  private setupConnection() {
    this.socket$.pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(err => {
            const attempt = this.reconnectAttempts$.value + 1;
            if (attempt > WebSocketService.MAX_RECONNECT_ATTEMPTS) {
              // Rethrow so the failure reaches catchError below; merely
              // completing the notifier would end the stream silently.
              throw err;
            }
            this.connectionState$.next('error');
            this.reconnectAttempts$.next(attempt);
          }),
          delay(WebSocketService.RECONNECT_DELAY_MS)
        )
      ),
      catchError(err => {
        this.connectionState$.next('disconnected');
        console.error('WebSocket connection failed permanently:', err);
        return EMPTY;
      })
    ).subscribe();
  }

  /**
   * Send a message if the socket is currently connected; otherwise warn and drop it.
   * @param message - Value to send
   */
  send(message: any) {
    if (this.connectionState$.value === 'connected') {
      this.socket$.next(message);
    } else {
      console.warn('Cannot send message: WebSocket not connected');
    }
  }

  /** Observable of incoming messages. */
  getMessages() {
    return this.socket$.asObservable();
  }

  /** Complete the socket subject and mark the service as disconnected. */
  disconnect() {
    this.socket$.complete();
    this.connectionState$.next('disconnected');
  }
}
369
370
// Usage
const wsService = new WebSocketService();

// Monitor connection state
wsService.connectionInfo$.subscribe(info => {
  const { state, attempts } = info;
  console.log(`Connection state: ${state}, Attempts: ${attempts}`);
  updateConnectionIndicator(state);
});

// Handle messages
const incoming$ = wsService.getMessages();
incoming$.subscribe(message => {
  console.log('Received message:', message);
  processMessage(message);
});

// Send messages when connected
wsService.connected$.subscribe(isConnected => {
  if (!isConnected) {
    return;
  }
  wsService.send({ type: 'hello', data: 'Connected successfully' });
});
391
```
392
393
### Real-time Data Synchronization
394
395
```typescript
396
import { webSocket } from "rxjs/webSocket";
import { scan, shareReplay, map, filter } from "rxjs/operators";

/** Change notification exchanged over the socket. */
interface DataUpdate {
  type: 'create' | 'update' | 'delete';
  id: string;
  /** New item (create) or changed fields (update); absent for delete */
  data?: any;
}

/**
 * Keeps an id-keyed collection in sync with DataUpdate messages received over
 * a WebSocket, and sends local changes over the same socket.
 */
class RealTimeDataService<T> {
  private socket$ = webSocket<DataUpdate>('ws://localhost:8080');

  // Maintain synchronized state: fold every incoming update into a fresh Map
  private data$ = this.socket$.pipe(
    scan((state: Map<string, T>, update: DataUpdate) => {
      const newState = new Map(state);

      switch (update.type) {
        case 'create':
          newState.set(update.id, update.data);
          break;
        case 'update': {
          // update() sends Partial<T>, so merge into the existing item
          // instead of replacing it with a partial object
          const existing = newState.get(update.id);
          newState.set(
            update.id,
            existing === undefined ? update.data : { ...existing, ...update.data }
          );
          break;
        }
        case 'delete':
          newState.delete(update.id);
          break;
      }

      return newState;
    }, new Map<string, T>()),
    shareReplay(1)
  );

  // Public API

  /** Stream of the full item list, re-emitted after every update. */
  getAllData() {
    return this.data$.pipe(
      map(dataMap => Array.from(dataMap.values()))
    );
  }

  /**
   * Stream of a single item; emits only while the item exists.
   * @param id - Item identifier
   */
  getItemById(id: string) {
    return this.data$.pipe(
      map(dataMap => dataMap.get(id)),
      filter((item): item is T => item !== undefined)
    );
  }

  /**
   * Send a 'create' update with a newly generated id.
   * @param data - Item to create
   */
  create(data: T) {
    this.socket$.next({
      type: 'create',
      id: RealTimeDataService.newId(),
      data
    });
  }

  /**
   * Send an 'update' carrying only the changed fields.
   * @param id - Item identifier
   * @param data - Fields to change
   */
  update(id: string, data: Partial<T>) {
    this.socket$.next({
      type: 'update',
      id,
      data
    });
  }

  /**
   * Send a 'delete' update.
   * @param id - Item identifier
   */
  delete(id: string) {
    this.socket$.next({
      type: 'delete',
      id
    });
  }

  /** Random base-36 id (not cryptographically secure). */
  private static newId(): string {
    return Math.random().toString(36).slice(2, 11);
  }
}
465
466
// Usage for real-time user list
467
/** Item type for the real-time user list example. */
interface User {
  /** User identifier */
  id: string;
  /** Display name */
  name: string;
  /** Presence status */
  status: 'online' | 'offline';
}
472
473
const userService = new RealTimeDataService<User>();

// Subscribe to real-time user updates
const allUsers$ = userService.getAllData();
allUsers$.subscribe(users => {
  console.log('Current users:', users);
  updateUserList(users);
});

// Listen for specific user changes
const user123$ = userService.getItemById('user-123');
user123$.subscribe(user => {
  if (!user) {
    return;
  }
  console.log('User 123 updated:', user);
  updateUserProfile(user);
});
488
```
489
490
## Error Handling
491
492
```typescript
493
import { webSocket } from "rxjs/webSocket";
import { EMPTY } from "rxjs";
import { catchError, retry, tap } from "rxjs/operators";

const socket$ = webSocket({
  url: 'ws://localhost:8080',

  openObserver: {
    next: () => console.log('WebSocket opened')
  },

  closeObserver: {
    next: (event: CloseEvent) => {
      console.log('WebSocket closed:', event.code, event.reason);

      // Handle different close codes
      if (event.code === 1006) {
        console.log('Abnormal closure, likely network issue');
      } else if (event.code === 1011) {
        console.log('Server terminated connection due to error');
      }
    }
  }
});

const messages$ = socket$.pipe(
  tap(message => console.log('Message received:', message)),
  // retry must come BEFORE catchError: catchError replaces the error with
  // EMPTY, so a retry placed after it would never see an error to retry on.
  retry(3), // Retry connection up to 3 times
  catchError(err => {
    console.error('WebSocket stream error:', err);

    // Handle specific error types
    if (err instanceof CloseEvent) {
      console.log('Connection closed unexpectedly');
    }

    // Return empty or alternative stream
    return EMPTY;
  })
);

messages$.subscribe({
  next: message => handleMessage(message),
  error: err => console.error('Subscription error:', err),
  complete: () => console.log('WebSocket stream completed')
});
538
```
539
540
## Types
541
542
```typescript { .api }
543
/** Set of callbacks that consume notifications from a stream. */
interface Observer<T> {
  /** Called with each emitted value */
  next: (value: T) => void;
  /** Called with the error, if one is delivered (optional) */
  error?: (err: any) => void;
  /** Called on completion (optional) */
  complete?: () => void;
}
548
549
/** Observable that also exposes the observer methods `next`, `error` and `complete`. */
interface AnonymousSubject<T> extends Observable<T> {
  /** Push a value */
  next(value: T): void;
  /** Push an error */
  error(err: any): void;
  /** Signal completion */
  complete(): void;
}
554
```