or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

callback-parsing.md · dom-parsing.md · feed-parsing.md · index.md · stream-processing.md · tokenization.md
tile.json

docs/stream-processing.md

Stream Processing

WritableStream integration for Node.js streams, enabling pipeline processing and integration with other stream-based tools. The WritableStream class wraps the Parser with a standard Node.js Writable interface.

Capabilities

WritableStream Class

A Node.js Writable stream that processes HTML/XML data using the htmlparser2 Parser internally.

/**
 * WritableStream makes the Parser interface available as a NodeJS stream.
 *
 * It extends Node's `Writable`, so it can be the destination of `.pipe()`
 * or be fed directly through `write()` / `end()`.
 */
class WritableStream extends Writable {
  /**
   * Create a new WritableStream instance
   * @param cbs - Callback object implementing the Handler interface. It is
   *   typed as `Partial<Handler>`, so only the callbacks you need have to be
   *   supplied.
   * @param options - Parser configuration options (optional)
   */
  constructor(cbs: Partial<Handler>, options?: ParserOptions);
}

Usage Examples:

import { WritableStream } from "htmlparser2/WritableStream";
import fs from "fs";

import https from "https";

// Basic stream processing

/**
 * Build a fresh parser stream that logs stylesheet links and non-blank text.
 *
 * A Writable is ended by the first source that is piped into it, so a single
 * instance must not be shared between several sources. Each document gets its
 * own stream (and therefore its own parser state).
 */
function createParserStream() {
  return new WritableStream({
    onopentag(name, attribs) {
      if (name === "link" && attribs.rel === "stylesheet") {
        console.log("Found CSS:", attribs.href);
      }
    },
    ontext(text) {
      if (text.trim()) {
        console.log("Text:", text.trim());
      }
    }
  });
}

const parserStream = createParserStream();

// Pipe from file stream
const htmlFile = fs.createReadStream("./document.html");
htmlFile.pipe(parserStream).on("finish", () => {
  console.log("Parsing complete");
});

// Pipe from HTTP response (uses its own parser stream; the one above is
// closed as soon as the file has been read)
https.get("https://example.com", (response) => {
  response.pipe(createParserStream());
});

Stream Pipeline Integration

import { WritableStream } from "htmlparser2/WritableStream";
import { Transform } from "stream";
import fs from "fs";

// Custom transform stream to filter HTML content
/**
 * Transform stream that strips `<script>…</script>` elements from HTML text.
 *
 * Input arrives in arbitrary chunks, so a script element (or even the
 * `<script` token itself) may be split across chunk boundaries. Anything that
 * could still turn out to be part of a script element is held back until
 * enough data has arrived to decide. Script bodies may span multiple lines.
 *
 * An element that is never closed is passed through unchanged when the stream
 * ends. Note that such an unterminated element is buffered in memory until
 * then.
 */
class HtmlFilter extends Transform {
  /** Lower-case token that starts a script element. */
  private static readonly OPEN_TOKEN = "<script";
  /** Finds the next (case-insensitive) start of a script element. */
  private static readonly OPEN_PATTERN = /<script/i;
  /** Matches one complete script element at the start of the input. */
  private static readonly SCRIPT_ELEMENT = /^<script[^>]*>[\s\S]*?<\/script>/i;

  /** Text that has been received but cannot be emitted yet. */
  private pending = "";
  /** Streaming decoder so multi-byte characters split across chunks survive. */
  private readonly decoder = new TextDecoder("utf-8");

  constructor() {
    // Plain byte/string mode: object mode is not needed for text filtering.
    super();
  }

  /**
   * Filter one incoming chunk.
   * @param chunk - Raw bytes (or an already decoded string)
   * @param _encoding - Unused; bytes are always decoded as UTF-8
   * @param callback - Receives the text that is safe to emit
   */
  _transform(
    chunk: Buffer | string,
    _encoding: string,
    callback: (error?: Error | null, data?: unknown) => void
  ): void {
    const decoded =
      typeof chunk === "string"
        ? chunk
        : this.decoder.decode(chunk, { stream: true });
    const safe = this.filter(this.pending + decoded);
    callback(null, safe.length > 0 ? safe : undefined);
  }

  /**
   * Emit whatever is still held back once the input has ended.
   * @param callback - Receives the remaining text, if any
   */
  _flush(callback: (error?: Error | null, data?: unknown) => void): void {
    const rest = this.pending + this.decoder.decode();
    this.pending = "";
    callback(null, rest.length > 0 ? rest : undefined);
  }

  /**
   * Remove complete script elements from `text`, store the undecidable tail
   * in `this.pending`, and return the part that can be emitted now.
   */
  private filter(text: string): string {
    let output = "";
    let rest = text;

    for (;;) {
      const open = rest.search(HtmlFilter.OPEN_PATTERN);
      if (open === -1) {
        break;
      }
      output += rest.slice(0, open);
      rest = rest.slice(open);

      const element = HtmlFilter.SCRIPT_ELEMENT.exec(rest);
      if (element === null) {
        // Opening tag or closing tag not complete yet: wait for more data.
        this.pending = rest;
        return output;
      }
      rest = rest.slice(element[0].length);
    }

    // The chunk may end in the middle of "<script" (e.g. "<scr"); hold that
    // fragment back so it can be joined with the next chunk.
    const lastLt = rest.lastIndexOf("<");
    const tail = lastLt === -1 ? "" : rest.slice(lastLt);
    if (
      tail.length > 0 &&
      tail.length < HtmlFilter.OPEN_TOKEN.length &&
      HtmlFilter.OPEN_TOKEN.startsWith(tail.toLowerCase())
    ) {
      this.pending = tail;
      return output + rest.slice(0, lastLt);
    }

    this.pending = "";
    return output + rest;
  }
}

// Chain streams together

// Parser stream that reports the target of every anchor carrying an href
const extractLinks = new WritableStream({
  onopentag: (name, attribs) => {
    const isLinkWithTarget = name === "a" && attribs.href;
    if (!isLinkWithTarget) {
      return;
    }
    console.log("Link found:", attribs.href);
  }
});

// file -> script filter -> link extractor
const pageSource = fs.createReadStream("./page.html");
const filteredHtml = pageSource.pipe(new HtmlFilter());
const linkSink = filteredHtml.pipe(extractLinks);

linkSink.on("finish", () => {
  console.log("Link extraction complete");
});

Streaming DOM Construction

import { WritableStream } from "htmlparser2/WritableStream";
import { DomHandler } from "domhandler";

import fs from "fs"; // required by createReadStream below

// Handler that assembles the DOM from parser events; the callback receives
// either an error or the resulting DOM.
const domHandler = new DomHandler((error, dom) => {
  if (error) {
    console.error("DOM construction failed:", error);
    return;
  }
  console.log("DOM constructed:", dom);
});

// Stream that feeds the DOM handler chunk by chunk
const domStream = new WritableStream(domHandler, {
  xmlMode: false,
  decodeEntities: true
});

// The input file is read and parsed in chunks, so the raw HTML text never has
// to be loaded as one string. The resulting DOM is still kept in memory.
const largeHtmlFile = fs.createReadStream("./large-document.html");
largeHtmlFile.pipe(domStream);

Real-time HTML Processing

import { WritableStream } from "htmlparser2/WritableStream";
import { createServer } from "http";

// HTTP server that processes incoming HTML
// POST /parse  -> responds with a JSON array of the tags and text nodes found
// anything else -> 404
const server = createServer((req, res) => {
  if (req.method !== "POST" || req.url !== "/parse") {
    // Always answer; an unanswered request would hang until the client
    // times out.
    req.resume(); // discard any request body
    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Not found");
    return;
  }

  // Explicit element type: the array is filled from inside callbacks, where
  // TypeScript cannot infer it.
  const results: Array<
    | { type: "tag"; name: string; attribs: { [key: string]: string } }
    | { type: "text"; content: string }
  > = [];

  const htmlProcessor = new WritableStream({
    onopentag(name, attribs) {
      results.push({ type: "tag", name, attribs });
    },
    ontext(text) {
      if (text.trim()) {
        results.push({ type: "text", content: text.trim() });
      }
    },
    onend() {
      res.writeHead(200, { "Content-Type": "application/json" });
      res.end(JSON.stringify(results));
    }
  });

  // pipe() does not close the destination when the source fails (for example
  // when the client aborts the upload), so release the parser explicitly.
  req.on("error", () => {
    htmlProcessor.destroy();
  });

  req.pipe(htmlProcessor);
});

Stream Options

The WritableStream accepts the same options as the Parser:

/**
 * Options accepted by the WritableStream constructor; these are the same
 * options the Parser takes. Every field is optional.
 */
interface ParserOptions {
  /** Enable XML parsing mode */
  xmlMode?: boolean;
  /** Decode HTML entities */
  decodeEntities?: boolean;
  /** Convert tag names to lowercase */
  lowerCaseTags?: boolean;
  /** Convert attribute names to lowercase */
  lowerCaseAttributeNames?: boolean;
  /** Recognize CDATA sections in HTML mode */
  recognizeCDATA?: boolean;
  /** Recognize self-closing tags in HTML mode */
  recognizeSelfClosing?: boolean;
}

Buffer Handling

The WritableStream automatically handles Buffer inputs and string encoding:

import { WritableStream } from "htmlparser2/WritableStream";

// Parser stream that echoes every text node it encounters
const parser = new WritableStream({
  ontext: (text) => {
    console.log("Text:", text);
  }
});

// Both Buffer and string inputs work: feed one chunk of each kind
const bufferChunk = Buffer.from("<div>Hello</div>");
const stringChunk = " from buffer!";

parser.write(bufferChunk);
parser.write(stringChunk);
parser.end();

Error Handling in Streams

import { WritableStream } from "htmlparser2/WritableStream";
import fs from "fs";

// Parser stream that logs every opening tag and any error reported through
// the handler's onerror callback
const parserStream = new WritableStream({
  onopentag(name) {
    console.log("Tag:", name);
  },
  onerror(error) {
    console.error("Parse error:", error);
  }
});

// Handle stream errors
parserStream.on("error", (error) => {
  console.error("Stream error:", error);
});

const inputStream = fs.createReadStream("./document.html");

inputStream.on("error", (error) => {
  console.error("File read error:", error);
  // pipe() does not close the destination when the source errors, so the
  // parser stream has to be released by hand.
  parserStream.destroy();
});

inputStream.pipe(parserStream);

Performance Considerations

  • WritableStream processes data in chunks, making it memory-efficient for large documents
  • Uses internal StringDecoder for proper handling of multi-byte characters across chunk boundaries
  • Suitable for processing documents larger than available memory
  • Minimal overhead compared to direct Parser usage

Integration with Popular Streaming Libraries

With Highland.js

import { WritableStream } from "htmlparser2/WritableStream";
import highland from "highland";
import fs from "fs";

// Every opening tag seen by the parser, in document order
const tags = [];

// Parser stream that records the name and attributes of each opening tag
const htmlStream = new WritableStream({
  onopentag: (name, attribs) => {
    tags.push({ name, attribs });
  }
});

// Wrap the file stream with Highland, then hand it to the parser
const documentSource = highland(fs.createReadStream("./document.html"));
const parsed = documentSource.pipe(htmlStream);

parsed.on("finish", () => {
  console.log("Found tags:", tags.length);
});

With RxJS Streams

import { WritableStream } from "htmlparser2/WritableStream";
import { fromEvent } from "rxjs";
import fs from "fs";

// Parser stream that reports the source of every <img> tag
const parserStream = new WritableStream({
  onopentag: (name, attribs) => {
    if (name !== "img") {
      return;
    }
    console.log("Image:", attribs.src);
  }
});

// Observable view of the stream's "finish" event
const finish$ = fromEvent(parserStream, "finish");

const reportCompletion = () => {
  console.log("Stream processing complete");
};
finish$.subscribe(reportCompletion);

// Start feeding the parser only after the subscription is in place
const gallerySource = fs.createReadStream("./gallery.html");
gallerySource.pipe(parserStream);