0
# Subscriptions
1
2
GraphQL subscription support with WebSocket integration and subscription lifecycle management. This module enables real-time functionality in GraphQL APIs through WebSocket connections and pub/sub messaging patterns.
3
4
## Capabilities
5
6
### Subscription Service
7
8
Core service for managing GraphQL subscriptions and WebSocket connections.
9
10
```typescript { .api }
11
/**
12
* Injectable service for managing GraphQL subscriptions
13
* Handles WebSocket connections, subscription lifecycle, and pub/sub messaging
14
*/
15
export class GqlSubscriptionService {
16
/**
17
* Start subscription server with WebSocket support
18
* @param options - Subscription server configuration
19
* @returns Promise that resolves when server is started
20
*/
21
start(options: SubscriptionServerOptions): Promise<void>;
22
23
/**
24
* Stop subscription server and close all connections
25
* @returns Promise that resolves when server is stopped
26
*/
27
stop(): Promise<void>;
28
29
/**
30
* Get active subscription count
31
* @returns Number of active subscriptions
32
*/
33
getActiveSubscriptionCount(): number;
34
35
/**
36
* Publish message to subscription channel
37
* @param channel - Channel name to publish to
38
* @param payload - Data to publish
39
* @returns Promise that resolves when message is published
40
*/
41
publish(channel: string, payload: any): Promise<void>;
42
43
/**
44
* Subscribe to channel for updates
45
* @param channel - Channel name to subscribe to
46
* @returns Async iterator for subscription events
47
*/
48
subscribe(channel: string): AsyncIterator<any>;
49
50
/**
51
* Unsubscribe from channel
52
* @param channel - Channel name to unsubscribe from
53
* @param subscriptionId - Unique subscription identifier
54
* @returns Promise that resolves when unsubscribed
55
*/
56
unsubscribe(channel: string, subscriptionId: string): Promise<void>;
57
}
58
```
59
60
**Usage Examples:**
61
62
```typescript
63
import { Injectable } from "@nestjs/common";
64
import { GqlSubscriptionService } from "@nestjs/graphql";
65
import { PubSub } from "graphql-subscriptions";
66
67
@Injectable()
export class NotificationService {
  constructor(private readonly subscriptionService: GqlSubscriptionService) {}

  /**
   * Publish a notification on the given user's channel.
   * @param userId - Recipient; selects the `user_<id>` channel
   * @param message - Notification text
   * @returns Promise that resolves once the subscription service has published
   */
  async publishNotification(userId: string, message: string): Promise<void> {
    await this.subscriptionService.publish(NotificationService.channelFor(userId), {
      notification: { message, timestamp: new Date() },
    });
  }

  /**
   * Subscribe to the given user's notification channel.
   * @param userId - User whose `user_<id>` channel is listened to
   * @returns Async iterator over events published on that channel
   */
  subscribeToUserNotifications(userId: string): AsyncIterator<any> {
    return this.subscriptionService.subscribe(NotificationService.channelFor(userId));
  }

  /** Single source of truth for the per-user channel name shared by publish and subscribe. */
  private static channelFor(userId: string): string {
    return `user_${userId}`;
  }
}
85
86
// Subscription resolver
87
@Resolver()
export class SubscriptionResolver {
  // Event bus backing `messageAdded`. Events only reach subscribers when they are
  // published through this same instance; in a real application inject a shared one.
  private readonly pubSub = new PubSub();

  constructor(private readonly notificationService: NotificationService) {}

  /**
   * Stream notifications addressed to one user.
   * @param userId - User whose notification channel is subscribed to
   */
  @Subscription(() => Notification, {
    // NotificationService publishes `{ notification: ... }`, which does not match this
    // field's name (`notifications`), so the payload is unwrapped explicitly.
    resolve: (payload) => payload.notification,
  })
  notifications(@Args('userId') userId: string): AsyncIterator<Notification> {
    return this.notificationService.subscribeToUserNotifications(userId);
  }

  /**
   * Stream `messageAdded` events, delivering only those whose payload `userId`
   * equals the `userId` argument supplied by the subscriber.
   * @param userId - Declared so the argument exists in the schema; it is read by `filter`
   */
  @Subscription(() => String, {
    filter: (payload, variables) => payload.userId === variables.userId,
  })
  messageAdded(@Args('userId') userId: string): AsyncIterator<string> {
    return this.pubSub.asyncIterator('messageAdded');
  }
}
105
```
106
107
### Subscription Configuration
108
109
Configuration interfaces and types for setting up GraphQL subscriptions.
110
111
```typescript { .api }
112
/**
113
* General subscription configuration
114
* Union type supporting multiple WebSocket transport libraries
115
*/
116
type SubscriptionConfig =
117
| GraphQLWsSubscriptionsConfig
118
| GraphQLSubscriptionTransportWsConfig
119
| boolean;
120
121
/**
 * Configuration for graphql-ws WebSocket subscriptions (recommended)
 * Modern WebSocket transport with improved performance and features
 */
interface GraphQLWsSubscriptionsConfig {
  /** WebSocket server host */
  host?: string;
  /** WebSocket server port */
  port?: number;
  /** WebSocket endpoint path */
  path?: string;
  /** Time in milliseconds the server waits for the client's ConnectionInit message */
  connectionInitWaitTimeout?: number;
  /** Keep-alive interval in milliseconds */
  keepAlive?: number;
  /** Custom context function for subscriptions */
  context?: (ctx: any) => any;
  /**
   * Called when a client connects. graphql-ws passes its connection context
   * (carrying `connectionParams` and the mutable `extra` bag), not the bare
   * connection params; store per-connection data such as the user on `extra`.
   */
  onConnect?: (context: any) => any;
  /** Called when a client disconnects, with the connection context, close code and reason */
  onDisconnect?: (context: any, code?: number, reason?: string) => any;
  /** Custom schema for subscriptions */
  schema?: GraphQLSchema;
  /** Enable introspection for subscriptions */
  introspection?: boolean;
}
146
147
/**
148
* Configuration for subscriptions-transport-ws (legacy)
149
* Legacy WebSocket transport, use graphql-ws for new projects
150
*/
151
interface GraphQLSubscriptionTransportWsConfig {
152
/** WebSocket server host */
153
host?: string;
154
/** WebSocket server port */
155
port?: number;
156
/** WebSocket endpoint path */
157
path?: string;
158
/** Connection timeout in milliseconds */
159
timeout?: number;
160
/** Keep-alive interval */
161
keepAlive?: number;
162
/** Connection callback functions */
163
onConnect?: (connectionParams: any, websocket: any, context: any) => any;
164
onDisconnect?: (websocket: any, context: any) => any;
165
onOperation?: (message: any, params: any, websocket: any) => any;
166
onOperationComplete?: (websocket: any, opId: string) => any;
167
}
168
169
/**
170
* Subscription server configuration options
171
*/
172
interface SubscriptionServerOptions {
173
/** HTTP server instance to attach WebSocket server to */
174
server: any;
175
/** GraphQL schema for subscriptions */
176
schema: GraphQLSchema;
177
/** Subscription configuration */
178
subscriptions: SubscriptionConfig;
179
/** GraphQL context function */
180
context?: (ctx: any) => any;
181
}
182
```
183
184
**Configuration Examples:**
185
186
```typescript
187
import { Module } from "@nestjs/common";
188
import { GraphQLModule } from "@nestjs/graphql";
189
import { ApolloDriver, ApolloDriverConfig } from "@nestjs/apollo";
190
191
// Modern graphql-ws configuration
192
@Module({
  imports: [
    GraphQLModule.forRoot<ApolloDriverConfig>({
      driver: ApolloDriver,
      autoSchemaFile: true,
      // `installSubscriptionHandlers` is omitted: it is deprecated and redundant once
      // a `subscriptions` transport is configured explicitly.
      subscriptions: {
        'graphql-ws': {
          path: '/graphql',
          // graphql-ws hands onConnect its connection context, not the bare params.
          onConnect: (context: any) => {
            const { connectionParams, extra } = context;
            console.log('Client connected:', connectionParams);
            // Per-connection data must be stored on `extra` to be visible later.
            extra.user = connectionParams?.user;
          },
          onDisconnect: () => {
            console.log('Client disconnected');
          },
        },
      },
      // Top-level context factory: `extra` is only present for WebSocket operations,
      // so it is read defensively to keep plain HTTP requests working.
      context: ({ extra }) => ({
        user: extra?.user,
      }),
    }),
  ],
})
export class SubscriptionModule {}
217
218
// Legacy subscriptions-transport-ws configuration
219
@Module({
220
imports: [
221
GraphQLModule.forRoot<ApolloDriverConfig>({
222
driver: ApolloDriver,
223
autoSchemaFile: true,
224
installSubscriptionHandlers: true,
225
subscriptions: {
226
'subscriptions-transport-ws': {
227
path: '/graphql',
228
onConnect: (connectionParams, websocket, context) => {
229
return { user: connectionParams.user };
230
},
231
keepAlive: 30000,
232
},
233
},
234
}),
235
],
236
})
237
export class LegacySubscriptionModule {}
238
```
239
240
### Subscription Decorators and Resolvers
241
242
Decorators and utilities for creating subscription resolvers.
243
244
```typescript { .api }
245
/**
 * Marks a resolver method as a GraphQL Subscription
 *
 * Two call forms are supported:
 * - `Subscription(name?, options?)` - name-based form (name defaults to the method name)
 * - `Subscription(typeFunc, options?)` - code-first form, e.g. `@Subscription(() => Message, { filter })`
 *
 * @param name - Optional subscription name (defaults to method name)
 * @param typeFunc - Function returning the GraphQL type emitted by the subscription
 * @param options - Configuration options for the subscription
 */
function Subscription(name?: string, options?: SubscriptionOptions): MethodDecorator;
function Subscription(typeFunc: (returns?: void) => unknown, options?: SubscriptionOptions): MethodDecorator;
251
252
/**
253
* Options for subscription resolvers
254
*/
255
interface SubscriptionOptions {
256
/** Custom name for the subscription */
257
name?: string;
258
/** Description for the subscription */
259
description?: string;
260
/** Deprecation reason */
261
deprecationReason?: string;
262
/** Whether return type can be null */
263
nullable?: boolean;
264
/** Filter function to determine which updates to send */
265
filter?: (payload: any, variables: any, context: any) => boolean | Promise<boolean>;
266
/** Resolver function to transform subscription payload */
267
resolve?: (payload: any, args: any, context: any, info: any) => any;
268
}
269
```
270
271
**Subscription Resolver Examples:**
272
273
```typescript
274
import { Resolver, Subscription, Mutation, Args, Context } from "@nestjs/graphql";
import { PubSub } from "graphql-subscriptions";

@Resolver()
export class ChatSubscriptionResolver {
  // In-memory event bus shared by the subscriptions and the mutation below.
  private readonly pubSub = new PubSub();

  constructor(private readonly chatService: ChatService) {}

  // Basic subscription: the field name matches the published payload key (`messageAdded`).
  @Subscription(() => Message)
  messageAdded(): AsyncIterator<Message> {
    return this.pubSub.asyncIterator('messageAdded');
  }

  // Subscription with filtering
  @Subscription(() => Message, {
    // Only send messages from the specified room
    filter: (payload, variables) => payload.messageAdded.roomId === variables.roomId,
    // Events arrive keyed as `messageAdded`, not under this field's own name, so the
    // value is unwrapped explicitly instead of relying on lookup by field name.
    resolve: (payload) => payload.messageAdded,
  })
  messageAddedToRoom(@Args('roomId') roomId: string): AsyncIterator<Message> {
    return this.pubSub.asyncIterator('messageAdded');
  }

  // Subscription with payload transformation
  @Subscription(() => Notification, {
    // Transform the payload before sending to client
    resolve: (payload, args, context) => ({
      id: payload.id,
      message: payload.message,
      user: context.user,
      timestamp: new Date(),
    }),
  })
  userNotifications(@Context() context: any): AsyncIterator<Notification> {
    // One channel per user, so no filter is needed here.
    const userId = context.user.id;
    return this.pubSub.asyncIterator(`userNotifications_${userId}`);
  }

  // Subscription with authentication
  // NOTE(review): assumes publishers emit `recipientId` at the top level of the
  // payload alongside the `privateMessageReceived` value - confirm against the publisher.
  @Subscription(() => PrivateMessage, {
    filter: (payload, variables, context) => {
      // Ensure user is authenticated and authorized
      if (!context.user) return false;
      return payload.recipientId === context.user.id;
    },
  })
  privateMessageReceived(@Context() context: any): AsyncIterator<PrivateMessage> {
    return this.pubSub.asyncIterator('privateMessage');
  }

  // Trigger subscription from mutation
  @Mutation(() => Message)
  async sendMessage(@Args('input') input: SendMessageInput): Promise<Message> {
    const message = await this.chatService.createMessage(input);

    // Payload key must match what the subscriptions above read (`messageAdded`).
    await this.pubSub.publish('messageAdded', { messageAdded: message });

    return message;
  }
}
340
```
341
342
### Pub/Sub Integration
343
344
Integration with various pub/sub systems for scalable subscription handling.
345
346
```typescript { .api }
347
/**
348
* Interface for pub/sub system integration
349
*/
350
interface PubSubEngine {
351
/** Publish message to channel */
352
publish(channel: string, payload: any): Promise<void>;
353
/** Subscribe to channel */
354
subscribe(channel: string, onMessage: (message: any) => void): Promise<number>;
355
/** Unsubscribe from channel */
356
unsubscribe(subId: number): void;
357
/** Get async iterator for channel */
358
asyncIterator<T>(triggers: string | string[]): AsyncIterator<T>;
359
}
360
361
/**
362
* Redis pub/sub configuration
363
*/
364
interface RedisPubSubOptions {
365
/** Redis connection options */
366
connection?: {
367
host: string;
368
port: number;
369
password?: string;
370
db?: number;
371
};
372
/** Custom Redis client */
373
publisher?: any;
374
subscriber?: any;
375
/** Message serialization options */
376
serializer?: {
377
serialize: (value: any) => string;
378
deserialize: (value: string) => any;
379
};
380
}
381
```
382
383
**Pub/Sub Examples:**
384
385
```typescript
386
import { Module } from "@nestjs/common";
387
import { RedisPubSub } from "graphql-redis-subscriptions";
388
import { PubSub } from "graphql-subscriptions";
389
import Redis from "ioredis";
390
391
// In-memory pub/sub (development only)
392
@Module({
393
providers: [
394
{
395
provide: 'PUB_SUB',
396
useValue: new PubSub(),
397
},
398
],
399
exports: ['PUB_SUB'],
400
})
401
export class PubSubModule {}
402
403
// Redis pub/sub (production)
404
@Module({
405
providers: [
406
{
407
provide: 'PUB_SUB',
408
useFactory: () => {
409
return new RedisPubSub({
410
publisher: new Redis({
411
host: 'localhost',
412
port: 6379,
413
}),
414
subscriber: new Redis({
415
host: 'localhost',
416
port: 6379,
417
}),
418
});
419
},
420
},
421
],
422
exports: ['PUB_SUB'],
423
})
424
export class RedisPubSubModule {}
425
426
// Using pub/sub in resolver
427
@Resolver()
428
export class NotificationResolver {
429
constructor(@Inject('PUB_SUB') private pubSub: PubSubEngine) {}
430
431
@Subscription(() => Notification)
432
notifications(): AsyncIterator<Notification> {
433
return this.pubSub.asyncIterator('notifications');
434
}
435
436
@Mutation(() => Boolean)
437
async sendNotification(@Args('message') message: string): Promise<boolean> {
438
await this.pubSub.publish('notifications', {
439
notifications: { message, timestamp: new Date() }
440
});
441
return true;
442
}
443
}
444
```
445
446
### Subscription Authentication and Authorization
447
448
Security patterns for subscription endpoints.
449
450
```typescript { .api }
451
/**
452
* Subscription authentication context
453
*/
454
interface SubscriptionContext {
455
/** WebSocket connection */
456
connection: any;
457
/** Connection parameters from client */
458
connectionParams: any;
459
/** Authenticated user */
460
user?: any;
461
/** Additional context data */
462
[key: string]: any;
463
}
464
465
/**
466
* Subscription guard interface
467
*/
468
interface SubscriptionGuard {
469
canActivate(context: SubscriptionContext): boolean | Promise<boolean>;
470
}
471
```
472
473
**Authentication Examples:**
474
475
```typescript
476
import { CanActivate, Injectable, ExecutionContext } from "@nestjs/common";
477
import { GqlExecutionContext } from "@nestjs/graphql";
478
479
// Subscription authentication guard
480
@Injectable()
481
export class SubscriptionAuthGuard implements CanActivate {
482
canActivate(context: ExecutionContext): boolean {
483
const gqlContext = GqlExecutionContext.create(context);
484
const ctx = gqlContext.getContext();
485
486
// Check if user is authenticated
487
return !!ctx.user;
488
}
489
}
490
491
// Subscription with authentication
492
@Resolver()
@UseGuards(SubscriptionAuthGuard)
export class SecureSubscriptionResolver {
  // The pub/sub engine is injected under the 'PUB_SUB' token so publishers and
  // subscribers share one instance. `Inject` comes from "@nestjs/common".
  constructor(@Inject('PUB_SUB') private readonly pubSub: PubSubEngine) {}

  /**
   * Stream private messages for the authenticated user.
   * @param context - Subscription context; SubscriptionAuthGuard rejects the
   *   operation when `context.user` is missing, so it is set when this runs
   * @returns Async iterator bound to the user's `privateMessages_<id>` channel
   */
  @Subscription(() => PrivateMessage)
  privateMessages(@Context() context: SubscriptionContext): AsyncIterator<PrivateMessage> {
    const userId = context.user.id;
    return this.pubSub.asyncIterator(`privateMessages_${userId}`);
  }
}
501
502
// Module configuration with authentication
503
@Module({
  imports: [
    // forRootAsync is required here: decorator arguments are evaluated without any
    // class instance, so `this.authService` is not available. The service is
    // injected into the factory instead.
    GraphQLModule.forRootAsync<ApolloDriverConfig>({
      driver: ApolloDriver,
      // AuthModule stands for whichever module exports AuthService in your application.
      imports: [AuthModule],
      inject: [AuthService],
      useFactory: (authService: AuthService) => ({
        autoSchemaFile: true,
        subscriptions: {
          'graphql-ws': {
            // graphql-ws passes its connection context; throwing rejects the connection.
            onConnect: async (context: any) => {
              const { connectionParams, extra } = context;

              // Authenticate connection
              const token = connectionParams?.authorization;
              if (!token) throw new Error('Missing auth token');

              const user = await authService.validateToken(token);
              if (!user) throw new Error('Invalid auth token');

              // Store the user on `extra` so the context factory below can read it.
              extra.user = user;
            },
          },
        },
        // `extra` exists only for WebSocket operations; read it defensively.
        context: ({ extra }) => ({
          user: extra?.user,
        }),
      }),
    }),
  ],
})
export class AuthenticatedSubscriptionModule {}
529
```
530
531
### Error Handling in Subscriptions
532
533
Best practices for handling errors in subscription resolvers.
534
535
```typescript { .api }
536
/**
537
* Subscription error handling patterns
538
*/
539
interface SubscriptionErrorHandler {
540
/** Handle subscription errors */
541
handleError(error: Error, context: SubscriptionContext): any;
542
/** Handle connection errors */
543
handleConnectionError(error: Error, connectionParams: any): any;
544
}
545
```
546
547
**Error Handling Examples:**
548
549
```typescript
550
// NOTE: `checkPermission`, `transformMessage` and `pubSub` are assumed to be members of
// this resolver; their definitions are omitted from the example.
@Resolver()
export class RobustSubscriptionResolver {
  @Subscription(() => Message, {
    // `null` is returned when transformation fails, so the field must be nullable.
    nullable: true,
    // Method syntax with an explicit `this` parameter: Nest calls filter/resolve with
    // the resolver instance as `this`. An arrow function would capture the enclosing
    // scope's `this` instead and could not reach the resolver's members.
    async filter(this: RobustSubscriptionResolver, payload, variables, context) {
      try {
        // Potentially failing filter logic; awaited inside `try` so rejections are caught
        return await this.checkPermission(context.user, variables.roomId);
      } catch (error: unknown) {
        console.error('Subscription filter error:', error);
        return false; // Safely exclude on error
      }
    },
    resolve(this: RobustSubscriptionResolver, payload) {
      try {
        // Transform payload with error handling
        return this.transformMessage(payload.messageAdded);
      } catch (error: unknown) {
        console.error('Subscription resolve error:', error);
        return null; // Return null on transformation error
      }
    },
  })
  roomMessages(@Args('roomId') roomId: string): AsyncIterator<Message> {
    return this.pubSub.asyncIterator(`room_${roomId}`);
  }
}
577
```