
tessl/npm-scramjet--runner

Runtime environment for sequence execution and communication with Transform Hub host.


Host Communication

The HostClient class manages the network connections between the runner and the Scramjet Transform Hub host. It exposes a separate stream for each communication channel: sequence data, control, monitoring, standard I/O, logging, and package.

Core Imports

import { HostClient } from "@scramjet/runner/src/host-client";

Capabilities

HostClient Class

Connects to the host and exposes one stream per channel, covering data transfer, control messages, monitoring, and standard I/O.

/**
 * Connects to Host and exposes streams per channel (stdin, monitor etc.)
 */
class HostClient implements IHostClient {
  constructor(instancesServerPort: number, instancesServerHost: string);
  
  /** Logger instance for host communication */
  logger: IObjectLogger;
  
  /** BPMux multiplexer for HTTP connections */
  bpmux: any;
  
  /** HTTP agent for API connections */
  agent?: Agent;
}

Usage Example:

import { HostClient } from "@scramjet/runner/src/host-client";

// Create connection to host
const hostClient = new HostClient(3000, "localhost");

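// Initialize connection with the instance ID (36 bytes; placeholder value shown)
await hostClient.init("00000000-0000-0000-0000-000000000000");

// Use streams for communication
// (someProcessor and someOutput stand in for your own Writable and Readable streams)
hostClient.inputStream.pipe(someProcessor);
someOutput.pipe(hostClient.outputStream);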

// Clean disconnect
await hostClient.disconnect(false);

Connection Management

Initialize and manage connections to the Transform Hub host.

/**
 * Initialize connection to host with multiple streams
 * @param id - Instance identifier (36 bytes)
 * @returns Promise that resolves when all connections are established
 */
init(id: string): Promise<void>;

/**
 * Disconnect from host and clean up all streams
 * @param hard - Whether to force close streams without graceful shutdown
 * @returns Promise that resolves when disconnection is complete
 */
disconnect(hard: boolean): Promise<void>;

/**
 * Get HTTP agent for making API calls to host
 * @returns HTTP agent configured for host communication
 * @throws Error if agent is not initialized
 */
getAgent(): Agent;
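The contract above implies two checks a caller may want to make explicit: `getAgent()` throws until the agent exists, and `init` expects a 36-byte identifier. The following is a minimal sketch of both, not code from `@scramjet/runner`. `AgentHolder` and `isLikelyInstanceId` are hypothetical names, the generic `TAgent` stands in for the real HTTP `Agent` type, and reading "36 bytes" as a canonical UUID string is an assumption.

```typescript
// Hypothetical stand-in for the agent-related part of HostClient.
// `TAgent` replaces the real HTTP Agent type so the sketch has no dependencies.
class AgentHolder<TAgent> {
  private agent?: TAgent;

  // Called once the connection is set up (presumably during init in the real class).
  setAgent(agent: TAgent): void {
    this.agent = agent;
  }

  // Mirrors the documented contract: throw if the agent is not initialized.
  getAgent(): TAgent {
    if (this.agent === undefined) {
      throw new Error("Agent is not initialized");
    }
    return this.agent;
  }
}

// Assumption: the 36-byte id is a canonical UUID string (8-4-4-4-12 hex digits).
const UUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical pre-flight check before calling init(id).
function isLikelyInstanceId(id: string): boolean {
  return id.length === 36 && UUID_PATTERN.test(id);
}
```

Checking the identifier before calling `init` gives a clearer failure than a connection that never completes its handshake, though whether the host rejects malformed ids is not stated here.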

Stream Access

Access to various communication streams between runner and host.

/**
 * Standard input stream from host to sequence
 */
readonly stdinStream: Readable;

/**
 * Standard output stream from sequence to host
 */
readonly stdoutStream: Writable;

/**
 * Standard error stream from sequence to host
 */
readonly stderrStream: Writable;

/**
 * Control message stream from host (JSON messages)
 */
readonly controlStream: Readable;

/**
 * Monitoring message stream to host (health, status updates)
 */
readonly monitorStream: Writable;

/**
 * Sequence input data stream from host
 */
readonly inputStream: Readable;

/**
 * Sequence output data stream to host
 */
readonly outputStream: Writable;

/**
 * Logging stream to host for structured logs
 */
readonly logStream: Writable;

/**
 * Package communication stream for API multiplexing
 */
readonly packageStream: Readable;

Usage Example:

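import { PassThrough } from "stream";
import { StringStream } from "scramjet";
import { RunnerMessageCode } from "@scramjet/symbols";

// Assumes hostClient has already been created and initialized (see the example above)

// Read control messages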
StringStream
  .from(hostClient.controlStream)
  .JSONParse()
  .each(([code, data]) => {
    console.log("Control message:", code, data);
  });

// Send monitoring data
const monitoringData = [RunnerMessageCode.MONITORING, { healthy: true }];
hostClient.monitorStream.write(JSON.stringify(monitoringData) + "\r\n");

// Process input data
hostClient.inputStream
  .pipe(new PassThrough())
  .on('data', (chunk) => {
    console.log("Received data:", chunk.toString());
  });

Supporting Types

interface IHostClient {
  logger: IObjectLogger;
  
  init(id: string): Promise<void>;
  disconnect(hard: boolean): Promise<void>;
  getAgent(): Agent;
  
  readonly stdinStream: Readable;
  readonly stdoutStream: Writable;
  readonly stderrStream: Writable;
  readonly controlStream: Readable;
  readonly monitorStream: Writable;
  readonly inputStream: Readable;
  readonly outputStream: Writable;
  readonly logStream: Writable;
  readonly packageStream: Readable;
}

interface UpstreamStreamsConfig extends Array<Socket> {
  [index: number]: Socket;
}

enum CommunicationChannel {
  IN = 0,
  OUT = 1,
  CONTROL = 2,
  MONITORING = 3,
  STDIN = 4,
  STDOUT = 5,
  STDERR = 6,
  LOG = 7,
  PACKAGE = 8
}

Communication Protocol

The HostClient uses a multi-stream protocol with dedicated channels:

  1. Data Streams (IN/OUT): Sequence input and output data
  2. Control Stream: JSON messages for sequence control (start, stop, kill)
  3. Monitoring Stream: Health status and lifecycle messages
  4. Standard I/O: stdin, stdout, stderr for sequence processes
  5. Logging Stream: Structured logging output
  6. Package Stream: HTTP API multiplexing via BPMux

Each channel is carried over a TCP connection. HTTP API calls are multiplexed over the package stream with BPMux, a binary multiplexing protocol. Control and monitoring messages are JSON, one message per line, terminated with CRLF.
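The CRLF-terminated JSON framing used for control and monitoring messages can be sketched as follows. This is an illustration under assumptions, not the runner's implementation: `encodeMessage` and `CrlfJsonDecoder` are hypothetical names, and the `[code, data]` tuple shape is taken from the usage example in this document. The decoder keeps incomplete text between calls, because a TCP stream may deliver a message split across chunks or several messages in one chunk.

```typescript
// A message is a two-element JSON array: a numeric code and a payload.
type Message = [number, unknown];

// Serialize one message as a JSON array followed by CRLF.
function encodeMessage(code: number, data: unknown): string {
  return JSON.stringify([code, data]) + "\r\n";
}

// Stateful decoder: incomplete text is buffered until its terminating CRLF arrives.
class CrlfJsonDecoder {
  private buffer = "";

  // Feed one chunk of text; returns every message completed by this chunk.
  push(chunk: string): Message[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\r\n");
    // The last element is either "" or an incomplete message; keep it for later.
    this.buffer = lines.pop() ?? "";
    return lines
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line) as Message);
  }
}

// Usage: the codes 3 and 4 are arbitrary placeholders, not real message codes.
const exampleDecoder = new CrlfJsonDecoder();
const exampleWire = encodeMessage(3, { healthy: true }) + encodeMessage(4, {});
const decodedSoFar = [
  ...exampleDecoder.push(exampleWire.slice(0, 10)), // partial message: nothing yet
  ...exampleDecoder.push(exampleWire.slice(10))     // remainder completes both messages
];
```

`JSON.stringify` escapes any carriage return or line feed inside string values, so a raw CRLF in the encoded text can only be a message boundary and splitting on it is safe.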

Install with Tessl CLI

npx tessl i tessl/npm-scramjet--runner
