GraphQL subscription support with built-in PubSub implementation, event streaming, and real-time data flow utilities. Enables Server-Sent Events (SSE) and WebSocket subscriptions with filtering and transformation operators.
Creates a publisher-subscriber instance for managing subscription events and real-time data distribution.
/**
* Creates a PubSub instance for managing subscription events
* @returns PubSub instance with publish/subscribe capabilities
*/
function createPubSub(): PubSub;
interface PubSub {
/** Publish an event to all subscribers of a topic */
publish<T = any>(topic: string, payload: T): void;
/** Subscribe to events from a specific topic */
subscribe<T = any>(topic: string): AsyncIterableIterator<T>;
/** Subscribe to events from multiple topics */
subscribe<T = any>(...topics: string[]): AsyncIterableIterator<T>;
}
interface PubSubEvent<T = any> {
topic: string;
payload: T;
}
interface PubSubEventTarget {
addEventListener(type: string, listener: EventListener): void;
removeEventListener(type: string, listener: EventListener): void;
dispatchEvent(event: Event): boolean;
}Usage Example:
import { createPubSub } from 'graphql-yoga';
const pubsub = createPubSub();
// Publish events
pubsub.publish('POST_ADDED', {
id: '1',
title: 'New Post',
content: 'Hello World'
});
// Subscribe in resolvers
const resolvers = {
Subscription: {
postAdded: {
subscribe: () => pubsub.subscribe('POST_ADDED'),
resolve: (payload) => payload,
},
},
};Filters subscription streams based on custom predicates for selective event delivery.
/**
* Filters an async iterable based on a predicate function
* @param predicate - Function that returns true for items to keep
* @returns Filtered async iterable
*/
function filter<T>(
predicate: (value: T) => boolean | Promise<boolean>
): (source: AsyncIterable<T>) => AsyncIterable<T>;Usage Example:
import { createPubSub, filter, pipe } from 'graphql-yoga';
const pubsub = createPubSub();
const resolvers = {
Subscription: {
postsByAuthor: {
subscribe: (_, { authorId }) =>
pipe(
pubsub.subscribe('POST_ADDED'),
filter((post) => post.authorId === authorId)
),
resolve: (payload) => payload,
},
},
};Transforms values in subscription streams using mapping functions.
/**
* Maps values in an async iterable using a transformation function
* @param mapper - Function to transform each value
* @returns Mapped async iterable
*/
function map<T, U>(
mapper: (value: T) => U | Promise<U>
): (source: AsyncIterable<T>) => AsyncIterable<U>;Usage Example:
import { createPubSub, map, pipe } from 'graphql-yoga';
const pubsub = createPubSub();
const resolvers = {
Subscription: {
postTitles: {
subscribe: () =>
pipe(
pubsub.subscribe('POST_ADDED'),
map((post) => ({ title: post.title.toUpperCase() }))
),
resolve: (payload) => payload,
},
},
};Utility function for composing async iterable operators in a pipeline.
/**
* Composes multiple async iterable operators into a pipeline
* @param source - Source async iterable
* @param operators - Array of transformation operators
* @returns Transformed async iterable
*/
function pipe<T>(
source: AsyncIterable<T>,
...operators: Array<(source: AsyncIterable<any>) => AsyncIterable<any>>
): AsyncIterable<any>;Advanced async iterator implementation for complex subscription scenarios.
/**
* Repeater class for creating custom async iterators
* Provides full control over async iteration with push/stop capabilities
*/
class Repeater<T, TReturn = any, TNext = unknown> implements AsyncIterableIterator<T> {
constructor(
executor: (push: (value: T) => void, stop: (value?: TReturn) => void) => void | Promise<void>
);
next(): Promise<IteratorResult<T, TReturn>>;
return(value?: TReturn): Promise<IteratorResult<T, TReturn>>;
throw(error?: any): Promise<IteratorResult<T, TReturn>>;
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}Usage Example:
import { Repeater } from 'graphql-yoga';
const customSubscription = new Repeater(async (push, stop) => {
const interval = setInterval(() => {
push({ timestamp: Date.now(), data: Math.random() });
}, 1000);
// Cleanup when subscription ends
return () => clearInterval(interval);
});
const resolvers = {
Subscription: {
liveData: {
subscribe: () => customSubscription,
resolve: (payload) => payload,
},
},
};type AsyncIterableIterator<T> = AsyncIterable<T> & AsyncIterator<T>;
interface IteratorResult<T, TReturn = any> {
done: boolean;
value: T | TReturn;
}