WebSocket adapter implementation for NestJS framework providing real-time bidirectional communication
tessl install tessl/npm-nestjs--platform-ws@11.1.0NestJS Platform WebSocket (@nestjs/platform-ws) is a WebSocket adapter implementation for the NestJS framework that provides real-time bidirectional communication between server and client applications. It uses the 'ws' library as the underlying WebSocket implementation and integrates seamlessly with NestJS's dependency injection system, decorators, and reactive programming model using RxJS.
npm install @nestjs/platform-wsimport { WsAdapter } from "@nestjs/platform-ws";For CommonJS:
const { WsAdapter } = require("@nestjs/platform-ws");import { WsAdapter } from "@nestjs/platform-ws";
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// Use the WebSocket adapter
app.useWebSocketAdapter(new WsAdapter(app));
await app.listen(3000);
}
bootstrap();The @nestjs/platform-ws package is built around the following key components:
Core WebSocket adapter class that provides complete WebSocket functionality for NestJS applications.
/**
* WebSocket adapter implementation for NestJS using the 'ws' library
* @publicApi
*/
class WsAdapter extends AbstractWsAdapter {
constructor(
appOrHttpServer?: INestApplicationContext | object,
options?: WsAdapterOptions
);
/** Creates a WebSocket server with specified configuration */
create(
port: number,
options?: Record<string, any> & {
namespace?: string;
server?: any;
path?: string;
}
): any;
/** Sets up message handling for WebSocket client connections */
bindMessageHandlers(
client: any,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>
): void;
/** Handles individual message processing and routing */
bindMessageHandler(
buffer: any,
handlersMap: Map<string, MessageMappingProperties>,
transform: (data: any) => Observable<any>
): Observable<any>;
/** Attaches error event handlers to WebSocket server */
bindErrorHandler(server: any): any;
/** Binds disconnect event handler to WebSocket client */
bindClientDisconnect(client: any, callback: Function): void;
/** Gracefully closes WebSocket server and terminates client connections */
close(server: any): Promise<void>;
/** Cleans up all HTTP and WebSocket server registries */
dispose(): Promise<void>;
/** Sets custom message parser for processing incoming messages */
setMessageParser(parser: WsMessageParser): void;
/** Protected method to ensure HTTP server exists for the given port */
protected ensureHttpServerExists(
port: number,
httpServer?: any
): any;
/** Protected method to add WebSocket server to internal registry */
protected addWsServerToRegistry<T extends Record<'path', string>>(
wsServer: T,
port: number,
path: string
): void;
}Custom message parser support for handling different message formats.
/**
* Message parser function type for processing WebSocket data
*/
type WsMessageParser = (data: WsData) => { event: string; data: any } | void;
/** WebSocket data types supported by the parser */
type WsData = string | Buffer | ArrayBuffer | Buffer[];Adapter configuration options for customizing behavior.
interface WsAdapterOptions {
/** Custom message parser for handling different message formats */
messageParser?: WsMessageParser;
}WebSocket connection ready states used internally by the adapter.
enum READY_STATE {
CONNECTING_STATE = 0,
OPEN_STATE = 1,
CLOSING_STATE = 2,
CLOSED_STATE = 3
}import { WsAdapter } from "@nestjs/platform-ws";
import { NestFactory } from "@nestjs/core";
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));import { WsAdapter } from "@nestjs/platform-ws";
const customParser = (data) => {
try {
// Custom parsing logic
const parsed = JSON.parse(data.toString());
return { event: parsed.type, data: parsed.payload };
} catch {
return null;
}
};
const adapter = new WsAdapter(app, { messageParser: customParser });
app.useWebSocketAdapter(adapter);// In your WebSocket gateway
@WebSocketGateway(3001, { path: '/custom-ws' })
export class CustomGateway {
@WebSocketServer()
server: Server;
// The adapter automatically handles path-based routing
}// Multiple gateways can share the same HTTP server
@WebSocketGateway(3001, { path: '/chat' })
export class ChatGateway { }
@WebSocketGateway(3001, { path: '/notifications' })
export class NotificationGateway { }
// The WsAdapter manages multiple servers with different paths@WebSocketGateway()
export class MyGateway implements OnGatewayConnection, OnGatewayDisconnect {
handleConnection(client: WebSocket) {
console.log('Client connected');
// Connection handling
}
handleDisconnect(client: WebSocket) {
console.log('Client disconnected');
// Cleanup logic
}
@SubscribeMessage('error')
handleError(client: WebSocket, error: any) {
console.error('WebSocket error:', error);
// Error handling
}
}import { WsAdapter } from "@nestjs/platform-ws";
import { INestApplication } from "@nestjs/common";
// Custom adapter with advanced configuration
class CustomWsAdapter extends WsAdapter {
constructor(app: INestApplication) {
// Custom message parser that handles binary data
const binaryParser = (data: Buffer | string) => {
if (Buffer.isBuffer(data)) {
// Handle binary WebSocket frames
const eventLength = data.readUInt8(0);
const event = data.subarray(1, 1 + eventLength).toString();
const payload = data.subarray(1 + eventLength);
return { event, data: payload };
}
// Fallback to JSON parsing for text frames
return JSON.parse(data.toString());
};
super(app, { messageParser: binaryParser });
}
// Override error handling for custom logging
bindErrorHandler(server: any) {
server.on('connection', (ws: any) => {
ws.on('error', (err: any) => {
// Custom error handling logic
this.logger.error(`WebSocket error: ${err.message}`, err.stack);
});
});
server.on('error', (err: any) => {
this.logger.error(`Server error: ${err.message}`, err.stack);
});
return server;
}
}
// Use custom adapter
const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new CustomWsAdapter(app));// Gateway with connection pooling and rate limiting
@WebSocketGateway(3001, {
cors: true,
transports: ['websocket'],
})
export class OptimizedGateway {
private connectionPool = new Map<string, WebSocket>();
private messageCount = new Map<string, number>();
@OnEvent('server.created')
handleServerCreated(server: any) {
// Configure server-level options after creation
server.options.perMessageDeflate = {
threshold: 1024,
concurrencyLimit: 10,
};
}
handleConnection(client: WebSocket, request: any) {
const clientId = this.generateClientId(request);
// Add to connection pool
this.connectionPool.set(clientId, client);
this.messageCount.set(clientId, 0);
// Set up rate limiting
const rateLimit = setInterval(() => {
this.messageCount.set(clientId, 0);
}, 60000); // Reset every minute
client.on('close', () => {
clearInterval(rateLimit);
this.connectionPool.delete(clientId);
this.messageCount.delete(clientId);
});
}
@SubscribeMessage('message')
handleMessage(client: WebSocket, data: any) {
const clientId = this.getClientId(client);
const count = this.messageCount.get(clientId) || 0;
// Rate limiting
if (count > 100) { // Max 100 messages per minute
client.send(JSON.stringify({
error: 'Rate limit exceeded'
}));
return;
}
this.messageCount.set(clientId, count + 1);
// Process message...
}
private generateClientId(request: any): string {
return `${request.socket.remoteAddress}:${Date.now()}`;
}
private getClientId(client: WebSocket): string {
// Implementation to retrieve client ID
return '';
}
}/** HTTP server registry key type - uses port number as key */
type HttpServerRegistryKey = number;
/** HTTP server registry entry type - stores HTTP server instances */
type HttpServerRegistryEntry = any;
/** WebSocket server registry key type - uses port number as key */
type WsServerRegistryKey = number;
/** WebSocket server registry entry type - array of WebSocket servers */
type WsServerRegistryEntry = any[];
/** Port constant used for underlying HTTP server sharing */
const UNDERLYING_HTTP_SERVER_PORT = 0;The adapter maintains internal registries for HTTP and WebSocket servers:
/** Map storing HTTP servers by port number */
protected readonly httpServersRegistry: Map<HttpServerRegistryKey, HttpServerRegistryEntry>;
/** Map storing WebSocket servers by port number */
protected readonly wsServersRegistry: Map<WsServerRegistryKey, WsServerRegistryEntry>;The adapter provides comprehensive error handling:
// Namespace error (thrown during server creation)
const error = new Error(
'"WsAdapter" does not support namespaces. If you need namespaces in your project, consider using the "@nestjs/platform-socket.io" package instead.'
);
// Message parsing error (returns EMPTY observable)
try {
const message = this.messageParser(buffer.data);
if (!message) {
return EMPTY; // Invalid message format
}
} catch {
return EMPTY; // Parsing failed
}
// HTTP upgrade error handling
httpServer.on('upgrade', (request, socket, head) => {
try {
// URL parsing and routing logic
} catch (err) {
socket.end('HTTP/1.1 400\r\n' + err.message);
}
});