CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-web-streams-polyfill

Web Streams polyfill and ponyfill implementation based on the WHATWG specification

Pending
Overview
Eval results
Files

queuing-strategies.mddocs/

Queuing Strategies

Backpressure management through queuing strategies that control how data is buffered within streams and determine when backpressure should be applied.

Capabilities

CountQueuingStrategy Class

A queuing strategy that counts the number of chunks in the queue, regardless of their individual sizes.

/**
 * A queuing strategy that counts the number of chunks
 */
class CountQueuingStrategy implements QueuingStrategy<any> {
  constructor(options: { highWaterMark: number });
  
  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;
  
  /** Measures the size of chunk by always returning 1 */
  readonly size: (chunk: any) => 1;
}

Usage Examples:

import { ReadableStream, CountQueuingStrategy } from "web-streams-polyfill";

// Readable stream that buffers up to 5 chunks
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("chunk 1");
    controller.enqueue("chunk 2");
    controller.enqueue("chunk 3");
    // Stream will signal backpressure after 5 chunks
  }
}, new CountQueuingStrategy({ highWaterMark: 5 }));

// Writable stream with count-based backpressure
const writable = new WritableStream({
  write(chunk) {
    console.log("Processing:", chunk);
    return new Promise(resolve => setTimeout(resolve, 100));
  }
}, new CountQueuingStrategy({ highWaterMark: 3 }));

// Transform stream with different strategies for each side
const transform = new TransformStream({
  transform(chunk, controller) {
    // Split each input chunk into multiple output chunks
    const parts = chunk.toString().split(' ');
    parts.forEach(part => controller.enqueue(part));
  }
}, 
new CountQueuingStrategy({ highWaterMark: 2 }), // writable side
new CountQueuingStrategy({ highWaterMark: 10 }) // readable side
);

ByteLengthQueuingStrategy Class

A queuing strategy that measures the size of chunks by their byte length, useful for streams dealing with binary data or text where the size of individual chunks matters.

/**
 * A queuing strategy that counts the number of bytes in each chunk
 */
class ByteLengthQueuingStrategy implements QueuingStrategy<ArrayBufferView> {
  constructor(options: { highWaterMark: number });
  
  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;
  
  /** Measures the size of chunk by returning the value of its byteLength property */
  readonly size: (chunk: ArrayBufferView) => number;
}

Usage Examples:

import { ReadableStream, ByteLengthQueuingStrategy } from "web-streams-polyfill";

// Readable stream that buffers up to 1MB of data
const byteStream = new ReadableStream({
  start(controller) {
    const chunk1 = new Uint8Array(512 * 1024); // 512KB
    const chunk2 = new Uint8Array(256 * 1024); // 256KB
    
    controller.enqueue(chunk1);
    controller.enqueue(chunk2);
    // Stream will signal backpressure after 1MB total
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }));

// File upload stream with byte-based backpressure
const uploadStream = new WritableStream({
  async write(chunk) {
    // Upload chunk to server
    await fetch('/upload', {
      method: 'POST',
      body: chunk
    });
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })); // 64KB buffer

// Transform stream that processes binary data
const compressionTransform = new TransformStream({
  transform(chunk, controller) {
    // Simulate compression (input: Uint8Array, output: compressed Uint8Array)
    const compressed = new Uint8Array(chunk.byteLength / 2); // 50% compression
    compressed.set(new Uint8Array(chunk.buffer.slice(0, compressed.length)));
    controller.enqueue(compressed);
  }
},
new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }),     // 1MB input buffer
new ByteLengthQueuingStrategy({ highWaterMark: 512 * 1024 })       // 512KB output buffer
);

Custom Queuing Strategies

You can create custom queuing strategies by implementing the QueuingStrategy interface.

interface QueuingStrategy<T = any> {
  /** The high water mark value */
  highWaterMark?: number;
  
  /** Function that computes and returns the finite non-negative size of the given chunk value */
  size?: (chunk: T) => number;
}

Usage Examples:

import { ReadableStream, WritableStream } from "web-streams-polyfill";

// Custom strategy that measures string length
const stringLengthStrategy = {
  highWaterMark: 1000, // Buffer up to 1000 characters
  size(chunk: string) {
    return chunk.length;
  }
};

const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");      // 5 characters
    controller.enqueue("World");      // 5 characters  
    controller.enqueue("!");          // 1 character
    // Total: 11 characters (well under 1000 limit)
  }
}, stringLengthStrategy);

// Custom strategy for objects based on property count
const objectComplexityStrategy = {
  highWaterMark: 100, // Buffer objects with up to 100 total properties
  size(chunk: object) {
    return Object.keys(chunk).length;
  }
};

const objectStream = new WritableStream({
  write(chunk) {
    console.log(`Processing object with ${Object.keys(chunk).length} properties`);
  }
}, objectComplexityStrategy);

// Custom strategy with priority-based sizing
interface PriorityItem {
  data: any;
  priority: 'low' | 'medium' | 'high';
}

const priorityStrategy = {
  highWaterMark: 50,
  size(chunk: PriorityItem) {
    switch (chunk.priority) {
      case 'high': return 10;   // High priority items take more buffer space
      case 'medium': return 5;
      case 'low': return 1;
      default: return 1;
    }
  }
};

const priorityStream = new ReadableStream({
  start(controller) {
    controller.enqueue({ data: "urgent", priority: 'high' as const });
    controller.enqueue({ data: "normal", priority: 'medium' as const });
    controller.enqueue({ data: "background", priority: 'low' as const });
  }
}, priorityStrategy);

Queuing Strategy Types

interface QueuingStrategyInit {
  /** The high water mark value, which must be a non-negative number */
  highWaterMark: number;
}

type QueuingStrategySizeCallback<T = any> = (chunk: T) => number;

Backpressure Management

Queuing strategies work together with stream controllers to manage backpressure:

import { WritableStream, CountQueuingStrategy } from "web-streams-polyfill";

const slowProcessor = new WritableStream({
  async write(chunk, controller) {
    // Simulate slow processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log("Processed:", chunk);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

const writer = slowProcessor.getWriter();

async function writeWithBackpressure() {
  try {
    await writer.write("chunk 1");
    console.log("Desired size:", writer.desiredSize); // 1 (2 - 1)
    
    await writer.write("chunk 2");  
    console.log("Desired size:", writer.desiredSize); // 0 (2 - 2)
    
    await writer.write("chunk 3");
    console.log("Desired size:", writer.desiredSize); // -1 (2 - 3, backpressure!)
    
    // The write above will wait until there's space in the queue
    // before resolving, providing automatic backpressure handling
    
  } finally {
    writer.releaseLock();
  }
}

writeWithBackpressure();

Default Strategies

When no strategy is provided, streams use default strategies:

// These are equivalent:
const stream1 = new ReadableStream(source);
const stream2 = new ReadableStream(source, new CountQueuingStrategy({ highWaterMark: 1 }));

// For writable streams:
const writable1 = new WritableStream(sink);
const writable2 = new WritableStream(sink, new CountQueuingStrategy({ highWaterMark: 1 }));

// For byte streams, the default high water mark is 0:
const byteStream1 = new ReadableStream({ type: 'bytes' });
const byteStream2 = new ReadableStream({ type: 'bytes' }, { highWaterMark: 0 });

Install with Tessl CLI

npx tessl i tessl/npm-web-streams-polyfill

docs

index.md

queuing-strategies.md

readable-streams.md

transform-streams.md

writable-streams.md

tile.json