GraphQL over WebSocket server implementation providing protocol-compliant WebSocket handling with extensive customization hooks, execution control, and context management.
Creates a protocol-compliant WebSocket GraphQL server with comprehensive configuration options.
/**
 * Builds a GraphQL over WebSocket server that adheres to the protocol.
 *
 * @typeParam P - Shape of the client connection parameters, surfaced as
 *   `Context.connectionParams`.
 * @typeParam E - Shape of the extra data supplied to `Server.opened` and
 *   surfaced as `Context.extra`.
 * @param options - Configuration and lifecycle hooks for the server.
 * @returns A `Server` whose `opened` method handles WebSocket connections.
 */
function makeServer<P = Record<string, unknown>, E = unknown>(
  options: ServerOptions<P, E>
): Server<E>;
/**
 * Configuration and hooks accepted by `makeServer`.
 *
 * @typeParam P - Shape of the client connection parameters.
 * @typeParam E - Shape of the extra data attached to the connection context.
 */
interface ServerOptions<P = Record<string, unknown>, E = unknown> {
  /**
   * Schema to operate on: a fixed `GraphQLSchema`, or a function deriving one
   * from the connection context, operation id, subscribe payload and
   * execution args.
   */
  schema?:
    | GraphQLSchema
    | ((
        ctx: Context<P, E>,
        id: ID,
        payload: SubscribePayload,
        args: ExecutionArgs
      ) => GraphQLSchema);

  /**
   * Value used as the GraphQL execution context: a fixed value, or a function
   * deriving one from the connection context, operation id, subscribe payload
   * and execution args.
   */
  context?:
    | GraphQLExecutionContextValue
    | ((
        ctx: Context<P, E>,
        id: ID,
        payload: SubscribePayload,
        args: ExecutionArgs
      ) => GraphQLExecutionContextValue);

  /** Root values keyed by operation type (`query`, `mutation`, `subscription`). */
  roots?: {
    query?: Record<string, NonNullable<ExecutionArgs['rootValue']>>;
    mutation?: Record<string, NonNullable<ExecutionArgs['rootValue']>>;
    subscription?: Record<string, NonNullable<ExecutionArgs['rootValue']>>;
  };

  /** Replacement for the default validation function. */
  validate?: typeof graphqlValidate;

  /** Replacement execution function used for queries and mutations. */
  execute?: (args: ExecutionArgs) => OperationResult;

  /** Replacement function used for subscriptions. */
  subscribe?: (args: ExecutionArgs) => OperationResult;

  /** How long to wait for connection initialization, in milliseconds. */
  connectionInitWaitTimeout?: number;

  /**
   * Hook for connection establishment. May return (or resolve to) a record,
   * a boolean, or nothing.
   */
  onConnect?: (
    ctx: Context<P, E>
  ) =>
    | Promise<Record<string, unknown> | boolean | void>
    | Record<string, unknown>
    | boolean
    | void;

  /** Hook for connection close; receives the optional close code and reason. */
  onDisconnect?: (
    ctx: Context<P, E>,
    code?: number,
    reason?: string
  ) => Promise<void> | void;

  /** Hook for socket close; receives the optional close code and reason. */
  onClose?: (
    ctx: Context<P, E>,
    code?: number,
    reason?: string
  ) => Promise<void> | void;

  /**
   * Hook for an incoming subscription request. May return (or resolve to)
   * execution args, a list of GraphQL errors, or nothing.
   */
  onSubscribe?: (
    ctx: Context<P, E>,
    id: ID,
    payload: SubscribePayload
  ) =>
    | Promise<ExecutionArgs | readonly GraphQLError[] | void>
    | ExecutionArgs
    | readonly GraphQLError[]
    | void;

  /**
   * Hook for operation execution. Receives the execution args and the
   * operation result, and may return (or resolve to) a replacement result.
   */
  onOperation?: (
    ctx: Context<P, E>,
    id: ID,
    payload: SubscribePayload,
    args: ExecutionArgs,
    result: OperationResult
  ) => Promise<OperationResult | void> | OperationResult | void;

  /**
   * Hook for error handling. Receives the GraphQL errors and may return (or
   * resolve to) formatted errors.
   */
  onError?: (
    ctx: Context<P, E>,
    id: ID,
    payload: SubscribePayload,
    errors: readonly GraphQLError[]
  ) =>
    | Promise<readonly GraphQLFormattedError[] | void>
    | readonly GraphQLFormattedError[]
    | void;

  /**
   * Hook for each streamed result. Receives the formatted execution result
   * and may return (or resolve to) a replacement.
   */
  onNext?: (
    ctx: Context<P, E>,
    id: ID,
    payload: SubscribePayload,
    args: ExecutionArgs,
    result: FormattedExecutionResult
  ) =>
    | Promise<FormattedExecutionResult | void>
    | FormattedExecutionResult
    | void;

  /** Hook for subscription completion. */
  onComplete?: (
    ctx: Context<P, E>,
    id: ID,
    payload: SubscribePayload
  ) => Promise<void> | void;

  /** Reviver applied when parsing JSON messages. */
  jsonMessageReviver?: JSONMessageReviver;

  /** Replacer applied when stringifying JSON messages. */
  jsonMessageReplacer?: JSONMessageReplacer;
}

Usage Examples:
import { makeServer } from "graphql-ws";
import { buildSchema, GraphQLError, parse } from "graphql";

// Basic server
const server = makeServer({
  schema: buildSchema(`
type Query {
hello: String
}
type Subscription {
messageAdded: String
}
`),
});

// Server with hooks
const serverWithHooks = makeServer({
  schema: mySchema,
  onConnect: async (ctx) => {
    const token = ctx.connectionParams?.authToken;
    if (!token) {
      // Returning false rejects the connection with close code 4403
      // (Forbidden); throwing would surface as 4500 (Internal server error).
      return false;
    }
    // Store the user on `ctx.extra` so later hooks can read it. An object
    // returned from onConnect is only sent to the client as the
    // connection acknowledgement payload; it is not attached to the context.
    ctx.extra.user = await validateToken(token);
    return true;
  },
  onSubscribe: async (ctx, id, payload) => {
    // Custom validation or authorization
    if (!ctx.extra.user) {
      return [new GraphQLError("Authentication required")];
    }
    return {
      schema: mySchema,
      // Forward the operation name so multi-operation documents still work
      operationName: payload.operationName,
      document: parse(payload.query),
      variableValues: payload.variables,
      contextValue: { user: ctx.extra.user },
    };
  },
  onError: async (ctx, id, payload, errors) => {
    console.error("GraphQL errors:", errors);
    // GraphQLError#toJSON yields the spec-formatted error shape
    return errors.map((error) => error.toJSON());
  },
});

Main server interface for handling WebSocket connections.
/**
 * Server returned by `makeServer`.
 *
 * @typeParam E - Shape of the extra data attached to each connection.
 */
interface Server<E = unknown> {
  /**
   * Registers a newly opened WebSocket connection with the server.
   *
   * @param socket - The WebSocket connection abstraction.
   * @param ctxExtra - Extra data to attach to the connection context.
   * @returns A callback to invoke once the connection has closed, taking the
   *   optional close code and reason.
   */
  opened(
    socket: WebSocket,
    ctxExtra: E
  ): (code?: number, reason?: string) => Promise<void>;
}

Usage Examples:
import { WebSocketServer } from "ws";

const wsServer = new WebSocketServer({ port: 4000, path: "/graphql" });

wsServer.on("connection", (socket, request) => {
  // `server.opened` expects the protocol-agnostic `WebSocket` interface
  // (protocol/send/close/onMessage), which a raw `ws` socket does not
  // implement, so the socket is adapted here.
  const closed = server.opened(
    {
      protocol: socket.protocol,
      send: (data) =>
        new Promise<void>((resolve, reject) => {
          socket.send(data, (err) => (err ? reject(err) : resolve()));
        }),
      close: (code, reason) => socket.close(code, reason),
      onMessage: (cb) => {
        socket.on("message", async (event) => {
          try {
            await cb(String(event));
          } catch (err) {
            console.error("Unhandled error while processing message:", err);
            // 4500: Internal server error. A fixed reason is used because
            // close reasons are length-limited.
            socket.close(4500, "Internal server error");
          }
        });
      },
    },
    {
      socket,
      request,
      // Additional context data
      ip: request.socket.remoteAddress,
      userAgent: request.headers["user-agent"],
    }
  );

  // `ws` emits the close reason as a Buffer; the close callback takes a string.
  socket.once("close", (code, reason) => {
    void closed(code, String(reason));
  });
});

Protocol-agnostic WebSocket interface that adapters must implement.
/** Minimal socket contract the server relies on. */
interface WebSocket {
  /** The subprotocol of the WebSocket. */
  readonly protocol: string;

  /** Sends a message to the client. */
  send(data: string): Promise<void> | void;

  /** Closes the connection, optionally with a code and reason. */
  close(code?: number, reason?: string): Promise<void> | void;

  /** Registers the callback that receives incoming messages. */
  onMessage(cb: (data: string) => Promise<void> | void): void;

  /** Registers a ping callback (optional). */
  onPing?(cb: (payload: Buffer) => Promise<void> | void): void;

  /** Registers a pong callback (optional). */
  onPong?(cb: (payload: Buffer) => Promise<void> | void): void;
}

Execution context maintained for each WebSocket connection.
/**
 * Per-connection state handed to every server hook.
 *
 * @typeParam P - Shape of the client connection parameters.
 * @typeParam E - Shape of the extra data attached to the connection.
 */
interface Context<P = Record<string, unknown>, E = unknown> {
  /** True once the connection_init message has been received. */
  readonly connectionInitReceived: boolean;

  /** True once the connection has been acknowledged. */
  readonly acknowledged: boolean;

  /** Parameters the client supplied for the connection, if any. */
  readonly connectionParams?: Readonly<P>;

  /** Active subscriptions, keyed by operation id. */
  readonly subscriptions: Record<
    ID,
    AsyncGenerator<unknown> | AsyncIterable<unknown> | null
  >;

  /** Extra data attached to the connection. */
  readonly extra: E;
}

Usage Examples:
const server = makeServer({
  schema: mySchema,
  onSubscribe: (ctx, id, payload) => {
    // Access connection state
    if (!ctx.acknowledged) {
      return [new GraphQLError("Connection not acknowledged")];
    }

    // Access connection parameters
    const userId = ctx.connectionParams?.userId;

    // Access extra context
    const clientIP = ctx.extra.ip;

    return {
      schema: mySchema,
      // Returning execution args replaces the defaults, so the operation
      // name and variables from the payload must be forwarded explicitly.
      operationName: payload.operationName,
      document: parse(payload.query),
      variableValues: payload.variables,
      contextValue: { userId, clientIP },
    };
  },
});

Type definitions for GraphQL operation results.
/**
 * Result of an operation: a single execution result, a stream of execution
 * results (async generator or async iterable), or a promise of any of those.
 */
type OperationResult =
  | Promise<
      | AsyncGenerator<ExecutionResult>
      | AsyncIterable<ExecutionResult>
      | ExecutionResult
    >
  | AsyncGenerator<ExecutionResult>
  | AsyncIterable<ExecutionResult>
  | ExecutionResult;

/** Values accepted as the GraphQL execution context. */
type GraphQLExecutionContextValue =
  | { [key: string]: unknown }
  | symbol
  | number
  | string
  | boolean
  | undefined
  | null;

Utility function for WebSocket subprotocol selection.
/**
 * Picks the subprotocol to use from the ones proposed by the client.
 *
 * @param protocols - Protocols proposed by the client, as a set, an array or
 *   a single string.
 * @returns The `GRAPHQL_TRANSPORT_WS_PROTOCOL` value when acceptable,
 *   otherwise `false`.
 */
function handleProtocols(
  protocols: Set<string> | string[] | string
): typeof GRAPHQL_TRANSPORT_WS_PROTOCOL | false;

Usage Examples:
import { createServer } from "http";
import { WebSocketServer } from "ws";
import { handleProtocols } from "graphql-ws";

const httpServer = createServer();
const wsServer = new WebSocketServer({
  server: httpServer,
  // The helper already has the expected shape, so it is passed directly
  handleProtocols,
});

// Choose the schema per connection
const server = makeServer({
  schema: (ctx, id, payload) => {
    // Return different schema based on context
    if (ctx.connectionParams?.role === "admin") {
      return adminSchema;
    }
    return publicSchema;
  },
});

// Custom execution with a result cache
const server = makeServer({
  schema: mySchema,
  execute: async (args) => {
    const cacheKey = generateCacheKey(args);
    const cached = await cache.get(cacheKey);
    if (cached) return cached;
    const result = await graphqlExecute(args);
    await cache.set(cacheKey, result);
    return result;
  },
});

// Redact streamed results the user may not see
const server = makeServer({
  schema: mySchema,
  onNext: async (ctx, id, payload, args, result) => {
    // Filter subscription results based on user permissions
    const user = ctx.extra.user;
    if (result.data?.messageAdded?.private && !canAccessPrivateMessages(user)) {
      // A falsy return value does NOT skip the result: the server then sends
      // the original result unchanged. Return a redacted copy instead so the
      // private message is never delivered.
      return { ...result, data: { ...result.data, messageAdded: null } };
    }
    return result;
  },
});

// Connection lifecycle hooks
const server = makeServer({
  schema: mySchema,
  onConnect: async (ctx) => {
    console.log("Client connecting:", ctx.extra.ip);
    await logConnection(ctx.extra.ip);
    return true;
  },
  onDisconnect: async (ctx, code, reason) => {
    console.log("Client disconnected:", code, reason);
    await logDisconnection(ctx.extra.ip, code, reason);
  },
  onClose: async (ctx, code, reason) => {
    // Cleanup resources
    await cleanupUserSessions(ctx.connectionParams?.userId);
  },
});