CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-scramjet--runner

Runtime environment for sequence execution and communication with Transform Hub host.

Pending
Overview
Eval results
Files

stream-processing.mddocs/

Stream Processing

Stream processing utilities for handling input streams with different content types and formats, including header parsing and data stream mapping.

Core Imports

import { 
  readInputStreamHeaders,
  mapToInputDataStream,
  inputStreamInitLogger
} from "@scramjet/runner/src/input-stream";

Capabilities

Input Stream Header Reading

Reads HTTP-style headers from input streams to determine content type and other metadata.

/**
 * Read HTTP-style headers from a readable stream
 * @param stream - Readable stream containing headers followed by data
 * @returns Promise resolving to object with header key/values (header names are lowercase)
 */
function readInputStreamHeaders(stream: Readable): Promise<Record<string, string>>;

Usage Example:

import { readInputStreamHeaders } from "@scramjet/runner/src/input-stream";

// Read headers from input stream
const headers = await readInputStreamHeaders(inputStream);
console.log(headers);
// Output: { "content-type": "application/x-ndjson", "content-length": "1024" }

// Use content type for stream processing
const contentType = headers["content-type"];
const dataStream = mapToInputDataStream(inputStream, contentType);

The function expects headers in HTTP format with CRLF line endings and a blank line separating headers from body:

Content-Type: application/x-ndjson\r\n
Content-Length: 1024\r\n
\r\n
[actual data starts here]

Input Data Stream Mapping

Maps input streams to appropriate DataStream types based on content type.

/**
 * Map input stream to DataStream based on content type
 * @param stream - Readable stream containing data
 * @param contentType - Content type string determining stream processing
 * @returns DataStream configured for the specified content type
 * @throws Error if contentType is undefined or unsupported
 */
function mapToInputDataStream(stream: Readable, contentType: string): DataStream;

Supported Content Types:

  • application/x-ndjson: Newline-delimited JSON (parsed as JSON objects)
  • text/x-ndjson: Newline-delimited JSON (parsed as JSON objects)
  • text/plain: Plain text (UTF-8 encoded StringStream)
  • application/octet-stream: Binary data (BufferStream)

Usage Example:

import { mapToInputDataStream } from "@scramjet/runner/src/input-stream";

// Process JSON data
const jsonStream = mapToInputDataStream(inputStream, "application/x-ndjson");
jsonStream.each(obj => {
  console.log("Received object:", obj);
});

// Process plain text
const textStream = mapToInputDataStream(inputStream, "text/plain");
textStream.each(line => {
  console.log("Text line:", line);
});

// Process binary data
const binaryStream = mapToInputDataStream(inputStream, "application/octet-stream");
binaryStream.each(buffer => {
  console.log("Binary chunk:", buffer);
});

// Handle unsupported content type
try {
  const unsupportedStream = mapToInputDataStream(inputStream, "image/png");
} catch (error) {
  console.error(error.message);
  // "Content-Type does not match any supported value. The actual value is image/png"
}

Input Stream Logger

Logger instance for input stream initialization and processing.

/**
 * Logger instance for input stream initialization
 */
const inputStreamInitLogger: IObjectLogger;

Usage Example:

import { inputStreamInitLogger } from "@scramjet/runner/src/input-stream";

// The logger is automatically used by stream processing functions
// You can also access it directly for custom logging
inputStreamInitLogger.debug("Processing input stream", { contentType: "application/json" });

Advanced Usage

Complete Stream Processing Pipeline

import { 
  readInputStreamHeaders, 
  mapToInputDataStream,
  inputStreamInitLogger 
} from "@scramjet/runner/src/input-stream";

async function processInputStream(inputStream: Readable) {
  try {
    // Read headers to determine content type
    const headers = await readInputStreamHeaders(inputStream);
    inputStreamInitLogger.debug("Headers received", headers);
    
    const contentType = headers["content-type"];
    if (!contentType) {
      throw new Error("Content-Type header is required");
    }
    
    // Map to appropriate data stream
    const dataStream = mapToInputDataStream(inputStream, contentType);
    
    // Process based on content type
    switch (contentType) {
      case "application/x-ndjson":
      case "text/x-ndjson":
        return dataStream.map(obj => ({
          ...obj,
          processed: true,
          timestamp: Date.now()
        }));
        
      case "text/plain":
        return dataStream
          .split("\n")
          .filter(line => line.trim().length > 0)
          .map(line => ({ text: line, length: line.length }));
          
      case "application/octet-stream":
        return dataStream.map(buffer => ({
          size: buffer.length,
          checksum: buffer.reduce((sum, byte) => sum + byte, 0)
        }));
        
      default:
        throw new Error(`Unsupported content type: ${contentType}`);
    }
  } catch (error) {
    inputStreamInitLogger.error("Stream processing failed", error);
    throw error;
  }
}

Custom Content Type Handling

function processCustomContentType(stream: Readable, contentType: string) {
  // For unsupported content types, fall back to binary processing
  if (!["application/x-ndjson", "text/x-ndjson", "text/plain", "application/octet-stream"].includes(contentType)) {
    inputStreamInitLogger.warn(`Unsupported content type ${contentType}, treating as binary`);
    return mapToInputDataStream(stream, "application/octet-stream");
  }
  
  return mapToInputDataStream(stream, contentType);
}

Error Handling

async function safeStreamProcessing(inputStream: Readable) {
  try {
    const headers = await readInputStreamHeaders(inputStream);
    const contentType = headers["content-type"];
    
    if (!contentType) {
      inputStreamInitLogger.error("Missing Content-Type header");
      throw new Error("Content-Type header is required");
    }
    
    const dataStream = mapToInputDataStream(inputStream, contentType);
    return dataStream;
    
  } catch (error) {
    if (error.message.includes("Content-Type does not match")) {
      inputStreamInitLogger.error("Unsupported content type", { contentType });
      // Fallback to binary processing
      return mapToInputDataStream(inputStream, "application/octet-stream");
    }
    
    inputStreamInitLogger.error("Stream processing error", error);
    throw error;
  }
}

Supporting Types

interface Readable {
  read(): Buffer | null;
  on(event: string, callback: Function): this;
  off(event: string, callback: Function): this;
  unshift(chunk: Buffer): void;
}

interface DataStream {
  map<U>(fn: (item: any) => U): DataStream;
  filter(fn: (item: any) => boolean): DataStream;
  each(fn: (item: any) => void): DataStream;
  pipe(destination: any): any;
}

interface StringStream extends DataStream {
  split(separator: string): StringStream;
  JSONParse(multi?: boolean): DataStream;
}

interface BufferStream extends DataStream {
  // Buffer-specific stream methods
}

interface IObjectLogger {
  debug(message: string, data?: any): void;
  info(message: string, data?: any): void;
  warn(message: string, data?: any): void;
  error(message: string, data?: any): void;
}

Install with Tessl CLI

npx tessl i tessl/npm-scramjet--runner

docs

application-context.md

host-communication.md

index.md

message-utilities.md

runner-execution.md

stream-processing.md

tile.json