Fully-featured GraphQL Server with focus on easy setup, performance & great developer experience
Server-Sent Events (SSE) support for GraphQL subscriptions with streaming capabilities and real-time data delivery. Enables long-running GraphQL operations for real-time features.
Function to create a Server-Sent Events result processor for streaming GraphQL subscriptions.
/**
 * Creates a result processor for Server-Sent Events streaming.
 *
 * Takes no arguments. The returned processor is intended to be handed to
 * `setResultProcessor(...)` together with the `'text/event-stream'` media type
 * from inside an `onResultProcess` plugin hook.
 *
 * @returns Result processor configured for SSE streaming
 */
function getSSEProcessor(): ResultProcessor;
/**
 * Result processor function type.
 *
 * A result processor turns GraphQL execution output into an HTTP `Response`.
 *
 * @param result - Execution output: a single result, an array of results, or
 *   an async iterable of results (see `ResultProcessorInput`).
 * @param fetchAPI - Source of the Fetch primitives used to build the response
 *   (the examples use `Response`, `ReadableStream` and `TextEncoder` from it).
 * @param acceptedMediaType - Media type string for the response.
 *   NOTE(review): presumably the type negotiated from the `Accept` header — confirm.
 * @returns The response, either directly or wrapped in a promise.
 */
type ResultProcessor = (
result: ResultProcessorInput,
fetchAPI: FetchAPI,
acceptedMediaType: string
) => PromiseOrValue<Response>;

Usage Examples:
import { createYoga, getSSEProcessor } from "graphql-yoga";
// Route every request that accepts `text/event-stream` through the SSE processor
const yoga = createYoga({
  schema: mySchema,
  plugins: [
    {
      onResultProcess({ request, setResultProcessor }) {
        const acceptHeader = request.headers.get('accept');
        // Keep the default processor unless the client asked for an event stream.
        if (!acceptHeader?.includes('text/event-stream')) {
          return;
        }
        setResultProcessor(getSSEProcessor(), 'text/event-stream');
      },
    },
  ],
});
// GraphQL schema with subscriptions
// Declares one query and three subscription fields. Each subscription resolver
// delegates to a stream factory (`messageEventStream`, `userStatusStream`,
// `notificationStream`) that is not defined in this snippet.
// NOTE(review): `createSchema` is not among the names imported in the snippet
// above — confirm it is imported where this code is used.
const schema = createSchema({
typeDefs: `
type Query {
hello: String
}
type Subscription {
messageAdded: Message
userStatusChanged(userId: ID!): UserStatus
notifications: Notification
}
type Message {
id: ID!
content: String!
timestamp: String!
}
type UserStatus {
userId: ID!
online: Boolean!
lastSeen: String
}
type Notification {
id: ID!
type: String!
message: String!
}
`,
resolvers: {
Query: {
hello: () => "Hello World!"
},
Subscription: {
messageAdded: {
// NOTE(review): presumably returns an AsyncIterable of payloads — confirm.
subscribe: () => messageEventStream()
},
userStatusChanged: {
// Forwards the `userId` field argument to the stream factory.
subscribe: (_, { userId }) => userStatusStream(userId)
},
notifications: {
subscribe: () => notificationStream()
}
}
}
});

Types for handling various forms of GraphQL execution results in processors.
/**
 * Input type for result processors supporting single results and streams.
 *
 * One of:
 * - a single result or an array of results (`MaybeArray`), or
 * - an `AsyncIterable` of results whose extensions may carry an optional
 *   `http` entry of type `GraphQLHTTPExtensions`.
 */
type ResultProcessorInput =
| MaybeArray<ExecutionResultWithSerializer>
| AsyncIterable<ExecutionResultWithSerializer<any, { http?: GraphQLHTTPExtensions }>>;
/**
 * GraphQL execution result with optional serialization.
 *
 * Intersects `ExecutionResult<TData, TExtensions>` with an optional
 * `stringify` hook. Both type parameters default to `any`.
 */
type ExecutionResultWithSerializer<TData = any, TExtensions = any> = ExecutionResult<
TData,
TExtensions
> & {
/** Custom stringify function for result serialization; optional, so callers need a fallback when it is absent */
stringify?: (result: ExecutionResult<TData, TExtensions>) => string;
};
/**
 * Utility type for single values or arrays: either one `T` or a `T[]`.
 */
type MaybeArray<T> = T | T[];

Usage Examples:
// Custom result processor handling different input types
/**
 * Serializes GraphQL results as Server-Sent Events.
 *
 * Non-streaming input (one result or an array of results) is written as a
 * complete body of `data:` frames. Async-iterable input is streamed frame by
 * frame as the source produces values.
 *
 * In both paths a result's own `stringify` hook is preferred over
 * `JSON.stringify`.
 *
 * @param result - Single result, array of results, or async iterable of results.
 * @param fetchAPI - Provides `Response`, `ReadableStream` and `TextEncoder`.
 * @param acceptedMediaType - Unused here; the response is always `text/event-stream`.
 * @returns A `Response` whose body is the SSE payload.
 */
const customSSEProcessor: ResultProcessor = async (result, fetchAPI, acceptedMediaType) => {
  const headers = {
    'Content-Type': 'text/event-stream',
    'Connection': 'keep-alive',
    'Cache-Control': 'no-cache',
  };

  // Handle single result (or a batch of results)
  if (!isAsyncIterable(result)) {
    const results = Array.isArray(result) ? result : [result];
    const body = results
      .map(r => `data: ${r.stringify ? r.stringify(r) : JSON.stringify(r)}\n\n`)
      .join('');
    return new fetchAPI.Response(body, { headers });
  }

  // Handle streaming result
  const textEncoder = new fetchAPI.TextEncoder();
  const iterator = result[Symbol.asyncIterator]();
  // Set once the consumer goes away so a late `next()` result is not enqueued
  // into an already-cancelled stream.
  let cancelled = false;

  const stream = new fetchAPI.ReadableStream({
    // Pull-based so the source is only advanced when the consumer wants more.
    async pull(controller) {
      try {
        // Loop until something is actually enqueued; an empty batch must not
        // leave the stream waiting without another pull being scheduled.
        for (;;) {
          const { done, value } = await iterator.next();
          if (cancelled) {
            return;
          }
          if (done) {
            controller.close();
            return;
          }
          const items = Array.isArray(value) ? value : [value];
          const frames = items
            .map(item => `data: ${item.stringify ? item.stringify(item) : JSON.stringify(item)}\n\n`)
            .join('');
          if (frames !== '') {
            controller.enqueue(textEncoder.encode(frames));
            return;
          }
        }
      } catch (error) {
        if (cancelled) {
          return;
        }
        // Thrown values are not guaranteed to be Error instances.
        const message = error instanceof Error ? error.message : String(error);
        controller.enqueue(
          textEncoder.encode(`data: ${JSON.stringify({ errors: [{ message }] })}\n\n`)
        );
        controller.close();
      }
    },
    // Client disconnected: let the source run its cleanup.
    async cancel() {
      cancelled = true;
      await iterator.return?.();
    },
  });
  return new fetchAPI.Response(stream, { headers });
};

All subscription functionality is re-exported from the @graphql-yoga/subscription package.
// All exports from @graphql-yoga/subscription are available
export * from '@graphql-yoga/subscription';

Usage Examples:
import {
createYoga,
// Subscription utilities (if available from @graphql-yoga/subscription)
// These would be specific to the subscription package
} from "graphql-yoga";
// Subscription resolvers written as async generator methods
const resolvers = {
  Subscription: {
    messageAdded: {
      // Emits each new message for as long as the subscription is consumed.
      async *subscribe() {
        for (;;) {
          const message = await waitForNewMessage();
          yield { messageAdded: message };
        }
      },
    },
    timer: {
      // Emits ten ticks, one second apart, then completes.
      async *subscribe() {
        for (let tick = 1; tick <= 10; tick++) {
          await new Promise(resolve => setTimeout(resolve, 1000));
          yield { timer: { count: tick, timestamp: new Date().toISOString() } };
        }
      },
    },
  },
};

Integration patterns for subscriptions with various GraphQL clients and protocols.
Usage Examples:
// Client-side subscription with fetch API (SSE)
/**
 * Opens the `messageAdded` subscription over SSE and logs each message.
 *
 * The response body is read incrementally. Network chunks do not line up with
 * SSE lines, so unfinished text is buffered until its terminating newline
 * arrives, and bytes are decoded in streaming mode so multi-byte characters
 * split across chunks survive.
 *
 * @returns A promise that resolves once the server ends the stream.
 * @throws Error when the response has no readable body.
 * @throws SyntaxError when a `data:` line does not contain valid JSON.
 */
async function subscribeToMessages() {
  const response = await fetch('/graphql', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream',
    },
    body: JSON.stringify({
      query: `
subscription {
messageAdded {
id
content
timestamp
}
}
`
    })
  });

  if (!response.body) {
    throw new Error('Subscription response has no body to stream');
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  // Text received so far that has not yet been terminated by a newline.
  let pending = '';

  const handleLine = (line: string): void => {
    if (!line.startsWith('data: ')) {
      return;
    }
    const payload = line.slice(6);
    // Frames may carry an empty data field; there is nothing to parse.
    if (payload.trim() === '') {
      return;
    }
    const data = JSON.parse(payload);
    console.log('New message:', data.data.messageAdded);
  };

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // `stream: true` keeps a partial multi-byte sequence for the next chunk.
      pending += decoder.decode(value, { stream: true });
      const lines = pending.split(/\r?\n/);
      // The last element is an unfinished line (or '' if the chunk ended on a newline).
      pending = lines.pop() ?? '';
      for (const line of lines) {
        handleLine(line);
      }
    }
    // Flush the decoder and process a final line that lacked a trailing newline.
    pending += decoder.decode();
    if (pending !== '') {
      handleLine(pending);
    }
  } finally {
    reader.releaseLock();
  }
}
// GraphiQL settings: run subscriptions over SSE and set the explorer title
const graphiqlOptions = {
  subscriptionsProtocol: 'SSE', // Enable SSE in GraphiQL
  title: 'Real-time API Explorer',
} as const;

const yoga = createYoga({
  schema: subscriptionSchema,
  graphiql: graphiqlOptions,
});
// Subscription resolver that authorizes the caller and filters the stream
const resolvers = {
  Subscription: {
    userNotifications: {
      /**
       * Streams notifications for `userId`.
       * Throws 'Unauthorized' unless the context user is that same user, and
       * yields only notifications accepted by `shouldSendNotification`.
       */
      async *subscribe(_, { userId }, context) {
        const isOwnFeed = context.user && context.user.id === userId;
        if (!isOwnFeed) {
          throw new Error('Unauthorized');
        }

        for await (const notification of subscribeToUserNotifications(userId)) {
          const wanted = shouldSendNotification(notification, context.user.preferences);
          if (!wanted) {
            continue;
          }
          yield { userNotifications: notification };
        }
      },
    },
  },
};

Advanced configuration options for Server-Sent Events functionality.
Usage Examples:
// Custom SSE plugin with configuration
/**
 * Plugin that serves results as Server-Sent Events when the request's
 * `accept` header includes `text/event-stream`.
 *
 * Streaming results produce, in order: one `connected` event, one unnamed
 * `data:` frame per source value, an `error` event if the source throws, and
 * a final `complete` event. A `ping` event is sent every 30 seconds while the
 * stream is open. Non-streaming results are written as a single `data:` frame.
 *
 * When the client disconnects, the ping timer is cleared and the source
 * iterator is asked to finish, so neither outlives the connection.
 */
const customSSEPlugin: Plugin = {
  onResultProcess({ request, setResultProcessor }) {
    const acceptsSSE = request.headers.get('accept')?.includes('text/event-stream');
    if (!acceptsSSE) {
      return;
    }

    setResultProcessor(async (result, fetchAPI) => {
      const headers = {
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'Content-Type',
      };
      // Custom ping interval for keep-alive
      const pingInterval = 30000; // 30 seconds

      // Handle single result
      if (!isAsyncIterable(result)) {
        const body = `data: ${JSON.stringify(result)}\n\n`;
        return new fetchAPI.Response(body, { headers });
      }

      const textEncoder = new fetchAPI.TextEncoder();
      const iterator = result[Symbol.asyncIterator]();
      let pingTimer: ReturnType<typeof setInterval> | undefined;
      // True once the stream is closed or cancelled; nothing may be enqueued afterwards.
      let finished = false;

      const stopPing = (): void => {
        if (pingTimer !== undefined) {
          clearInterval(pingTimer);
          pingTimer = undefined;
        }
      };

      const stream = new fetchAPI.ReadableStream({
        start(controller) {
          // Send initial connection event
          controller.enqueue(textEncoder.encode('event: connected\ndata: {}\n\n'));
          // Set up ping to keep connection alive
          pingTimer = setInterval(() => {
            if (!finished) {
              controller.enqueue(textEncoder.encode('event: ping\ndata: {}\n\n'));
            }
          }, pingInterval);
        },
        // Pull-based so the source is only advanced when the consumer wants more.
        async pull(controller) {
          try {
            const { done, value } = await iterator.next();
            if (finished) {
              return;
            }
            if (!done) {
              controller.enqueue(textEncoder.encode(`data: ${JSON.stringify(value)}\n\n`));
              return;
            }
          } catch (error) {
            if (finished) {
              return;
            }
            // Thrown values are not guaranteed to be Error instances.
            const message = error instanceof Error ? error.message : String(error);
            controller.enqueue(
              textEncoder.encode(`event: error\ndata: ${JSON.stringify({ message })}\n\n`)
            );
          }
          // Reached only when the source completed or failed.
          finished = true;
          stopPing();
          controller.enqueue(textEncoder.encode('event: complete\ndata: {}\n\n'));
          controller.close();
        },
        // Client disconnected: stop pinging and let the source clean up.
        async cancel() {
          finished = true;
          stopPing();
          await iterator.return?.();
        },
      });
      return new fetchAPI.Response(stream, { headers });
    }, 'text/event-stream');
  }
};
// Create the server with the custom SSE plugin registered.
const yoga = createYoga({
schema: subscriptionSchema,
plugins: [customSSEPlugin]
});

tessl i tessl/npm-graphql-yoga@4.0.0