CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-web-streams-polyfill

Web Streams polyfill and ponyfill implementation based on the WHATWG specification

Pending
Overview
Eval results
Files

docs/transform-streams.md

Transform Streams

Transform stream functionality that connects a writable side to a readable side, allowing data to be transformed as it flows through.

Capabilities

TransformStream Class

A transform stream consists of a pair of streams: a writable stream (writable side) and a readable stream (readable side). Writes to the writable side result in new data being made available for reading from the readable side.

/**
 * A transform stream consists of a writable stream and a readable stream connected together.
 * Writes to the writable side result in new data being made available for reading
 * from the readable side.
 *
 * @typeParam I - Type of the chunks accepted by the writable side.
 * @typeParam O - Type of the chunks produced on the readable side.
 */
class TransformStream<I = any, O = any> {
  /**
   * All three arguments are optional.
   *
   * @param transformer - Object whose callbacks define how chunks are transformed.
   * @param writableStrategy - Queuing strategy applied to the writable side (chunks of type `I`).
   * @param readableStrategy - Queuing strategy applied to the readable side (chunks of type `O`).
   */
  constructor(
    transformer?: Transformer<I, O>,
    writableStrategy?: QueuingStrategy<I>,
    readableStrategy?: QueuingStrategy<O>
  );
  
  /** The readable side of the transform stream; transformed chunks are read from here */
  readonly readable: ReadableStream<O>;
  
  /** The writable side of the transform stream; input chunks are written here */
  readonly writable: WritableStream<I>;
}

Usage Examples:

import { TransformStream } from "web-streams-polyfill";

// Upper-cases the text form of every chunk that flows through.
const upperCaseTransform = new TransformStream({
  transform: (chunk, controller) => {
    const text = chunk.toString();
    const shouted = text.toUpperCase();
    controller.enqueue(shouted);
  }
});

// Drops blank lines; every surviving line is forwarded with surrounding whitespace removed.
const filterEmptyLines = new TransformStream({
  transform: (chunk, controller) => {
    const trimmed = chunk.toString().trim();
    if (trimmed.length === 0) {
      // Nothing but whitespace: swallow the chunk.
      return;
    }
    controller.enqueue(trimmed);
  }
});

// Prefixes each chunk with a running, 1-based line number.
// The counter lives at module level and is reset when the stream starts.
let lineNumber = 1;
const addLineNumbers = new TransformStream({
  start: () => {
    lineNumber = 1;
  },
  transform: (chunk, controller) => {
    const text = chunk.toString();
    const current = lineNumber;
    lineNumber = current + 1;
    controller.enqueue(`${current}: ${text}`);
  }
});

// Chain transforms together
// Chunks flow through the transforms in the order listed: empty lines are filtered out first,
// the remaining lines are upper-cased, and line numbers are added last.
// NOTE(review): inputStream and outputStream are not defined in this snippet — presumably a
// ReadableStream and a WritableStream supplied by the caller; confirm before copying.
await inputStream
  .pipeThrough(filterEmptyLines)
  .pipeThrough(upperCaseTransform)  
  .pipeThrough(addLineNumbers)
  .pipeTo(outputStream);

TransformStreamDefaultController

Controller passed to the transformer's callbacks. It enqueues chunks to the transform stream's readable side and can error or terminate the stream.

/**
 * Controller for transform streams that manages the readable side.
 * An instance is handed to the transformer's start, transform and flush callbacks.
 *
 * @typeParam O - Type of the chunks enqueued to the readable side.
 */
class TransformStreamDefaultController<O> {
  /**
   * Returns the desired size to fill the controlled transform stream's readable side internal queue.
   * Typed as `number | null`, so callers must handle the `null` case.
   */
  readonly desiredSize: number | null;
  
  /**
   * Enqueue a chunk to the controlled transform stream's readable side.
   *
   * @param chunk - The chunk to make available for reading.
   */
  enqueue(chunk: O): void;
  
  /**
   * Error both sides of the controlled transform stream.
   *
   * @param reason - Optional value describing why the stream was errored.
   */
  error(reason?: any): void;
  
  /** Close the controlled transform stream's readable side and error the writable side */
  terminate(): void;
}

Usage Examples:

import { TransformStream } from "web-streams-polyfill";

// Breaks each incoming chunk on single spaces and emits every non-blank word on its own.
const splitterTransform = new TransformStream({
  transform: (chunk, controller) => {
    const text: string = chunk.toString();

    // Trim every piece once, then drop the pieces that were only whitespace.
    const words = text
      .split(' ')
      .map((piece: string) => piece.trim())
      .filter((piece: string) => piece !== '');

    words.forEach((word: string) => controller.enqueue(word));
  }
});

// Transform stream with error handling
//
// Each chunk is parsed as JSON and must carry a truthy `id` field. A chunk that
// fails either check errors both sides of the stream; valid values are forwarded
// as parsed objects.
const validationTransform = new TransformStream({
  transform(chunk, controller) {
    let data: { id?: unknown } | null;

    // Only the parse step is guarded, so failures raised elsewhere (for example by
    // enqueue) are not misreported as "Invalid JSON".
    try {
      data = JSON.parse(chunk.toString());
    } catch (error) {
      // The catch variable is `unknown` under strict settings; narrow before reading `.message`.
      const detail = error instanceof Error ? error.message : String(error);
      controller.error(new Error(`Invalid JSON: ${detail}`));
      return;
    }

    // Optional chaining covers a JSON `null` payload, which parses fine but has no fields.
    if (!data?.id) {
      controller.error(new Error("Missing required 'id' field"));
      return;
    }

    controller.enqueue(data);
  }
});

// Transform stream that terminates early
//
// Forwards at most `n` chunks. The stream is terminated as soon as the n-th chunk
// has been enqueued, so readers see the end without the source having to produce
// an extra chunk. If `n` is not positive, the first chunk written terminates the
// stream and nothing is forwarded.
const takeFirstN = (n: number) => {
  let count = 0;
  return new TransformStream({
    transform(chunk, controller) {
      if (count < n) {
        controller.enqueue(chunk);
        count++;
      }

      // Quota reached (or never available): close the readable side and stop accepting writes.
      if (!(count < n)) {
        controller.terminate();
      }
    }
  });
};

Transformer Types

Transformer Interface

Configuration object for transform streams that defines how data is transformed.

/**
 * Configuration object for transform streams that defines how data is transformed.
 * Every member is optional. Each callback may return nothing or a promise-like.
 *
 * @typeParam I - Type of the chunks passed to `transform`.
 * @typeParam O - Type of the chunks the controller enqueues.
 */
interface Transformer<I = any, O = any> {
  /** Called immediately during construction of the TransformStream; receives the controller */
  start?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
  
  /** Called when a new chunk of data is ready to be transformed; receives the chunk and the controller */
  transform?: (chunk: I, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
  
  /** Called after all chunks written to the writable side have been transformed; receives the controller */
  flush?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
  
  /** Called when the readable side is cancelled or the writable side is aborted; receives only the reason, not the controller */
  cancel?: (reason: any) => void | PromiseLike<void>;
  
  /** Must be undefined for default transform streams */
  readableType?: undefined;
  
  /** Must be undefined for default transform streams */
  writableType?: undefined;
}

Usage Examples:

import { TransformStream } from "web-streams-polyfill";

// JSON processing transform stream
//
// Accepts text chunks containing back-to-back JSON objects (optionally separated by
// whitespace) and emits one parsed object per complete top-level `{...}`. Objects may
// be split across chunk boundaries; the unfinished tail is kept until more text arrives.
// State lives in a closure rather than on `this`, so the transformer type-checks
// under strict settings.
const createJsonProcessor = () => {
  // Text received so far that has not yet formed a complete top-level object.
  let buffer = '';

  return new TransformStream({
    start() {
      console.log("Starting JSON processing");
      buffer = '';
    },

    transform(chunk, controller) {
      buffer += chunk.toString();

      let consumed = 0;      // index just past the last fully handled character
      let objectStart = 0;   // index of the '{' opening the current top-level object
      let depth = 0;         // current brace nesting depth
      let inString = false;  // inside a JSON string literal, where braces are plain text
      let escaped = false;   // previous character inside a string was a backslash

      for (let i = 0; i < buffer.length; i++) {
        const ch = buffer.charAt(i);

        if (inString) {
          if (escaped) {
            escaped = false;
          } else if (ch === '\\') {
            escaped = true;
          } else if (ch === '"') {
            inString = false;
          }
          continue;
        }

        if (depth === 0) {
          if (ch === '{') {
            objectStart = i;
            depth = 1;
          } else if (ch.trim() === '') {
            // Whitespace between objects carries no data; drop it from the buffer.
            consumed = i + 1;
          } else {
            controller.error(
              new Error(`Invalid JSON: unexpected character '${ch}' outside of an object`)
            );
            return;
          }
          continue;
        }

        if (ch === '"') {
          inString = true;
        } else if (ch === '{') {
          depth++;
        } else if (ch === '}') {
          depth--;
          if (depth === 0) {
            const jsonStr = buffer.slice(objectStart, i + 1);
            let obj: unknown;
            try {
              obj = JSON.parse(jsonStr);
            } catch (error) {
              const detail = error instanceof Error ? error.message : String(error);
              controller.error(new Error(`Invalid JSON: ${detail}`));
              return;
            }
            controller.enqueue(obj);
            consumed = i + 1;
          }
        }
      }

      // Keep remaining incomplete JSON in buffer
      buffer = buffer.slice(consumed);
    },

    flush(controller) {
      if (buffer.trim()) {
        controller.error(new Error("Incomplete JSON at end of stream"));
        // Do not report completion for a stream that just failed.
        return;
      }
      console.log("JSON processing completed");
    }
  });
};

const jsonProcessor = createJsonProcessor();

// Compression transform stream
//
// Collects chunks and emits them in batches as a single simulated "compressed" string.
// A batch is emitted once `batchSize` chunks have accumulated; whatever is left when
// the writable side closes is emitted from flush. Pending chunks are held in a closure
// variable rather than on `this`, which keeps the transformer type-safe.
const createCompressionTransform = (batchSize = 10) => {
  // Chunks collected since the last emitted batch.
  let chunks: unknown[] = [];

  // Joins the pending chunks, clears them, and returns the simulated compressed payload.
  const drain = (): string => {
    const combined = chunks.join('');
    chunks = [];
    return `compressed(${combined})`;
  };

  return new TransformStream({
    transform(chunk, controller) {
      // Collect chunks for batch compression
      chunks.push(chunk);

      // Flush when we have enough data
      if (chunks.length >= batchSize) {
        controller.enqueue(drain());
      }
    },

    flush(controller) {
      // Flush remaining chunks
      if (chunks.length > 0) {
        controller.enqueue(drain());
      }
    }
  });
};

const compressionTransform = createCompressionTransform();

// Rate limiting transform stream
//
// Forwards chunks unchanged, but delays a chunk when less than `1000 / itemsPerSecond`
// milliseconds have passed since the previous chunk was forwarded.
const rateLimiter = (itemsPerSecond: number) => {
  const minGapMs = 1000 / itemsPerSecond;
  let previousEmit = Date.now();

  // Resolves after roughly `ms` milliseconds.
  const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

  return new TransformStream({
    async transform(chunk, controller) {
      const elapsed = Date.now() - previousEmit;

      // Too soon after the previous chunk: wait out the remainder of the gap.
      if (elapsed < minGapMs) {
        await sleep(minGapMs - elapsed);
      }

      controller.enqueue(chunk);
      previousEmit = Date.now();
    }
  });
};

Callback Types

/** Signature of a transformer's `start` callback: receives the controller and may return a promise-like. */
type TransformerStartCallback<O> = (
  controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;

/** Signature of a transformer's `transform` callback: receives the incoming chunk and the controller. */
type TransformerTransformCallback<I, O> = (
  chunk: I, 
  controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;

/** Signature of a transformer's `flush` callback: receives the controller and may return a promise-like. */
type TransformerFlushCallback<O> = (
  controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;

/** Signature of a transformer's `cancel` callback: receives only the reason, not the controller. */
type TransformerCancelCallback = (reason: any) => void | PromiseLike<void>;

Common Transform Patterns

Identity Transform

// Pass through transform (no modification)
const identityTransform = new TransformStream();

// Equivalent to the following explicit transformer. It has its own name so that both
// declarations can live in the same scope without a redeclaration error.
const explicitIdentityTransform = new TransformStream({
  transform(chunk, controller) {
    // Forward every chunk untouched.
    controller.enqueue(chunk);
  }
});

Buffering Transform

// Buffer chunks and emit arrays
//
// Groups incoming chunks into arrays of `bufferSize` elements. A final, shorter array
// is emitted when the writable side closes with chunks still pending. The pending
// chunks are held in a closure variable rather than on `this`, so every stream gets
// its own state and the transformer type-checks under strict settings.
const bufferTransform = (bufferSize: number) => {
  // Chunks collected since the last emitted array.
  let pending: unknown[] = [];

  return new TransformStream({
    transform(chunk, controller) {
      pending.push(chunk);

      if (pending.length >= bufferSize) {
        // Hand over the filled array and start a fresh one, so later pushes cannot alter what was emitted.
        controller.enqueue(pending);
        pending = [];
      }
    },

    flush(controller) {
      if (pending.length > 0) {
        controller.enqueue(pending);
        pending = [];
      }
    }
  });
};

Async Transform

// Transform whose per-chunk work is asynchronous
const asyncTransform = new TransformStream({
  transform: async (chunk, controller) => {
    try {
      // Stand-in for any awaited operation (e.g., an API call)
      const processed = await processAsync(chunk);
      controller.enqueue(processed);
    } catch (failure) {
      // Surface the failure by erroring the stream.
      controller.error(failure);
    }
  }
});

/**
 * Simulates asynchronous processing of a value.
 *
 * @param data - Any value; it is returned untouched under the `processed` key.
 * @returns After a 100 ms delay, an object holding the input and the completion time
 *          taken from `Date.now()`.
 */
async function processAsync(data: any): Promise<any> {
  const simulatedLatencyMs = 100;

  // Simulate async work
  await new Promise(done => {
    setTimeout(done, simulatedLatencyMs);
  });

  const timestamp = Date.now();
  return { processed: data, timestamp };
}

Install with Tessl CLI

npx tessl i tessl/npm-web-streams-polyfill

docs

index.md

queuing-strategies.md

readable-streams.md

transform-streams.md

writable-streams.md

tile.json