Runtime environment for sequence execution and communication with Transform Hub host.
—
Stream processing utilities for handling input streams with different content types and formats, including header parsing and data stream mapping.
import {
readInputStreamHeaders,
mapToInputDataStream,
inputStreamInitLogger
} from "@scramjet/runner/src/input-stream";Reads HTTP-style headers from input streams to determine content type and other metadata.
/**
* Read HTTP-style headers from a readable stream
* @param stream - Readable stream containing headers followed by data
* @returns Promise resolving to object with header key/values (header names are lowercase)
*/
function readInputStreamHeaders(stream: Readable): Promise<Record<string, string>>;Usage Example:
import { readInputStreamHeaders } from "@scramjet/runner/src/input-stream";
// Read headers from input stream
const headers = await readInputStreamHeaders(inputStream);
console.log(headers);
// Output: { "content-type": "application/x-ndjson", "content-length": "1024" }
// Use content type for stream processing
const contentType = headers["content-type"];
const dataStream = mapToInputDataStream(inputStream, contentType);The function expects headers in HTTP format with CRLF line endings and a blank line separating headers from body:
Content-Type: application/x-ndjson\r\n
Content-Length: 1024\r\n
\r\n
[actual data starts here]Maps input streams to appropriate DataStream types based on content type.
/**
* Map input stream to DataStream based on content type
* @param stream - Readable stream containing data
* @param contentType - Content type string determining stream processing
* @returns DataStream configured for the specified content type
* @throws Error if contentType is undefined or unsupported
*/
function mapToInputDataStream(stream: Readable, contentType: string): DataStream;Supported Content Types:
application/x-ndjson: Newline-delimited JSON (parsed as JSON objects)text/x-ndjson: Newline-delimited JSON (parsed as JSON objects)text/plain: Plain text (UTF-8 encoded StringStream)application/octet-stream: Binary data (BufferStream)Usage Example:
import { mapToInputDataStream } from "@scramjet/runner/src/input-stream";
// Process JSON data
const jsonStream = mapToInputDataStream(inputStream, "application/x-ndjson");
jsonStream.each(obj => {
console.log("Received object:", obj);
});
// Process plain text
const textStream = mapToInputDataStream(inputStream, "text/plain");
textStream.each(line => {
console.log("Text line:", line);
});
// Process binary data
const binaryStream = mapToInputDataStream(inputStream, "application/octet-stream");
binaryStream.each(buffer => {
console.log("Binary chunk:", buffer);
});
// Handle unsupported content type
try {
const unsupportedStream = mapToInputDataStream(inputStream, "image/png");
} catch (error) {
console.error(error.message);
// "Content-Type does not match any supported value. The actual value is image/png"
}Logger instance for input stream initialization and processing.
/**
* Logger instance for input stream initialization
*/
const inputStreamInitLogger: IObjectLogger;Usage Example:
import { inputStreamInitLogger } from "@scramjet/runner/src/input-stream";
// The logger is automatically used by stream processing functions
// You can also access it directly for custom logging
inputStreamInitLogger.debug("Processing input stream", { contentType: "application/json" });import {
readInputStreamHeaders,
mapToInputDataStream,
inputStreamInitLogger
} from "@scramjet/runner/src/input-stream";
async function processInputStream(inputStream: Readable) {
try {
// Read headers to determine content type
const headers = await readInputStreamHeaders(inputStream);
inputStreamInitLogger.debug("Headers received", headers);
const contentType = headers["content-type"];
if (!contentType) {
throw new Error("Content-Type header is required");
}
// Map to appropriate data stream
const dataStream = mapToInputDataStream(inputStream, contentType);
// Process based on content type
switch (contentType) {
case "application/x-ndjson":
case "text/x-ndjson":
return dataStream.map(obj => ({
...obj,
processed: true,
timestamp: Date.now()
}));
case "text/plain":
return dataStream
.split("\n")
.filter(line => line.trim().length > 0)
.map(line => ({ text: line, length: line.length }));
case "application/octet-stream":
return dataStream.map(buffer => ({
size: buffer.length,
checksum: buffer.reduce((sum, byte) => sum + byte, 0)
}));
default:
throw new Error(`Unsupported content type: ${contentType}`);
}
} catch (error) {
inputStreamInitLogger.error("Stream processing failed", error);
throw error;
}
}function processCustomContentType(stream: Readable, contentType: string) {
// For unsupported content types, fall back to binary processing
if (!["application/x-ndjson", "text/x-ndjson", "text/plain", "application/octet-stream"].includes(contentType)) {
inputStreamInitLogger.warn(`Unsupported content type ${contentType}, treating as binary`);
return mapToInputDataStream(stream, "application/octet-stream");
}
return mapToInputDataStream(stream, contentType);
}async function safeStreamProcessing(inputStream: Readable) {
try {
const headers = await readInputStreamHeaders(inputStream);
const contentType = headers["content-type"];
if (!contentType) {
inputStreamInitLogger.error("Missing Content-Type header");
throw new Error("Content-Type header is required");
}
const dataStream = mapToInputDataStream(inputStream, contentType);
return dataStream;
} catch (error) {
if (error.message.includes("Content-Type does not match")) {
inputStreamInitLogger.error("Unsupported content type", { contentType });
// Fallback to binary processing
return mapToInputDataStream(inputStream, "application/octet-stream");
}
inputStreamInitLogger.error("Stream processing error", error);
throw error;
}
}interface Readable {
read(): Buffer | null;
on(event: string, callback: Function): this;
off(event: string, callback: Function): this;
unshift(chunk: Buffer): void;
}
interface DataStream {
map<U>(fn: (item: any) => U): DataStream;
filter(fn: (item: any) => boolean): DataStream;
each(fn: (item: any) => void): DataStream;
pipe(destination: any): any;
}
interface StringStream extends DataStream {
split(separator: string): StringStream;
JSONParse(multi?: boolean): DataStream;
}
interface BufferStream extends DataStream {
// Buffer-specific stream methods
}
interface IObjectLogger {
debug(message: string, data?: any): void;
info(message: string, data?: any): void;
warn(message: string, data?: any): void;
error(message: string, data?: any): void;
}Install with Tessl CLI
npx tessl i tessl/npm-scramjet--runner