CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-scramjet--runner

Runtime environment for sequence execution and communication with Transform Hub host.

Pending
Overview
Eval results
Files

message-utilities.mddocs/

Message Utilities

The MessageUtils class provides utility functions for encoding and transmitting monitoring messages through streams in the Scramjet Transform Hub communication protocol.

Capabilities

MessageUtils Class

Static utility class for handling message encoding and stream writing in the Transform Hub communication protocol.

/**
 * Utility class for stream messaging operations
 */
class MessageUtils {
  /**
   * Write encoded monitoring message to a stream with proper formatting
   * @param message - Encoded monitoring message as [code, data] tuple
   * @param streamToWrite - Writable stream to send the message to
   * @throws Error if streamToWrite is undefined
   */
  static writeMessageOnStream(
    message: EncodedMonitoringMessage,
    streamToWrite: WritableStream<any>
  ): void;
}

Usage Example:

import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";

// Send health monitoring message
const healthMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.MONITORING,
  { healthy: true, timestamp: Date.now() }
];

MessageUtils.writeMessageOnStream(healthMessage, hostClient.monitorStream);

// Send keep-alive message
const keepAliveMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.ALIVE,
  { keepAlive: 5000 }
];

MessageUtils.writeMessageOnStream(keepAliveMessage, hostClient.monitorStream);

// Send sequence completion message
const completionMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.SEQUENCE_COMPLETED,
  { timeout: 10000 }
];

MessageUtils.writeMessageOnStream(completionMessage, hostClient.monitorStream);

Message Format

The Transform Hub communication protocol uses JSON-encoded messages with CRLF line termination:

  • Format: JSON.stringify([code, data]) + "\r\n"
  • Structure: Messages are tuples of [MessageCode, MessageData]
  • Encoding: UTF-8 text encoding
  • Termination: Messages end with carriage return + line feed (\r\n)

Common Message Types

enum RunnerMessageCode {
  PING = 0,
  PONG = 1,
  KILL = 6,
  STOP = 7,
  MONITORING = 8,
  MONITORING_RATE = 9,
  MONITORING_REPLY = 10,
  ALIVE = 11,
  EVENT = 12,
  SEQUENCE_STOPPED = 13,
  SEQUENCE_COMPLETED = 14,
  PANG = 15,
  ACKNOWLEDGE = 16
}

// Health monitoring message
type HealthMessage = [RunnerMessageCode.MONITORING, { healthy: boolean }];

// Keep-alive message
type KeepAliveMessage = [RunnerMessageCode.ALIVE, { keepAlive: number }];

// Event message
type EventMessage = [RunnerMessageCode.EVENT, { eventName: string; message?: any }];

// Sequence completion message
type CompletionMessage = [RunnerMessageCode.SEQUENCE_COMPLETED, { timeout: number }];

// PANG message (input/output requirements)
type PangMessage = [RunnerMessageCode.PANG, {
  requires?: string;
  provides?: string;
  contentType?: string;
  outputEncoding?: string;
}];

Advanced Usage Example:

import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";

class SequenceMonitor {
  constructor(private monitorStream: WritableStream<any>) {}
  
  reportHealth(healthy: boolean, details?: any) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.MONITORING,
      { healthy, ...details }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
  
  sendKeepAlive(duration: number) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.ALIVE,
      { keepAlive: duration }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
  
  emitEvent(eventName: string, data?: any) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.EVENT,
      { eventName, message: data }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
  
  reportCompletion(timeout: number = 10000) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.SEQUENCE_COMPLETED,
      { timeout }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
  
  sendPang(requirements: {
    requires?: string;
    provides?: string;
    contentType?: string;
    outputEncoding?: string;
  }) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.PANG,
      requirements
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
}

// Usage
const monitor = new SequenceMonitor(hostClient.monitorStream);
monitor.reportHealth(true, { memoryUsage: process.memoryUsage() });
monitor.sendKeepAlive(30000);
monitor.emitEvent("data-processed", { count: 100 });

Error Handling

The writeMessageOnStream method validates the stream parameter and throws an error if it's undefined:

// This will throw: "The Stream is not defined."
try {
  MessageUtils.writeMessageOnStream(
    [RunnerMessageCode.MONITORING, { healthy: true }],
    undefined as any
  );
} catch (error) {
  console.error(error.message); // "The Stream is not defined."
}

Supporting Types

type EncodedMonitoringMessage = [RunnerMessageCode, any];

interface WritableStream<T> {
  write(chunk: T): boolean;
}

interface RunnerMessageCode {
  PING: 0;
  PONG: 1;
  KILL: 6;
  STOP: 7;
  MONITORING: 8;
  MONITORING_RATE: 9;
  MONITORING_REPLY: 10;
  ALIVE: 11;
  EVENT: 12;
  SEQUENCE_STOPPED: 13;
  SEQUENCE_COMPLETED: 14;
  PANG: 15;
  ACKNOWLEDGE: 16;
}

Install with Tessl CLI

npx tessl i tessl/npm-scramjet--runner

docs

application-context.md

host-communication.md

index.md

message-utilities.md

runner-execution.md

stream-processing.md

tile.json