or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

index.mddocs/

EventSource Parser

EventSource Parser is a streaming parser for server-sent events/EventSource that operates without assumptions about how the actual stream of data is retrieved. It provides both imperative parser instances and a modern TransformStream variant for building EventSource clients and polyfills across JavaScript environments.

Package Information

  • Package Name: eventsource-parser
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install eventsource-parser

Overview

EventSource Parser provides streaming, source-agnostic parsing of server-sent events (SSE) for building EventSource clients and polyfills. Key features include:

  • Stream-agnostic design: Works with any data source (fetch, WebSocket, file system, etc.)
  • Dual API approaches: Imperative callback-based parser and modern TransformStream
  • Full EventSource specification compliance: Handles all SSE fields, error conditions, and edge cases
  • TypeScript-first: Complete type definitions with generic type preservation
  • Zero dependencies: Lightweight implementation with no external dependencies
  • Universal compatibility: Works in browsers, Node.js, Deno, Bun, and other JavaScript environments

Core Imports

import { createParser, type EventSourceMessage, type EventSourceParser } from "eventsource-parser";

For CommonJS:

const { createParser } = require("eventsource-parser");

For stream functionality:

import { EventSourceParserStream } from "eventsource-parser/stream";

Basic Usage

import { createParser, type EventSourceMessage } from "eventsource-parser";

function onEvent(event: EventSourceMessage) {
  console.log('Received event!');
  console.log('id: %s', event.id || '<none>');
  console.log('event: %s', event.event || '<none>');
  console.log('data: %s', event.data);
}

const parser = createParser({ onEvent });
const sseStream = getSomeReadableStream();

for await (const chunk of sseStream) {
  parser.feed(chunk);
}

// Reset for new stream
parser.reset();

Advanced Parser Usage

Retry Intervals

Handle server-sent retry intervals for reconnection logic:

const parser = createParser({
  onRetry(retryInterval) {
    console.log('Server requested retry interval of %dms', retryInterval);
    // Implement reconnection logic with custom delay
  },
  onEvent(event) {
    // Handle parsed events
  },
});

Parse Error Handling

Handle parsing errors with detailed context:

import { createParser, type ParseError } from "eventsource-parser";

const parser = createParser({
  onError(error: ParseError) {
    console.error('Error parsing event:', error);
    if (error.type === 'unknown-field') {
      console.error('Field name:', error.field);
      console.error('Field value:', error.value);
      console.error('Line:', error.line);
    } else if (error.type === 'invalid-retry') {
      console.error('Invalid retry interval:', error.value);
    }
  },
  onEvent(event) {
    // Handle valid events
  },
});

Comment Handling

Process comments from the EventSource stream:

const parser = createParser({
  onComment(comment) {
    console.log('Received comment:', comment);
    // Note: Leading whitespace is preserved
  },
  onEvent(event) {
    // Handle events
  },
});

Parser Reset with Flush

Reset parser state and optionally consume pending data:

// Basic reset - discards pending data
parser.reset();

// Reset with flush - processes pending data as final event
parser.reset({ consume: true });

Architecture

EventSource Parser is built around several key components:

  • Core Parser: Imperative parser that processes chunks via callbacks (createParser)
  • Stream Parser: Modern TransformStream implementation for streaming pipelines
  • Specification Compliance: Full EventSource specification support including line parsing, field processing, and error handling
  • Callback System: Event-driven architecture with separate callbacks for events, errors, retries, and comments
  • State Management: Parser state tracking with reset capabilities for connection management

Capabilities

Core Parser

Creates imperative parser instances with callback-based event handling. Perfect for custom streaming implementations and environments without TransformStream support.

function createParser(callbacks: ParserCallbacks): EventSourceParser;

interface ParserCallbacks {
  onEvent?: ((event: EventSourceMessage) => void) | undefined;
  onRetry?: ((retry: number) => void) | undefined;
  onComment?: ((comment: string) => void) | undefined;
  onError?: ((error: ParseError) => void) | undefined;
}

interface EventSourceParser {
  feed(chunk: string): void;
  reset(options?: { consume?: boolean }): void;
}

Stream Parser

Modern TransformStream implementation for environments that support streaming. Ideal for modern browsers, Node.js 18+, and streaming architectures.

class EventSourceParserStream extends TransformStream<string, EventSourceMessage> {
  constructor(options?: StreamOptions);
}

interface StreamOptions {
  onError?: ('terminate' | ((error: Error) => void)) | undefined;
  onRetry?: ((retry: number) => void) | undefined;
  onComment?: ((comment: string) => void) | undefined;
}

Stream Usage Examples

import { EventSourceParserStream } from "eventsource-parser/stream";
import { type EventSourceMessage } from "eventsource-parser";

// Basic stream usage - typical fetch + transform pattern
const response = await fetch('/events');
const eventStream = response.body
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new EventSourceParserStream());

// Process parsed events
for await (const event of eventStream) {
  console.log('Event:', event.event || 'message');
  console.log('Data:', event.data);
}

// Stream with WritableStream consumer
const stream = new EventSourceParserStream();
const sseStream = getSomeReadableStream();

sseStream
  .pipeThrough(stream)
  .pipeTo(new WritableStream({
    write(message: EventSourceMessage) {
      console.log('Event:', message.event || 'message');
      console.log('Data:', message.data);
    }
  }));

// Stream with callback options
const streamWithHandlers = new EventSourceParserStream({
  onRetry: (retry) => {
    console.log(`Server requested retry in ${retry}ms`);
    // Implement reconnection delay logic
  },
  onComment: (comment) => {
    console.log(`Server comment: ${comment}`);
    // Handle keep-alive comments or server messages
  },
  onError: 'terminate' // Terminates stream on error
  // Alternative: onError: (error) => console.error('Stream error:', error)
});

// Use in streaming pipeline
fetch('/events')
  .then(response => response.body)
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(streamWithHandlers)
  .pipeTo(new WritableStream({
    write(event) { /* process event */ }
  }));

Types

EventSource Message

Structure representing a parsed EventSource message.

interface EventSourceMessage {
  /** Event type sent from server, undefined if not explicitly declared */
  event?: string | undefined;
  /** Message ID for reconnection tracking, undefined if not provided */
  id?: string | undefined;
  /** Message data content */
  data: string;
}

Parse Error

Error class for parsing failures with detailed context information.

class ParseError extends Error {
  /** Type of error that occurred */
  type: ErrorType;
  /** Field name causing error (for unknown-field errors) */
  field?: string | undefined;
  /** Field value causing error */
  value?: string | undefined;
  /** Full line causing error */
  line?: string | undefined;
}

type ErrorType = "invalid-retry" | "unknown-field";

Usage Examples

Building an EventSource Client

import { createParser, type EventSourceMessage, type ParseError } from "eventsource-parser";

class SimpleEventSourceClient {
  private parser: ReturnType<typeof createParser>;
  private controller: AbortController | null = null;

  constructor(
    private url: string,
    private options: {
      onMessage?: (event: EventSourceMessage) => void;
      onError?: (error: Error) => void;
      onRetry?: (retryMs: number) => void;
    } = {}
  ) {
    this.parser = createParser({
      onEvent: this.options.onMessage || (() => {}),
      onError: (error: ParseError) => {
        this.options.onError?.(error);
      },
      onRetry: this.options.onRetry || (() => {}),
    });
  }

  async connect() {
    this.controller = new AbortController();
    
    try {
      const response = await fetch(this.url, {
        signal: this.controller.signal,
        headers: { 'Accept': 'text/event-stream' }
      });
      
      if (!response.body) throw new Error('No response body');
      
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value, { stream: true });
        this.parser.feed(chunk);
      }
    } catch (error) {
      if (error.name !== 'AbortError') {
        this.options.onError?.(error as Error);
      }
    }
  }

  disconnect() {
    this.controller?.abort();
    this.parser.reset();
  }
}

// Usage
const client = new SimpleEventSourceClient('/events', {
  onMessage: (event) => {
    console.log(`Received ${event.event || 'message'}: ${event.data}`);
  },
  onRetry: (ms) => {
    console.log(`Reconnecting in ${ms}ms`);
    setTimeout(() => client.connect(), ms);
  }
});

client.connect();

Stream-based EventSource Processing

import { EventSourceParserStream } from "eventsource-parser/stream";

// Process server-sent events in a streaming pipeline
async function processEventStream(url: string) {
  const response = await fetch(url, {
    headers: { 'Accept': 'text/event-stream' }
  });

  if (!response.body) throw new Error('No response body');

  const eventStream = response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new EventSourceParserStream({
      onRetry: (ms) => console.log(`Server retry: ${ms}ms`),
      onComment: (comment) => console.log(`Server comment: ${comment}`),
      onError: (error) => console.error('Parse error:', error)
    }));

  // Process events
  for await (const event of eventStream) {
    switch (event.event) {
      case 'update':
        handleUpdate(JSON.parse(event.data));
        break;
      case 'notification':
        handleNotification(event.data);
        break;
      default:
        console.log('Generic message:', event.data);
    }
  }
}

function handleUpdate(data: any) {
  console.log('Update received:', data);
}

function handleNotification(message: string) {
  console.log('Notification:', message);
}

processEventStream('/api/events');

Custom Data Source Integration

import { createParser, type EventSourceMessage } from "eventsource-parser";

// Integrate with custom transport (WebSocket, custom protocol, etc.)
class WebSocketEventSource {
  private parser: ReturnType<typeof createParser>;
  private ws: WebSocket | null = null;

  constructor(
    private url: string,
    private onEvent: (event: EventSourceMessage) => void
  ) {
    this.parser = createParser({
      onEvent: this.onEvent,
      onError: (error) => console.error('Parse error:', error),
      onRetry: (ms) => console.log(`Server requested retry: ${ms}ms`)
    });
  }

  connect() {
    this.ws = new WebSocket(this.url);
    
    this.ws.onmessage = (wsEvent) => {
      // Assume WebSocket sends SSE-formatted text
      this.parser.feed(wsEvent.data);
    };

    this.ws.onclose = () => {
      // Process any remaining data when connection closes
      this.parser.reset({ consume: true });
    };
  }

  disconnect() {
    this.ws?.close();
    this.parser.reset();
  }
}

// Usage
const wsEventSource = new WebSocketEventSource(
  'ws://localhost:8080/events',
  (event) => {
    console.log('Event via WebSocket:', event);
  }
);

wsEventSource.connect();