CtrlK
CommunityDocumentationLog inGet started
Tessl Logo

tessl/npm-graphql-yoga

Fully-featured GraphQL Server with focus on easy setup, performance & great developer experience

Overview
Eval results
Files

subscription-support.mddocs/

Subscription Support

Server-Sent Events (SSE) support for GraphQL subscriptions with streaming capabilities and real-time data delivery. Enables long-running GraphQL operations for real-time features.

Capabilities

SSE Processor

Function to create a Server-Sent Events result processor for streaming GraphQL subscriptions.

/**
 * Creates a result processor for Server-Sent Events streaming
 * @returns Result processor configured for SSE streaming
 */
function getSSEProcessor(): ResultProcessor;

/**
 * Result processor function type
 */
type ResultProcessor = (
  result: ResultProcessorInput,
  fetchAPI: FetchAPI,
  acceptedMediaType: string
) => PromiseOrValue<Response>;

Usage Examples:

import { createYoga, getSSEProcessor } from "graphql-yoga";

// Enable SSE subscriptions
const yoga = createYoga({
  schema: mySchema,
  plugins: [
    {
      onResultProcess({ request, setResultProcessor }) {
        // Use SSE for subscription operations
        const acceptsSSE = request.headers.get('accept')?.includes('text/event-stream');
        if (acceptsSSE) {
          setResultProcessor(getSSEProcessor(), 'text/event-stream');
        }
      }
    }
  ]
});

// GraphQL schema with subscriptions
const schema = createSchema({
  typeDefs: `
    type Query {
      hello: String
    }
    
    type Subscription {
      messageAdded: Message
      userStatusChanged(userId: ID!): UserStatus
      notifications: Notification
    }
    
    type Message {
      id: ID!
      content: String!
      timestamp: String!
    }
    
    type UserStatus {
      userId: ID!
      online: Boolean!
      lastSeen: String
    }
    
    type Notification {
      id: ID!
      type: String!
      message: String!
    }
  `,
  resolvers: {
    Query: {
      hello: () => "Hello World!"
    },
    Subscription: {
      messageAdded: {
        subscribe: () => messageEventStream()
      },
      userStatusChanged: {
        subscribe: (_, { userId }) => userStatusStream(userId)
      },
      notifications: {
        subscribe: () => notificationStream()
      }
    }
  }
});

Result Processor Input Types

Types for handling various forms of GraphQL execution results in processors.

/**
 * Input type for result processors supporting single results and streams
 */
type ResultProcessorInput =
  | MaybeArray<ExecutionResultWithSerializer>
  | AsyncIterable<ExecutionResultWithSerializer<any, { http?: GraphQLHTTPExtensions }>>;

/**
 * GraphQL execution result with optional serialization
 */
type ExecutionResultWithSerializer<TData = any, TExtensions = any> = ExecutionResult<
  TData,
  TExtensions
> & {
  /** Custom stringify function for result serialization */
  stringify?: (result: ExecutionResult<TData, TExtensions>) => string;
};

/**
 * Utility type for single values or arrays
 */
type MaybeArray<T> = T | T[];

Usage Examples:

// Custom result processor handling different input types
const customSSEProcessor: ResultProcessor = async (result, fetchAPI, acceptedMediaType) => {
  const headers = {
    'Content-Type': 'text/event-stream',
    'Connection': 'keep-alive',
    'Cache-Control': 'no-cache',
  };

  // Handle single result
  if (!isAsyncIterable(result)) {
    const singleResult = Array.isArray(result) ? result : [result];
    const body = singleResult.map(r => `data: ${JSON.stringify(r)}\n\n`).join('');
    return new fetchAPI.Response(body, { headers });
  }

  // Handle streaming result
  const textEncoder = new fetchAPI.TextEncoder();
  const stream = new fetchAPI.ReadableStream({
    async start(controller) {
      try {
        for await (const item of result) {
          const data = Array.isArray(item) ? item : [item];
          for (const resultItem of data) {
            const serialized = resultItem.stringify 
              ? resultItem.stringify(resultItem)
              : JSON.stringify(resultItem);
            controller.enqueue(textEncoder.encode(`data: ${serialized}\n\n`));
          }
        }
      } catch (error) {
        controller.enqueue(textEncoder.encode(`data: ${JSON.stringify({ errors: [{ message: error.message }] })}\n\n`));
      } finally {
        controller.close();
      }
    }
  });

  return new fetchAPI.Response(stream, { headers });
};

Subscription Re-exports

All subscription functionality is re-exported from the @graphql-yoga/subscription package.

// All exports from @graphql-yoga/subscription are available
export * from '@graphql-yoga/subscription';

Usage Examples:

import { 
  createYoga,
  // Subscription utilities (if available from @graphql-yoga/subscription)
  // These would be specific to the subscription package
} from "graphql-yoga";

// Example subscription resolvers using async iterators
const resolvers = {
  Subscription: {
    messageAdded: {
      subscribe: async function* () {
        // Async generator for real-time messages
        while (true) {
          const message = await waitForNewMessage();
          yield { messageAdded: message };
        }
      }
    },
    
    timer: {
      subscribe: async function* () {
        // Timer subscription example
        let count = 0;
        while (count < 10) {
          await new Promise(resolve => setTimeout(resolve, 1000));
          yield { timer: { count: ++count, timestamp: new Date().toISOString() } };
        }
      }
    }
  }
};

GraphQL Subscription Integration

Integration patterns for subscriptions with various GraphQL clients and protocols.

Usage Examples:

// Client-side subscription with fetch API (SSE)
async function subscribeToMessages() {
  const response = await fetch('/graphql', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream',
    },
    body: JSON.stringify({
      query: `
        subscription {
          messageAdded {
            id
            content
            timestamp
          }
        }
      `
    })
  });

  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split('\n');
    
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        console.log('New message:', data.data.messageAdded);
      }
    }
  }
}

// GraphiQL configuration for subscriptions
const yoga = createYoga({
  schema: subscriptionSchema,
  graphiql: {
    subscriptionsProtocol: 'SSE', // Enable SSE in GraphiQL
    title: 'Real-time API Explorer'
  }
});

// Advanced subscription with filtering
const resolvers = {
  Subscription: {
    userNotifications: {
      subscribe: async function* (_, { userId }, context) {
        // Validate user access
        if (!context.user || context.user.id !== userId) {
          throw new Error('Unauthorized');
        }

        // Subscribe to user-specific notifications
        const notifications = subscribeToUserNotifications(userId);
        
        for await (const notification of notifications) {
          // Filter notifications based on user preferences
          if (shouldSendNotification(notification, context.user.preferences)) {
            yield { userNotifications: notification };
          }
        }
      }
    }
  }
};

SSE Configuration and Customization

Advanced configuration options for Server-Sent Events functionality.

Usage Examples:

// Custom SSE plugin with configuration
const customSSEPlugin: Plugin = {
  onResultProcess({ request, result, setResultProcessor }) {
    const acceptsSSE = request.headers.get('accept')?.includes('text/event-stream');
    
    if (acceptsSSE) {
      setResultProcessor(async (result, fetchAPI) => {
        const headers = {
          'Content-Type': 'text/event-stream',
          'Connection': 'keep-alive',
          'Cache-Control': 'no-cache',
          'Access-Control-Allow-Origin': '*',
          'Access-Control-Allow-Headers': 'Content-Type',
        };

        // Custom ping interval for keep-alive
        const pingInterval = 30000; // 30 seconds
        
        if (isAsyncIterable(result)) {
          const textEncoder = new fetchAPI.TextEncoder();
          const stream = new fetchAPI.ReadableStream({
            start(controller) {
              // Send initial connection event
              controller.enqueue(textEncoder.encode('event: connected\ndata: {}\n\n'));
              
              // Set up ping to keep connection alive
              const pingTimer = setInterval(() => {
                controller.enqueue(textEncoder.encode('event: ping\ndata: {}\n\n'));
              }, pingInterval);

              // Handle the subscription stream
              (async () => {
                try {
                  for await (const item of result) {
                    controller.enqueue(textEncoder.encode(`data: ${JSON.stringify(item)}\n\n`));
                  }
                } catch (error) {
                  controller.enqueue(textEncoder.encode(`event: error\ndata: ${JSON.stringify({ message: error.message })}\n\n`));
                } finally {
                  clearInterval(pingTimer);
                  controller.enqueue(textEncoder.encode('event: complete\ndata: {}\n\n'));
                  controller.close();
                }
              })();
            }
          });

          return new fetchAPI.Response(stream, { headers });
        }

        // Handle single result
        const body = `data: ${JSON.stringify(result)}\n\n`;
        return new fetchAPI.Response(body, { headers });
      }, 'text/event-stream');
    }
  }
};

const yoga = createYoga({
  schema: subscriptionSchema,
  plugins: [customSSEPlugin]
});
tessl i tessl/npm-graphql-yoga@4.0.0

docs

built-in-plugins.md

error-handling.md

index.md

plugin-system.md

schema-integration.md

server-configuration.md

subscription-support.md

utility-functions.md

tile.json