Streaming, source-agnostic EventSource/Server-Sent Events parser for building clients and polyfills
npx @tessl/cli install tessl/npm-eventsource-parser@3.0.0

EventSource Parser is a streaming parser for server-sent events/EventSource that operates without assumptions about how the actual stream of data is retrieved. It provides both imperative parser instances and a modern TransformStream variant for building EventSource clients and polyfills across JavaScript environments.
npm install eventsource-parser

EventSource Parser provides streaming, source-agnostic parsing of server-sent events (SSE) for building EventSource clients and polyfills. Key features include:
import { createParser, type EventSourceMessage, type EventSourceParser } from "eventsource-parser";

For CommonJS:
const { createParser } = require("eventsource-parser");

For stream functionality:
import { EventSourceParserStream } from "eventsource-parser/stream";

import { createParser, type EventSourceMessage } from "eventsource-parser";
/**
 * Logs a parsed message to the console: a banner line followed by the
 * message's id, event name and data. A missing or empty `id` / `event`
 * is printed as `<none>`.
 */
function onEvent(event: EventSourceMessage) {
  const { id, event: eventName, data } = event;
  const placeholder = '<none>';

  console.log('Received event!');
  console.log('id: %s', id ? id : placeholder);
  console.log('event: %s', eventName ? eventName : placeholder);
  console.log('data: %s', data);
}
const parser = createParser({ onEvent });
const sseStream = getSomeReadableStream();
for await (const chunk of sseStream) {
parser.feed(chunk);
}
// Reset for new stream
parser.reset();

Handle server-sent retry intervals for reconnection logic:
const parser = createParser({
onRetry(retryInterval) {
console.log('Server requested retry interval of %dms', retryInterval);
// Implement reconnection logic with custom delay
},
onEvent(event) {
// Handle parsed events
},
});

Handle parsing errors with detailed context:
import { createParser, type ParseError } from "eventsource-parser";
const parser = createParser({
onError(error: ParseError) {
console.error('Error parsing event:', error);
if (error.type === 'unknown-field') {
console.error('Field name:', error.field);
console.error('Field value:', error.value);
console.error('Line:', error.line);
} else if (error.type === 'invalid-retry') {
console.error('Invalid retry interval:', error.value);
}
},
onEvent(event) {
// Handle valid events
},
});

Process comments from the EventSource stream:
const parser = createParser({
onComment(comment) {
console.log('Received comment:', comment);
// Note: Leading whitespace is preserved
},
onEvent(event) {
// Handle events
},
});

Reset parser state and optionally consume pending data:
// Basic reset - discards pending data
parser.reset();
// Reset with flush - processes pending data as final event
parser.reset({ consume: true });

EventSource Parser is built around several key components:
createParser: Creates imperative parser instances with callback-based event handling. Perfect for custom streaming implementations and environments without TransformStream support.
function createParser(callbacks: ParserCallbacks): EventSourceParser;
/**
 * Callbacks accepted by `createParser`. Every callback is optional.
 */
interface ParserCallbacks {
/** Receives each parsed message. */
onEvent?: ((event: EventSourceMessage) => void) | undefined;
/** Receives the retry interval, in milliseconds, requested by the server. */
onRetry?: ((retry: number) => void) | undefined;
/** Receives the text of each comment in the stream; leading whitespace is preserved. */
onComment?: ((comment: string) => void) | undefined;
/** Receives a `ParseError` describing a parsing failure. */
onError?: ((error: ParseError) => void) | undefined;
}
interface EventSourceParser {
feed(chunk: string): void;
reset(options?: { consume?: boolean }): void;
}

Modern TransformStream implementation for environments that support streaming. Ideal for modern browsers, Node.js 18+, and streaming architectures.
class EventSourceParserStream extends TransformStream<string, EventSourceMessage> {
constructor(options?: StreamOptions);
}
interface StreamOptions {
onError?: ('terminate' | ((error: Error) => void)) | undefined;
onRetry?: ((retry: number) => void) | undefined;
onComment?: ((comment: string) => void) | undefined;
}

import { EventSourceParserStream } from "eventsource-parser/stream";
import { type EventSourceMessage } from "eventsource-parser";
// Basic stream usage - typical fetch + transform pattern
const response = await fetch('/events');
// `Response.body` is null for body-less responses; fail loudly instead of
// dereferencing null in the pipeline below.
if (!response.body) throw new Error('No response body');
const eventStream = response.body
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new EventSourceParserStream());
// Process parsed events
for await (const event of eventStream) {
  // An absent (or empty) event name is reported as the default 'message' type.
  console.log('Event:', event.event || 'message');
  console.log('Data:', event.data);
}
// Stream with WritableStream consumer
const stream = new EventSourceParserStream();
const sseStream = getSomeReadableStream();
sseStream
.pipeThrough(stream)
.pipeTo(new WritableStream({
write(message: EventSourceMessage) {
console.log('Event:', message.event || 'message');
console.log('Data:', message.data);
}
}));
// Stream with callback options
const streamWithHandlers = new EventSourceParserStream({
onRetry: (retry) => {
console.log(`Server requested retry in ${retry}ms`);
// Implement reconnection delay logic
},
onComment: (comment) => {
console.log(`Server comment: ${comment}`);
// Handle keep-alive comments or server messages
},
onError: 'terminate' // Terminates stream on error
// Alternative: onError: (error) => console.error('Stream error:', error)
});
// Use in streaming pipeline
// The pipeline must be built inside the `then` callback: `fetch(...).then(...)`
// returns a Promise, which has no `pipeThrough` method.
fetch('/events')
  .then((response) => {
    if (!response.body) throw new Error('No response body');
    return response.body
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(streamWithHandlers)
      .pipeTo(new WritableStream({
        write(event) { /* process event */ }
      }));
  })
  // Surface network failures and stream errors instead of leaving the
  // rejection unhandled.
  .catch((error) => console.error('Stream error:', error));

Structure representing a parsed EventSource message.
interface EventSourceMessage {
/** Event type sent from server, undefined if not explicitly declared */
event?: string | undefined;
/** Message ID for reconnection tracking, undefined if not provided */
id?: string | undefined;
/** Message data content */
data: string;
}

Error class for parsing failures with detailed context information.
/**
 * Error describing a parsing failure, as received by the parser's `onError`
 * callback. `type` identifies the failure. In the error-handling example,
 * `field`, `value` and `line` are read for `unknown-field` errors, and
 * `value` is read for `invalid-retry` errors.
 */
class ParseError extends Error {
/** Type of error that occurred */
type: ErrorType;
/** Field name causing error (for unknown-field errors) */
field?: string | undefined;
/** Field value causing error */
value?: string | undefined;
/** Full line causing error */
line?: string | undefined;
}
type ErrorType = "invalid-retry" | "unknown-field";

import { createParser, type EventSourceMessage, type ParseError } from "eventsource-parser";
/**
 * Minimal EventSource-style client: fetches `url` as `text/event-stream`,
 * decodes the response body and feeds the text to a parser created with
 * `createParser`.
 *
 * Parsed messages go to `options.onMessage`, parse errors and transport
 * failures go to `options.onError`, and server retry intervals go to
 * `options.onRetry`. All callbacks are optional.
 */
class SimpleEventSourceClient {
  private readonly parser: ReturnType<typeof createParser>;
  private controller: AbortController | null = null;

  /**
   * @param url Address of the event stream.
   * @param options Optional callbacks; they are captured once, at construction.
   */
  constructor(
    private url: string,
    private options: {
      onMessage?: (event: EventSourceMessage) => void;
      onError?: (error: Error) => void;
      onRetry?: (retryMs: number) => void;
    } = {}
  ) {
    this.parser = createParser({
      onEvent: this.options.onMessage ?? (() => {}),
      onError: (error: ParseError) => {
        this.options.onError?.(error);
      },
      onRetry: this.options.onRetry ?? (() => {}),
    });
  }

  /**
   * Opens the stream and feeds it to the parser until the stream ends, fails,
   * or is aborted. Failures other than an abort are reported through
   * `options.onError`; the returned promise itself does not reject.
   */
  async connect(): Promise<void> {
    // Tear down any connection that is still open so two streams never feed
    // the same parser, and drop partial data left over from the old stream.
    this.controller?.abort();
    this.parser.reset();

    const controller = new AbortController();
    this.controller = controller;
    try {
      const response = await fetch(this.url, {
        signal: controller.signal,
        headers: { 'Accept': 'text/event-stream' }
      });
      if (!response.body) throw new Error('No response body');
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // `stream: true` keeps incomplete multi-byte sequences buffered
        // until the next chunk arrives.
        this.parser.feed(decoder.decode(value, { stream: true }));
      }
      // Flush whatever the decoder is still buffering at end of stream.
      const tail = decoder.decode();
      if (tail) this.parser.feed(tail);
    } catch (error: unknown) {
      // An abort is the expected result of disconnect() or a newer connect().
      if (error instanceof Error && error.name === 'AbortError') return;
      this.options.onError?.(error instanceof Error ? error : new Error(String(error)));
    }
  }

  /** Aborts the active request, if any, and discards pending parser data. */
  disconnect(): void {
    this.controller?.abort();
    this.controller = null;
    this.parser.reset();
  }
}
// Usage
const client = new SimpleEventSourceClient('/events', {
  onMessage: (event) => {
    console.log(`Received ${event.event || 'message'}: ${event.data}`);
  },
  onRetry: (ms) => {
    // A `retry:` field arrives while the stream is still open and only sets
    // the delay to use for a future reconnect. Calling client.connect() from
    // here would open a second connection feeding the same parser, so only
    // report the value.
    console.log(`Server set reconnection delay to ${ms}ms`);
  }
});
client.connect();

import { EventSourceParserStream } from "eventsource-parser/stream";
/**
 * Process server-sent events in a streaming pipeline.
 *
 * Fetches `url`, decodes the body as text, parses it with
 * `EventSourceParserStream` and dispatches each message on its event name:
 * `update` payloads are JSON-decoded and passed to `handleUpdate`,
 * `notification` data is passed to `handleNotification`, and anything else
 * is logged.
 *
 * @param url Address of the event stream.
 * @throws Error if the response has no body.
 */
async function processEventStream(url: string): Promise<void> {
  const response = await fetch(url, {
    headers: { 'Accept': 'text/event-stream' }
  });
  if (!response.body) throw new Error('No response body');
  const eventStream = response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new EventSourceParserStream({
      onRetry: (ms) => console.log(`Server retry: ${ms}ms`),
      onComment: (comment) => console.log(`Server comment: ${comment}`),
      onError: (error) => console.error('Parse error:', error)
    }));
  // Process events
  for await (const event of eventStream) {
    switch (event.event) {
      case 'update': {
        // The payload comes from the server: one malformed message must not
        // abort the whole stream, so skip it and keep reading.
        let payload: unknown;
        try {
          payload = JSON.parse(event.data);
        } catch (error) {
          console.error('Skipping update with invalid JSON:', error);
          break;
        }
        handleUpdate(payload);
        break;
      }
      case 'notification':
        handleNotification(event.data);
        break;
      default:
        console.log('Generic message:', event.data);
    }
  }
}
/**
 * Logs the decoded payload of an `update` event.
 *
 * @param data Payload to log. Typed `unknown` because the function only
 *   prints the value and never inspects its shape.
 */
function handleUpdate(data: unknown): void {
  console.log('Update received:', data);
}
/**
 * Prints the text of a `notification` event to the console.
 *
 * @param message Notification text to print.
 */
function handleNotification(message: string) {
  const label = 'Notification:';
  console.log(label, message);
}
processEventStream('/api/events');

import { createParser, type EventSourceMessage } from "eventsource-parser";
/**
 * Integrate with custom transport (WebSocket, custom protocol, etc.)
 *
 * Feeds text frames received over a WebSocket to a parser created with
 * `createParser` and forwards every parsed message to `onEvent`. Parse errors
 * and retry intervals are logged to the console.
 */
class WebSocketEventSource {
  private readonly parser: ReturnType<typeof createParser>;
  private ws: WebSocket | null = null;

  /**
   * @param url WebSocket address to connect to.
   * @param onEvent Called with each parsed message.
   */
  constructor(
    private url: string,
    private onEvent: (event: EventSourceMessage) => void
  ) {
    this.parser = createParser({
      onEvent: this.onEvent,
      onError: (error) => console.error('Parse error:', error),
      onRetry: (ms) => console.log(`Server requested retry: ${ms}ms`)
    });
  }

  /** Opens the socket, replacing (and closing) any previously opened one. */
  connect(): void {
    // Close a socket left over from an earlier connect() so it is not leaked
    // and cannot keep feeding the shared parser.
    this.disconnect();

    const ws = new WebSocket(this.url);
    this.ws = ws;
    ws.onmessage = (wsEvent) => {
      // Assume WebSocket sends SSE-formatted text. Binary frames (Blob or
      // ArrayBuffer) cannot be fed to the parser, which accepts strings only.
      if (typeof wsEvent.data !== 'string') {
        console.error('Ignoring non-text WebSocket frame');
        return;
      }
      this.parser.feed(wsEvent.data);
    };
    ws.onclose = () => {
      // A close event for a socket that was already replaced or disconnected
      // must not touch the parser state now owned by its successor.
      if (this.ws !== ws) return;
      this.ws = null;
      // Process any remaining data when connection closes
      this.parser.reset({ consume: true });
    };
  }

  /** Closes the socket, if any, and discards pending parser data. */
  disconnect(): void {
    const ws = this.ws;
    // Clear the reference before closing so the socket's close handler sees
    // that it is no longer the current socket.
    this.ws = null;
    ws?.close();
    this.parser.reset();
  }
}
// Usage: log every parsed event that arrives over the socket, then connect.
const logWebSocketEvent = (event: EventSourceMessage) => {
  console.log('Event via WebSocket:', event);
};
const wsEventSource = new WebSocketEventSource('ws://localhost:8080/events', logWebSocketEvent);
wsEventSource.connect();