or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ajax-operations.mdcombination-operators.mdcore-types.mderror-handling.mdfetch-operations.mdfiltering-operators.mdindex.mdobservable-creation.mdschedulers.mdsubjects.mdtesting-utilities.mdtransformation-operators.mdwebsocket-operations.md

websocket-operations.mddocs/

0

# WebSocket Operations

1

2

Real-time bidirectional communication with WebSocket integration for reactive streams, enabling live data updates and interactive applications.

3

4

## Capabilities

5

6

### webSocket Function

7

8

Create WebSocket observable for real-time communication.

9

10

```typescript { .api }

11

/**

12

* Create WebSocket subject for bidirectional communication

13

* @param urlConfigOrSource - WebSocket URL string or configuration object

14

* @returns WebSocketSubject for sending and receiving messages

15

*/

16

function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;

17

```

18

19

**Usage Examples:**

20

21

```typescript

22

import { webSocket } from "rxjs/webSocket";

23

24

// Simple WebSocket connection

25

const socket$ = webSocket('ws://localhost:8080');

26

27

// Send messages

28

socket$.next({ type: 'message', data: 'Hello Server' });

29

30

// Receive messages

31

socket$.subscribe(

32

message => console.log('Received:', message),

33

err => console.error('WebSocket error:', err),

34

() => console.log('WebSocket connection closed')

35

);

36

37

// Close connection

38

socket$.complete();

39

```

40

41

### WebSocketSubject Class

42

43

Specialized subject for WebSocket communication.

44

45

```typescript { .api }

46

/**

47

* Subject that wraps WebSocket for bidirectional communication

48

*/

49

class WebSocketSubject<T> extends AnonymousSubject<T> {

50

/**

51

* URL of the WebSocket endpoint

52

*/

53

readonly url: string;

54

55

/**

56

* Current WebSocket connection state

57

*/

58

readonly socket: WebSocket | null;

59

60

/**

61

* Create multiplexed observable for specific message types

62

* @param subMsg - Function returning subscription message

63

* @param unsubMsg - Function returning unsubscription message

64

* @param messageFilter - Predicate to filter relevant messages

65

* @returns Observable for specific message type

66

*/

67

multiplex<R>(

68

subMsg: () => any,

69

unsubMsg: () => any,

70

messageFilter: (value: T) => boolean

71

): Observable<R>;

72

73

/**

74

* Manually close WebSocket connection

75

* @param code - Close code (optional)

76

* @param reason - Close reason (optional)

77

*/

78

close(code?: number, reason?: string): void;

79

80

/**

81

* Send message through WebSocket

82

* @param value - Message to send

83

*/

84

next(value: T): void;

85

86

/**

87

* Close connection with error

88

* @param err - Error to emit

89

*/

90

error(err: any): void;

91

92

/**

93

* Complete the connection

94

*/

95

complete(): void;

96

97

/**

98

* Unsubscribe from WebSocket

99

*/

100

unsubscribe(): void;

101

}

102

```

103

104

### WebSocketSubjectConfig Interface

105

106

Configuration for WebSocket connections.

107

108

```typescript { .api }

109

/**

110

* Configuration object for WebSocket connections

111

*/

112

interface WebSocketSubjectConfig<T> {

113

/** WebSocket URL */

114

url: string;

115

116

/** WebSocket protocol */

117

protocol?: string | string[];

118

119

/** Custom serializer for outgoing messages */

120

serializer?: (value: T) => any;

121

122

/** Custom deserializer for incoming messages */

123

deserializer?: (e: MessageEvent) => T;

124

125

/** Factory function for creating WebSocket instances */

126

WebSocketCtor?: { new(url: string, protocol?: string | string[]): WebSocket };

127

128

/** Factory function for creating WebSocket with config */

129

openObserver?: Observer<Event>;

130

131

/** Observer for close events */

132

closeObserver?: Observer<CloseEvent>;

133

134

/** Observer for connection closing */

135

closingObserver?: Observer<void>;

136

137

/** Reconnect interval in milliseconds */

138

reconnectInterval?: number;

139

140

/** Maximum reconnection attempts */

141

reconnectAttempts?: number;

142

143

/** Function to generate result selector */

144

resultSelector?: (e: MessageEvent) => T;

145

146

/** Binary type for WebSocket */

147

binaryType?: 'blob' | 'arraybuffer';

148

}

149

```

150

151

## Advanced WebSocket Patterns

152

153

### Automatic Reconnection

154

155

```typescript

156

import { webSocket } from "rxjs/webSocket";

157

import { retryWhen, delay, tap, take } from "rxjs/operators";

158

159

function createReconnectingWebSocket<T>(url: string, maxRetries: number = 5): WebSocketSubject<T> {

160

return webSocket<T>({

161

url,

162

openObserver: {

163

next: () => console.log('WebSocket connected')

164

},

165

closeObserver: {

166

next: () => console.log('WebSocket disconnected')

167

}

168

});

169

}

170

171

// Usage with retry logic

172

const socket$ = createReconnectingWebSocket<any>('ws://localhost:8080');

173

174

const messages$ = socket$.pipe(

175

retryWhen(errors =>

176

errors.pipe(

177

tap(err => console.log('Connection error, retrying...', err)),

178

delay(2000), // Wait 2 seconds before retry

179

take(5) // Maximum 5 retry attempts

180

)

181

)

182

);

183

184

messages$.subscribe(

185

message => console.log('Message:', message),

186

err => console.error('Final error:', err)

187

);

188

```

189

190

### Message Multiplexing

191

192

```typescript

193

import { webSocket } from "rxjs/webSocket";

194

import { filter, map } from "rxjs/operators";

195

196

interface WebSocketMessage {

197

type: string;

198

channel?: string;

199

data: any;

200

}

201

202

const socket$ = webSocket<WebSocketMessage>('ws://localhost:8080');

203

204

// Subscribe to different channels

205

const chatMessages$ = socket$.multiplex(

206

() => ({ type: 'subscribe', channel: 'chat' }),

207

() => ({ type: 'unsubscribe', channel: 'chat' }),

208

message => message.type === 'chat'

209

);

210

211

const notifications$ = socket$.multiplex(

212

() => ({ type: 'subscribe', channel: 'notifications' }),

213

() => ({ type: 'unsubscribe', channel: 'notifications' }),

214

message => message.type === 'notification'

215

);

216

217

const systemEvents$ = socket$.multiplex(

218

() => ({ type: 'subscribe', channel: 'system' }),

219

() => ({ type: 'unsubscribe', channel: 'system' }),

220

message => message.type === 'system'

221

);

222

223

// Handle different message types

224

chatMessages$.subscribe(msg => {

225

console.log('Chat message:', msg.data);

226

updateChatUI(msg.data);

227

});

228

229

notifications$.subscribe(msg => {

230

console.log('Notification:', msg.data);

231

showNotification(msg.data);

232

});

233

234

systemEvents$.subscribe(msg => {

235

console.log('System event:', msg.data);

236

handleSystemEvent(msg.data);

237

});

238

239

// Send messages to specific channels

240

function sendChatMessage(message: string) {

241

socket$.next({

242

type: 'chat',

243

channel: 'chat',

244

data: { message, timestamp: Date.now() }

245

});

246

}

247

```

248

249

### Custom Serialization

250

251

```typescript

252

import { webSocket } from "rxjs/webSocket";

253

254

interface CustomMessage {

255

id: string;

256

timestamp: number;

257

payload: any;

258

}

259

260

const socket$ = webSocket<CustomMessage>({

261

url: 'ws://localhost:8080',

262

263

// Custom serializer for outgoing messages

264

serializer: (msg: CustomMessage) => {

265

return JSON.stringify({

266

...msg,

267

timestamp: msg.timestamp || Date.now(),

268

id: msg.id || generateId()

269

});

270

},

271

272

// Custom deserializer for incoming messages

273

deserializer: (event: MessageEvent) => {

274

const data = JSON.parse(event.data);

275

return {

276

id: data.id,

277

timestamp: new Date(data.timestamp),

278

payload: data.payload

279

};

280

},

281

282

// Handle binary data

283

binaryType: 'arraybuffer'

284

});

285

286

function generateId(): string {

287

return Math.random().toString(36).substr(2, 9);

288

}

289

290

// Send structured message

291

socket$.next({

292

id: 'msg-001',

293

timestamp: Date.now(),

294

payload: { action: 'join_room', room: 'general' }

295

});

296

```

297

298

### WebSocket State Management

299

300

```typescript

301

import { webSocket } from "rxjs/webSocket";

302

import { BehaviorSubject, combineLatest } from "rxjs";

303

import { map, startWith, catchError } from "rxjs/operators";

304

305

class WebSocketService {

306

private socket$ = webSocket<any>('ws://localhost:8080');

307

private connectionState$ = new BehaviorSubject<'connecting' | 'connected' | 'disconnected' | 'error'>('connecting');

308

private reconnectAttempts$ = new BehaviorSubject<number>(0);

309

310

// Public state observables

311

readonly state$ = this.connectionState$.asObservable();

312

readonly connected$ = this.state$.pipe(map(state => state === 'connected'));

313

readonly reconnectCount$ = this.reconnectAttempts$.asObservable();

314

315

// Combined connection info

316

readonly connectionInfo$ = combineLatest([

317

this.state$,

318

this.reconnectCount$

319

]).pipe(

320

map(([state, attempts]) => ({ state, attempts }))

321

);

322

323

constructor() {

324

this.setupConnection();

325

}

326

327

private setupConnection() {

328

this.socket$.pipe(

329

tap(() => {

330

this.connectionState$.next('connected');

331

this.reconnectAttempts$.next(0);

332

}),

333

retryWhen(errors =>

334

errors.pipe(

335

tap(err => {

336

this.connectionState$.next('error');

337

const currentAttempts = this.reconnectAttempts$.value;

338

this.reconnectAttempts$.next(currentAttempts + 1);

339

}),

340

delay(2000),

341

take(10) // Max 10 reconnect attempts

342

)

343

),

344

catchError(err => {

345

this.connectionState$.next('disconnected');

346

console.error('WebSocket connection failed permanently:', err);

347

return EMPTY;

348

})

349

).subscribe();

350

}

351

352

send(message: any) {

353

if (this.connectionState$.value === 'connected') {

354

this.socket$.next(message);

355

} else {

356

console.warn('Cannot send message: WebSocket not connected');

357

}

358

}

359

360

getMessages() {

361

return this.socket$.asObservable();

362

}

363

364

disconnect() {

365

this.socket$.complete();

366

this.connectionState$.next('disconnected');

367

}

368

}

369

370

// Usage

371

const wsService = new WebSocketService();

372

373

// Monitor connection state

374

wsService.connectionInfo$.subscribe(({ state, attempts }) => {

375

console.log(`Connection state: ${state}, Attempts: ${attempts}`);

376

updateConnectionIndicator(state);

377

});

378

379

// Handle messages

380

wsService.getMessages().subscribe(message => {

381

console.log('Received message:', message);

382

processMessage(message);

383

});

384

385

// Send messages when connected

386

wsService.connected$.subscribe(connected => {

387

if (connected) {

388

wsService.send({ type: 'hello', data: 'Connected successfully' });

389

}

390

});

391

```

392

393

### Real-time Data Synchronization

394

395

```typescript

396

import { webSocket } from "rxjs/webSocket";

397

import { scan, shareReplay } from "rxjs/operators";

398

399

interface DataUpdate {

400

type: 'create' | 'update' | 'delete';

401

id: string;

402

data?: any;

403

}

404

405

class RealTimeDataService<T> {

406

private socket$ = webSocket<DataUpdate>('ws://localhost:8080');

407

408

// Maintain synchronized state

409

private data$ = this.socket$.pipe(

410

scan((state: Map<string, T>, update: DataUpdate) => {

411

const newState = new Map(state);

412

413

switch (update.type) {

414

case 'create':

415

case 'update':

416

newState.set(update.id, update.data);

417

break;

418

case 'delete':

419

newState.delete(update.id);

420

break;

421

}

422

423

return newState;

424

}, new Map<string, T>()),

425

shareReplay(1)

426

);

427

428

// Public API

429

getAllData() {

430

return this.data$.pipe(

431

map(dataMap => Array.from(dataMap.values()))

432

);

433

}

434

435

getItemById(id: string) {

436

return this.data$.pipe(

437

map(dataMap => dataMap.get(id)),

438

filter(item => item !== undefined)

439

);

440

}

441

442

create(data: T) {

443

this.socket$.next({

444

type: 'create',

445

id: generateId(),

446

data

447

});

448

}

449

450

update(id: string, data: Partial<T>) {

451

this.socket$.next({

452

type: 'update',

453

id,

454

data

455

});

456

}

457

458

delete(id: string) {

459

this.socket$.next({

460

type: 'delete',

461

id

462

});

463

}

464

}

465

466

// Usage for real-time user list

467

interface User {

468

id: string;

469

name: string;

470

status: 'online' | 'offline';

471

}

472

473

const userService = new RealTimeDataService<User>();

474

475

// Subscribe to real-time user updates

476

userService.getAllData().subscribe(users => {

477

console.log('Current users:', users);

478

updateUserList(users);

479

});

480

481

// Listen for specific user changes

482

userService.getItemById('user-123').subscribe(user => {

483

if (user) {

484

console.log('User 123 updated:', user);

485

updateUserProfile(user);

486

}

487

});

488

```

489

490

## Error Handling

491

492

```typescript

493

import { webSocket } from "rxjs/webSocket";

494

import { catchError, retry, tap } from "rxjs/operators";

495

496

const socket$ = webSocket({

497

url: 'ws://localhost:8080',

498

499

openObserver: {

500

next: () => console.log('WebSocket opened')

501

},

502

503

closeObserver: {

504

next: (event: CloseEvent) => {

505

console.log('WebSocket closed:', event.code, event.reason);

506

507

// Handle different close codes

508

if (event.code === 1006) {

509

console.log('Abnormal closure, likely network issue');

510

} else if (event.code === 1011) {

511

console.log('Server terminated connection due to error');

512

}

513

}

514

}

515

});

516

517

const messages$ = socket$.pipe(

518

tap(message => console.log('Message received:', message)),

519

catchError(err => {

520

console.error('WebSocket stream error:', err);

521

522

// Handle specific error types

523

if (err instanceof CloseEvent) {

524

console.log('Connection closed unexpectedly');

525

}

526

527

// Return empty or alternative stream

528

return EMPTY;

529

}),

530

retry(3) // Retry connection up to 3 times

531

);

532

533

messages$.subscribe({

534

next: message => handleMessage(message),

535

error: err => console.error('Subscription error:', err),

536

complete: () => console.log('WebSocket stream completed')

537

});

538

```

539

540

## Types

541

542

```typescript { .api }

543

interface Observer<T> {

544

next: (value: T) => void;

545

error?: (err: any) => void;

546

complete?: () => void;

547

}

548

549

interface AnonymousSubject<T> extends Observable<T> {

550

next(value: T): void;

551

error(err: any): void;

552

complete(): void;

553

}

554

```